diff --git a/Cargo.toml b/Cargo.toml index 1a3d809..3022d66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,9 +13,9 @@ default = ["tokio-rustls"] async-trait = "0.1.52" celes = "2.1" chrono = { version = "0.4.23", features = ["serde"] } -quick-xml = { version = "0.26", features = [ "serialize" ] } +quick-xml = { version = "0.26", features = ["serialize"] } serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.0", features = [ "full" ] } +tokio = { version = "1.0", features = ["full"] } tokio-rustls = { version = "0.23", optional = true } tracing = "0.1.29" webpki-roots = "0.22.1" diff --git a/src/client.rs b/src/client.rs index 3bc48f8..613282f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,15 @@ -use std::time::Duration; +use std::borrow::Cow; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use tokio::sync::mpsc; use tracing::{debug, error}; use crate::common::NoExtension; pub use crate::connect::Connector; -use crate::connection::EppConnection; +use crate::connection::{Request, RequestMessage}; use crate::error::Error; use crate::hello::{Greeting, GreetingDocument, HelloDocument}; use crate::request::{Command, CommandDocument, Extension, Transaction}; @@ -13,8 +18,9 @@ use crate::xml; /// An `EppClient` provides an interface to sending EPP requests to a registry /// -/// Once initialized, the EppClient instance can serialize EPP requests to XML and send them -/// to the registry and deserialize the XML responses from the registry to local types. +/// Once initialized, the [`EppClient`] instance is the API half that is returned when creating a new connection. +/// It can serialize EPP requests to XML and send them to the registry and deserialize the XML responses from the +/// registry to local types. /// /// # Examples /// @@ -23,7 +29,7 @@ use crate::xml; /// # use std::net::ToSocketAddrs; /// # use std::time::Duration; /// # -/// use epp_client::EppClient; +/// use epp_client::connect::connect; /// use epp_client::domain::DomainCheck; /// use epp_client::common::NoExtension; /// @@ -31,11 +37,15 @@ use crate::xml; /// # async fn main() { /// // Create an instance of EppClient /// let timeout = Duration::from_secs(5); -/// let mut client = match EppClient::connect("registry_name".to_string(), ("example.com".to_owned(), 7000), None, timeout).await { +/// let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await { /// Ok(client) => client, /// Err(e) => panic!("Failed to create EppClient: {}", e) /// }; /// +/// tokio::spawn(async move { +/// connection.run().await.unwrap(); +/// }); +/// /// // Make a EPP Hello call to the registry /// let greeting = client.hello().await.unwrap(); /// println!("{:?}", greeting); @@ -55,26 +65,26 @@ use crate::xml; /// Domain: eppdev.com, Available: 1 /// Domain: eppdev.net, Available: 1 /// ``` -pub struct EppClient { - connection: EppConnection, +pub struct EppClient { + inner: Arc, } -impl EppClient { +impl EppClient { /// Create an `EppClient` from an already established connection - pub async fn new(connector: C, registry: String, timeout: Duration) -> Result { - Ok(Self { - connection: EppConnection::new(connector, registry, timeout).await?, - }) + pub(crate) fn new(sender: mpsc::UnboundedSender, registry: Cow<'static, str>) -> Self { + Self { + inner: Arc::new(InnerClient { sender, registry }), + } } /// Executes an EPP Hello call and returns the response as a `Greeting` pub async fn hello(&mut self) -> Result { let xml = xml::serialize(&HelloDocument::default())?; - debug!(registry = self.connection.registry, "hello: {}", &xml); - let response = self.connection.transact(&xml).await?; + debug!(registry = %self.inner.registry, "hello: {}", &xml); + let response = self.inner.send(xml)?.await?; debug!( - registry = self.connection.registry, + registry = %self.inner.registry, "greeting: {}", &response ); @@ -94,14 +104,16 @@ impl EppClient { let document = CommandDocument::new(data.command, data.extension, id); let xml = xml::serialize(&document)?; - debug!(registry = self.connection.registry, "request: {}", &xml); - let response = self.connection.transact(&xml).await?; + debug!(registry = %self.inner.registry, "request: {}", &xml); + let response = self.inner.send(xml)?.await?; debug!( - registry = self.connection.registry, + registry = %self.inner.registry, "response: {}", &response ); let rsp = xml::deserialize::>(&response)?; + debug_assert!(rsp.data.tr_ids.client_tr_id.as_deref() == Some(id)); + if rsp.data.result.code.is_success() { return Ok(rsp.data); } @@ -111,32 +123,35 @@ impl EppClient { tr_ids: rsp.data.tr_ids, })); - error!(registry=self.connection.registry, %response, "Failed to deserialize response for transaction: {}", err); + error!( + registry = %self.inner.registry, + %response, + "Failed to deserialize response for transaction: {}", err + ); Err(err) } /// Accepts raw EPP XML and returns the raw EPP XML response to it. /// Not recommended for direct use but sometimes can be useful for debugging - pub async fn transact_xml(&mut self, xml: &str) -> Result { - self.connection.transact(xml).await + pub async fn transact_xml(&mut self, xml: String) -> Result { + self.inner.send(xml)?.await } /// Returns the greeting received on establishment of the connection in raw xml form - pub fn xml_greeting(&self) -> String { - String::from(&self.connection.greeting) + pub async fn xml_greeting(&self) -> Result { + self.inner.xml_greeting().await } /// Returns the greeting received on establishment of the connection as an `Greeting` - pub fn greeting(&self) -> Result { - xml::deserialize::(&self.connection.greeting).map(|obj| obj.data) + pub async fn greeting(&self) -> Result { + let greeting = self.inner.xml_greeting().await?; + xml::deserialize::(&greeting).map(|obj| obj.data) } - pub async fn reconnect(&mut self) -> Result<(), Error> { - self.connection.reconnect().await - } - - pub async fn shutdown(mut self) -> Result<(), Error> { - self.connection.shutdown().await + /// Reconnects the underlying [`Connector::Connection`] + pub async fn reconnect(&self) -> Result { + let greeting = self.inner.reconnect().await?; + xml::deserialize::(&greeting).map(|obj| obj.data) } } @@ -176,3 +191,63 @@ impl<'c, 'e, C, E> Clone for RequestData<'c, 'e, C, E> { // Manual impl because this does not depend on whether `C` and `E` are `Copy` impl<'c, 'e, C, E> Copy for RequestData<'c, 'e, C, E> {} + +struct InnerClient { + sender: mpsc::UnboundedSender, + pub registry: Cow<'static, str>, +} + +impl InnerClient { + fn send(&self, request: String) -> Result { + let (sender, receiver) = mpsc::channel(1); + let request = Request { + request: RequestMessage::Request(request), + sender, + }; + self.sender.send(request).map_err(|_| Error::Closed)?; + + Ok(InnerResponse { receiver }) + } + + /// Returns the greeting received on establishment of the connection in raw xml form + async fn xml_greeting(&self) -> Result { + let (sender, receiver) = mpsc::channel(1); + let request = Request { + request: RequestMessage::Greeting, + sender, + }; + self.sender.send(request).map_err(|_| Error::Closed)?; + + InnerResponse { receiver }.await + } + + async fn reconnect(&self) -> Result { + let (sender, receiver) = mpsc::channel(1); + let request = Request { + request: RequestMessage::Reconnect, + sender, + }; + self.sender.send(request).map_err(|_| Error::Closed)?; + + InnerResponse { receiver }.await + } +} + +// We do not need to parse any output at this point (we could do that), +// but for now we just store the receiver here. +pub(crate) struct InnerResponse { + receiver: mpsc::Receiver>, +} + +impl Future for InnerResponse { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + match this.receiver.poll_recv(cx) { + Poll::Ready(Some(response)) => Poll::Ready(response), + Poll::Ready(None) => Poll::Ready(Err(Error::Closed)), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/connect.rs b/src/connect.rs index 5c7940c..4dd503e 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; @@ -8,6 +9,7 @@ use tokio::io::AsyncWrite; #[cfg(feature = "tokio-rustls")] use tokio::net::lookup_host; use tokio::net::TcpStream; +use tokio::sync::mpsc; #[cfg(feature = "tokio-rustls")] use tokio_rustls::client::TlsStream; #[cfg(feature = "tokio-rustls")] @@ -19,6 +21,7 @@ use tracing::info; use crate::client::EppClient; use crate::common::{Certificate, PrivateKey}; use crate::connection; +use crate::connection::EppConnection; use crate::error::Error; /// Connect to the specified `server` and `hostname` over TLS @@ -33,15 +36,18 @@ use crate::error::Error; /// Use connect_with_connector for passing a specific connector. #[cfg(feature = "tokio-rustls")] pub async fn connect( - registry: String, - server: (String, u16), + registry: Cow<'static, str>, + server: (Cow<'static, str>, u16), identity: Option<(Vec, PrivateKey)>, - timeout: Duration, -) -> Result, Error> { - info!("Connecting to server: {:?}", server); - + request_timeout: Duration, +) -> Result<(EppClient, EppConnection), Error> { let connector = RustlsConnector::new(server, identity).await?; - EppClient::new(connector, registry, timeout).await + + let (sender, receiver) = mpsc::unbounded_channel(); + let client = EppClient::new(sender, registry.clone()); + let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?; + + Ok((client, connection)) } /// Connect to the specified `server` and `hostname` via the passed connector. @@ -56,25 +62,29 @@ pub async fn connect( /// Use connect_with_connector for passing a specific connector. pub async fn connect_with_connector( connector: C, - registry: String, - timeout: Duration, -) -> Result, Error> + registry: Cow<'static, str>, + request_timeout: Duration, +) -> Result<(EppClient, EppConnection), Error> where C: Connector, { - EppClient::new(connector, registry, timeout).await + let (sender, receiver) = mpsc::unbounded_channel(); + let client = EppClient::new(sender, registry.clone()); + let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?; + + Ok((client, connection)) } #[cfg(feature = "tokio-rustls")] pub struct RustlsConnector { inner: TlsConnector, domain: ServerName, - server: (String, u16), + server: (Cow<'static, str>, u16), } impl RustlsConnector { pub async fn new( - server: (String, u16), + server: (Cow<'static, str>, u16), identity: Option<(Vec, PrivateKey)>, ) -> Result { let mut roots = RootCertStore::empty(); @@ -103,7 +113,7 @@ impl RustlsConnector { None => builder.with_no_client_auth(), }; - let domain = server.0.as_str().try_into().map_err(|_| { + let domain = server.0.as_ref().try_into().map_err(|_| { io::Error::new( io::ErrorKind::InvalidInput, format!("Invalid domain: {}", server.0), @@ -125,7 +135,10 @@ impl Connector for RustlsConnector { async fn connect(&self, timeout: Duration) -> Result { info!("Connecting to server: {}:{}", self.server.0, self.server.1); - let addr = match lookup_host(&self.server).await?.next() { + let addr = match lookup_host((self.server.0.as_ref(), self.server.1)) + .await? + .next() + { Some(addr) => addr, None => { return Err(Error::Io(io::Error::new( diff --git a/src/connection.rs b/src/connection.rs index a88c33f..7cdf7d1 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,37 +1,51 @@ //! Manages registry connections and reading/writing to them +use std::borrow::Cow; +use std::convert::TryInto; use std::future::Future; use std::time::Duration; use std::{io, str, u32}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tracing::{debug, info}; +use tokio::sync::mpsc; +use tracing::{debug, error, info, trace, warn}; use crate::connect::Connector; use crate::error::Error; -/// EPP Connection struct with some metadata for the connection -pub(crate) struct EppConnection { - pub registry: String, +/// EPP Connection +/// +/// This is the I/O half, returned when creating a new connection, that performs the actual I/O and thus +/// should be spawned in it's own task. +/// +/// [`EppConnection`] provides a [`EppConnection::run`](EppConnection::run) method, which only resolves when the connection is closed, +/// either because a fatal error has occurred, or because its associated [`EppClient`](super::EppClient) has been dropped +/// and all outstanding work has been completed. +pub struct EppConnection { + registry: Cow<'static, str>, connector: C, stream: C::Connection, - pub greeting: String, + greeting: String, timeout: Duration, + /// A receiver for receiving requests from [`EppClients`](super::client::EppClient) for the underlying connection. + receiver: mpsc::UnboundedReceiver, state: ConnectionState, } impl EppConnection { pub(crate) async fn new( connector: C, - registry: String, - timeout: Duration, + registry: Cow<'static, str>, + receiver: mpsc::UnboundedReceiver, + request_timeout: Duration, ) -> Result { let mut this = Self { registry, - stream: connector.connect(timeout).await?, + stream: connector.connect(request_timeout).await?, connector, + receiver, greeting: String::new(), - timeout, + timeout: request_timeout, state: Default::default(), }; @@ -40,7 +54,39 @@ impl EppConnection { Ok(this) } - /// Constructs an EPP XML request in the required form and sends it to the server + /// Runs the connection + /// + /// This will loops and awaits new requests from the client half and sends the request to the epp server + /// awaiting a response. + /// + /// Spawn this in a task and await run to resolve. + /// This resolves when the connection to the epp server gets dropped. + /// + /// # Examples + /// ```[no_compile] + /// let mut connection = + /// tokio::spawn(async move { + /// if let Err(err) = connection.run().await { + /// error!("connection failed: {err}") + /// } + /// }); + pub async fn run(&mut self) -> Result<(), Error> { + while let Some(message) = self.message().await { + match message { + Ok(message) => info!("{message}"), + Err(err) => { + error!("{err}"); + break; + } + } + } + trace!("stopping EppConnection task"); + Ok(()) + } + + /// Sends the given content to the used [`Connector::Connection`] + /// + /// Returns an EOF error when writing to the stream results in 0 bytes written. async fn write_epp_request(&mut self, content: &str) -> Result<(), Error> { let len = content.len(); @@ -54,12 +100,26 @@ impl EppConnection { buf[4..].clone_from_slice(content.as_bytes()); let wrote = timeout(self.timeout, self.stream.write(&buf)).await?; - debug!(registry = self.registry, "Wrote {} bytes", wrote); + // A write return value of 0 means the underlying socket + // does no longer accept any data. + if wrote == 0 { + warn!("Got EOF while writing"); + self.state = ConnectionState::Closed; + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("{}: unexpected eof", self.registry), + ) + .into()); + } + + debug!(registry = %self.registry, "Wrote {} bytes", wrote); Ok(()) } /// Receives response from the socket and converts it into an EPP XML string async fn read_epp_response(&mut self) -> Result { + // We're looking for the frame header which tells us how long the response will be. + // The frame header is a 32-bit (4-byte) big-endian unsigned integer. let mut buf = [0u8; 4]; timeout(self.timeout, self.stream.read_exact(&mut buf)).await?; @@ -67,7 +127,7 @@ impl EppConnection { let message_size = buf_size - 4; debug!( - registry = self.registry, + registry = %self.registry, "Response buffer size: {}", message_size ); @@ -76,10 +136,10 @@ impl EppConnection { loop { let read = timeout(self.timeout, self.stream.read(&mut buf[read_size..])).await?; - debug!(registry = self.registry, "Read: {} bytes", read); + debug!(registry = %self.registry, "Read: {} bytes", read); read_size += read; - debug!(registry = self.registry, "Total read: {} bytes", read_size); + debug!(registry = %self.registry, "Total read: {} bytes", read_size); if read == 0 { self.state = ConnectionState::Closed; @@ -96,8 +156,8 @@ impl EppConnection { Ok(String::from_utf8(buf)?) } - pub(crate) async fn reconnect(&mut self) -> Result<(), Error> { - debug!(registry = self.registry, "reconnecting"); + async fn reconnect(&mut self) -> Result<(), Error> { + debug!(registry = %self.registry, "reconnecting"); self.state = ConnectionState::Opening; self.stream = self.connector.connect(self.timeout).await?; self.greeting = self.read_epp_response().await?; @@ -105,30 +165,68 @@ impl EppConnection { Ok(()) } - /// Sends an EPP XML request to the registry and return the response - /// receieved to the request - pub(crate) async fn transact(&mut self, content: &str) -> Result { - if self.state != ConnectionState::Open { - debug!(registry = self.registry, " connection not ready"); - self.reconnect().await?; + async fn wait_for_shutdown(&mut self) -> Result<(), io::Error> { + self.state = ConnectionState::Closing; + match self.stream.shutdown().await { + Ok(_) => { + self.state = ConnectionState::Closed; + Ok(()) + } + Err(err) => Err(err), } - - debug!(registry = self.registry, " request: {}", content); - self.write_epp_request(content).await?; - - let response = self.read_epp_response().await?; - debug!(registry = self.registry, " response: {}", response); - - Ok(response) } - /// Closes the socket and shuts the connection - pub(crate) async fn shutdown(&mut self) -> Result<(), Error> { - info!(registry = self.registry, "Closing connection"); - self.state = ConnectionState::Closing; - timeout(self.timeout, self.stream.shutdown()).await?; - self.state = ConnectionState::Closed; - Ok(()) + /// This is the main method of the I/O tasks + /// + /// It will try to get a request, write it to the wire and waits for the response. + /// + /// Once this returns `None`, or `Ok(Err(_))`, the connection is expected to be closed. + async fn message(&mut self) -> Option, Error>> { + // In theory this can be even speed up as the underlying stream is in our case bi-directional. + // But as the EPP RFC does not guarantee the order of responses we would need to + // match based on the transactions id. We can look into adding support for this in + // future. + loop { + if self.state == ConnectionState::Closed { + return None; + } + + // Wait for new request + let request = self.receiver.recv().await; + let Some(request) = request else { + // The client got dropped. We can close the connection. + match self.wait_for_shutdown().await { + Ok(_) => return None, + Err(err) => return Some(Err(err.into())), + } + }; + + let response = match request.request { + RequestMessage::Greeting => Ok(self.greeting.clone()), + RequestMessage::Request(request) => { + if let Err(err) = self.write_epp_request(&request).await { + return Some(Err(err)); + } + timeout(self.timeout, self.read_epp_response()).await + } + RequestMessage::Reconnect => match self.reconnect().await { + Ok(_) => Ok(self.greeting.clone()), + Err(err) => { + // In this case we are not sure if the connection is in tact. Best we error out. + let _ = request.sender.send(Err(Error::Reconnect)).await; + return Some(Err(err)); + } + }, + }; + + // Awaiting `send` should not block this I/O tasks unless we try to write multiple responses to the same bounded channel. + // As this crate is structured to create a new bounded channel for each request, this ok here. + if request.sender.send(response).await.is_err() { + // If the receive half of the sender is dropped, (i.e. the `Client`s `Future` is canceled) + // we can just ignore the err here and return to let `run` print something for this task. + return Some(Ok("request was canceled. Client dropped.".into())); + } + } } } @@ -151,3 +249,17 @@ enum ConnectionState { Closing, Closed, } + +pub(crate) struct Request { + pub(crate) request: RequestMessage, + pub(crate) sender: mpsc::Sender>, +} + +pub(crate) enum RequestMessage { + /// Request the stored server greeting + Greeting, + /// Reconnect the underlying [`Connector::Connection`] + Reconnect, + /// Raw request to be sent to the connected EPP Server + Request(String), +} diff --git a/src/error.rs b/src/error.rs index 806e8c9..1714213 100644 --- a/src/error.rs +++ b/src/error.rs @@ -12,8 +12,10 @@ use crate::response::ResponseStatus; /// Error enum holding the possible error types #[derive(Debug)] pub enum Error { + Closed, Command(Box), Io(std::io::Error), + Reconnect, Timeout, Xml(Box), Other(Box), @@ -24,10 +26,12 @@ impl StdError for Error {} impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + Error::Closed => write!(f, "closed"), Error::Command(e) => { write!(f, "command error: {}", e.result.message) } Error::Io(e) => write!(f, "I/O error: {}", e), + Error::Reconnect => write!(f, "failed to reconnect"), Error::Timeout => write!(f, "timeout"), Error::Xml(e) => write!(f, "(de)serialization error: {}", e), Error::Other(e) => write!(f, "error: {}", e), diff --git a/src/lib.rs b/src/lib.rs index 1dd7c1b..9b220e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ //! use std::net::ToSocketAddrs; //! use std::time::Duration; //! -//! use epp_client::EppClient; +//! use epp_client::connect::connect; //! use epp_client::domain::DomainCheck; //! use epp_client::login::Login; //! @@ -67,11 +67,15 @@ //! async fn main() { //! // Create an instance of EppClient //! let timeout = Duration::from_secs(5); -//! let mut client = match EppClient::connect("registry_name".to_string(), ("example.com".to_owned(), 7000), None, timeout).await { -//! Ok(client) => client, +//! let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await { +//! Ok(c) => c, //! Err(e) => panic!("Failed to create EppClient: {}", e) //! }; //! +//! tokio::spawn(async move { +//! connection.run().await.unwrap(); +//! }); +//! //! let login = Login::new("username", "password", None, None); //! client.transact(&login, "transaction-id").await.unwrap(); //! diff --git a/tests/basic.rs b/tests/basic.rs index 19232d0..4fb5873 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -8,6 +8,7 @@ use epp_client::connect::connect_with_connector; use regex::Regex; use tokio::time::timeout; use tokio_test::io::Builder; +use tracing::trace; use epp_client::domain::{DomainCheck, DomainContact, DomainCreate, Period}; use epp_client::login::Login; @@ -99,11 +100,19 @@ async fn client() { } } - let mut client = connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5)) - .await - .unwrap(); + let (mut client, mut connection) = + connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5)) + .await + .unwrap(); - assert_eq!(client.xml_greeting(), xml("response/greeting.xml")); + tokio::spawn(async move { + connection.run().await.unwrap(); + }); + + assert_eq!( + client.xml_greeting().await.unwrap(), + xml("response/greeting.xml") + ); let rsp = client .transact( &Login::new( @@ -116,6 +125,7 @@ async fn client() { ) .await .unwrap(); + assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully); assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully); @@ -175,11 +185,20 @@ async fn dropped() { } } - let mut client = connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5)) - .await - .unwrap(); + let (mut client, mut connection) = + connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5)) + .await + .unwrap(); + tokio::spawn(async move { + connection.run().await.unwrap(); + trace!("connection future resolved successfully") + }); - assert_eq!(client.xml_greeting(), xml("response/greeting.xml")); + trace!("Trying to get greeting"); + assert_eq!( + client.xml_greeting().await.unwrap(), + xml("response/greeting.xml") + ); let rsp = client .transact( &Login::new( @@ -240,4 +259,40 @@ async fn dropped() { let rsp = client.transact(&create, CLTRID).await.unwrap(); assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully); + + drop(client); +} + +#[tokio::test] +async fn drop_client() { + let _guard = log_to_stdout(); + + struct FakeConnector; + + #[async_trait] + impl epp_client::client::Connector for FakeConnector { + type Connection = tokio_test::io::Mock; + + async fn connect(&self, _: Duration) -> Result { + Ok(build_stream(&["response/greeting.xml"]).build()) + } + } + + let (client, mut connection) = + connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5)) + .await + .unwrap(); + + let handle = tokio::spawn(async move { + connection.run().await.unwrap(); + trace!("connection future resolved successfully") + }); + + assert_eq!( + client.xml_greeting().await.unwrap(), + xml("response/greeting.xml") + ); + + drop(client); + assert!(handle.await.is_ok()); }