mirror of
synced 2025-03-03 05:02:10 +00:00
Split client and connection into two halfes
This rewrites the logic to process requests to follow the I/O task pattern. This makes it easier to implement things like keepalives as well as dealing with dropped futures.
This commit is contained in:
@ -13,9 +13,9 @@ default = ["tokio-rustls"]
async-trait = "0.1.52"
celes = "2.1"
chrono = { version = "0.4.23", features = ["serde"] }
quick-xml = { version = "0.26", features = [ "serialize" ] }
quick-xml = { version = "0.26", features = ["serialize"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = [ "full" ] }
tokio = { version = "1.0", features = ["full"] }
tokio-rustls = { version = "0.23", optional = true }
tracing = "0.1.29"
webpki-roots = "0.22.1"
@ -1,10 +1,15 @@
use std::time::Duration;
use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tracing::{debug, error};
use crate::common::NoExtension;
pub use crate::connect::Connector;
use crate::connection::EppConnection;
use crate::connection::{Request, RequestMessage};
use crate::error::Error;
use crate::hello::{Greeting, GreetingDocument, HelloDocument};
use crate::request::{Command, CommandDocument, Extension, Transaction};
@ -13,8 +18,9 @@ use crate::xml;
/// An `EppClient` provides an interface to sending EPP requests to a registry
/// Once initialized, the EppClient instance can serialize EPP requests to XML and send them
/// to the registry and deserialize the XML responses from the registry to local types.
/// Once initialized, the [`EppClient`] instance is the API half that is returned when creating a new connection.
/// It can serialize EPP requests to XML and send them to the registry and deserialize the XML responses from the
/// registry to local types.
/// # Examples
@ -23,7 +29,7 @@ use crate::xml;
/// # use std::net::ToSocketAddrs;
/// # use std::time::Duration;
/// #
/// use epp_client::EppClient;
/// use epp_client::connect::connect;
/// use epp_client::domain::DomainCheck;
/// use epp_client::common::NoExtension;
@ -31,11 +37,15 @@ use crate::xml;
/// # async fn main() {
/// // Create an instance of EppClient
/// let timeout = Duration::from_secs(5);
/// let mut client = match EppClient::connect("registry_name".to_string(), ("example.com".to_owned(), 7000), None, timeout).await {
/// let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await {
/// Ok(client) => client,
/// Err(e) => panic!("Failed to create EppClient: {}", e)
/// };
/// tokio::spawn(async move {
/// connection.run().await.unwrap();
/// });
/// // Make a EPP Hello call to the registry
/// let greeting = client.hello().await.unwrap();
/// println!("{:?}", greeting);
@ -55,26 +65,26 @@ use crate::xml;
/// Domain: eppdev.com, Available: 1
/// Domain: eppdev.net, Available: 1
/// ```
pub struct EppClient<C: Connector> {
connection: EppConnection<C>,
pub struct EppClient {
inner: Arc<InnerClient>,
impl<C: Connector> EppClient<C> {
impl EppClient {
/// Create an `EppClient` from an already established connection
pub async fn new(connector: C, registry: String, timeout: Duration) -> Result<Self, Error> {
Ok(Self {
connection: EppConnection::new(connector, registry, timeout).await?,
pub(crate) fn new(sender: mpsc::UnboundedSender<Request>, registry: Cow<'static, str>) -> Self {
Self {
inner: Arc::new(InnerClient { sender, registry }),
/// Executes an EPP Hello call and returns the response as a `Greeting`
pub async fn hello(&mut self) -> Result<Greeting, Error> {
let xml = xml::serialize(&HelloDocument::default())?;
debug!(registry = self.connection.registry, "hello: {}", &xml);
let response = self.connection.transact(&xml).await?;
debug!(registry = %self.inner.registry, "hello: {}", &xml);
let response = self.inner.send(xml)?.await?;
registry = self.connection.registry,
registry = %self.inner.registry,
"greeting: {}", &response
@ -94,14 +104,16 @@ impl<C: Connector> EppClient<C> {
let document = CommandDocument::new(data.command, data.extension, id);
let xml = xml::serialize(&document)?;
debug!(registry = self.connection.registry, "request: {}", &xml);
let response = self.connection.transact(&xml).await?;
debug!(registry = %self.inner.registry, "request: {}", &xml);
let response = self.inner.send(xml)?.await?;
registry = self.connection.registry,
registry = %self.inner.registry,
"response: {}", &response
let rsp = xml::deserialize::<ResponseDocument<Cmd::Response, Ext::Response>>(&response)?;
debug_assert!(rsp.data.tr_ids.client_tr_id.as_deref() == Some(id));
if rsp.data.result.code.is_success() {
return Ok(rsp.data);
@ -111,32 +123,35 @@ impl<C: Connector> EppClient<C> {
tr_ids: rsp.data.tr_ids,
error!(registry=self.connection.registry, %response, "Failed to deserialize response for transaction: {}", err);
registry = %self.inner.registry,
"Failed to deserialize response for transaction: {}", err
/// Accepts raw EPP XML and returns the raw EPP XML response to it.
/// Not recommended for direct use but sometimes can be useful for debugging
pub async fn transact_xml(&mut self, xml: &str) -> Result<String, Error> {
pub async fn transact_xml(&mut self, xml: String) -> Result<String, Error> {
/// Returns the greeting received on establishment of the connection in raw xml form
pub fn xml_greeting(&self) -> String {
pub async fn xml_greeting(&self) -> Result<String, Error> {
/// Returns the greeting received on establishment of the connection as an `Greeting`
pub fn greeting(&self) -> Result<Greeting, Error> {
xml::deserialize::<GreetingDocument>(&self.connection.greeting).map(|obj| obj.data)
pub async fn greeting(&self) -> Result<Greeting, Error> {
let greeting = self.inner.xml_greeting().await?;
xml::deserialize::<GreetingDocument>(&greeting).map(|obj| obj.data)
pub async fn reconnect(&mut self) -> Result<(), Error> {
pub async fn shutdown(mut self) -> Result<(), Error> {
/// Reconnects the underlying [`Connector::Connection`]
pub async fn reconnect(&self) -> Result<Greeting, Error> {
let greeting = self.inner.reconnect().await?;
xml::deserialize::<GreetingDocument>(&greeting).map(|obj| obj.data)
@ -176,3 +191,63 @@ impl<'c, 'e, C, E> Clone for RequestData<'c, 'e, C, E> {
// Manual impl because this does not depend on whether `C` and `E` are `Copy`
impl<'c, 'e, C, E> Copy for RequestData<'c, 'e, C, E> {}
struct InnerClient {
sender: mpsc::UnboundedSender<Request>,
pub registry: Cow<'static, str>,
impl InnerClient {
fn send(&self, request: String) -> Result<InnerResponse, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request {
request: RequestMessage::Request(request),
self.sender.send(request).map_err(|_| Error::Closed)?;
Ok(InnerResponse { receiver })
/// Returns the greeting received on establishment of the connection in raw xml form
async fn xml_greeting(&self) -> Result<String, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request {
request: RequestMessage::Greeting,
self.sender.send(request).map_err(|_| Error::Closed)?;
InnerResponse { receiver }.await
async fn reconnect(&self) -> Result<String, Error> {
let (sender, receiver) = mpsc::channel(1);
let request = Request {
request: RequestMessage::Reconnect,
self.sender.send(request).map_err(|_| Error::Closed)?;
InnerResponse { receiver }.await
// We do not need to parse any output at this point (we could do that),
// but for now we just store the receiver here.
pub(crate) struct InnerResponse {
receiver: mpsc::Receiver<Result<String, Error>>,
impl Future for InnerResponse {
type Output = Result<String, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this.receiver.poll_recv(cx) {
Poll::Ready(Some(response)) => Poll::Ready(response),
Poll::Ready(None) => Poll::Ready(Err(Error::Closed)),
Poll::Pending => Poll::Pending,
@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::sync::Arc;
use std::time::Duration;
@ -8,6 +9,7 @@ use tokio::io::AsyncWrite;
#[cfg(feature = "tokio-rustls")]
use tokio::net::lookup_host;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
#[cfg(feature = "tokio-rustls")]
use tokio_rustls::client::TlsStream;
#[cfg(feature = "tokio-rustls")]
@ -19,6 +21,7 @@ use tracing::info;
use crate::client::EppClient;
use crate::common::{Certificate, PrivateKey};
use crate::connection;
use crate::connection::EppConnection;
use crate::error::Error;
/// Connect to the specified `server` and `hostname` over TLS
@ -33,15 +36,18 @@ use crate::error::Error;
/// Use connect_with_connector for passing a specific connector.
#[cfg(feature = "tokio-rustls")]
pub async fn connect(
registry: String,
server: (String, u16),
registry: Cow<'static, str>,
server: (Cow<'static, str>, u16),
identity: Option<(Vec<Certificate>, PrivateKey)>,
timeout: Duration,
) -> Result<EppClient<RustlsConnector>, Error> {
info!("Connecting to server: {:?}", server);
request_timeout: Duration,
) -> Result<(EppClient, EppConnection<RustlsConnector>), Error> {
let connector = RustlsConnector::new(server, identity).await?;
EppClient::new(connector, registry, timeout).await
let (sender, receiver) = mpsc::unbounded_channel();
let client = EppClient::new(sender, registry.clone());
let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?;
Ok((client, connection))
/// Connect to the specified `server` and `hostname` via the passed connector.
@ -56,25 +62,29 @@ pub async fn connect(
/// Use connect_with_connector for passing a specific connector.
pub async fn connect_with_connector<C>(
connector: C,
registry: String,
timeout: Duration,
) -> Result<EppClient<C>, Error>
registry: Cow<'static, str>,
request_timeout: Duration,
) -> Result<(EppClient, EppConnection<C>), Error>
C: Connector,
EppClient::new(connector, registry, timeout).await
let (sender, receiver) = mpsc::unbounded_channel();
let client = EppClient::new(sender, registry.clone());
let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?;
Ok((client, connection))
#[cfg(feature = "tokio-rustls")]
pub struct RustlsConnector {
inner: TlsConnector,
domain: ServerName,
server: (String, u16),
server: (Cow<'static, str>, u16),
impl RustlsConnector {
pub async fn new(
server: (String, u16),
server: (Cow<'static, str>, u16),
identity: Option<(Vec<Certificate>, PrivateKey)>,
) -> Result<Self, Error> {
let mut roots = RootCertStore::empty();
@ -103,7 +113,7 @@ impl RustlsConnector {
None => builder.with_no_client_auth(),
let domain = server.0.as_str().try_into().map_err(|_| {
let domain = server.0.as_ref().try_into().map_err(|_| {
format!("Invalid domain: {}", server.0),
@ -125,7 +135,10 @@ impl Connector for RustlsConnector {
async fn connect(&self, timeout: Duration) -> Result<Self::Connection, Error> {
info!("Connecting to server: {}:{}", self.server.0, self.server.1);
let addr = match lookup_host(&self.server).await?.next() {
let addr = match lookup_host((self.server.0.as_ref(), self.server.1))
Some(addr) => addr,
None => {
return Err(Error::Io(io::Error::new(
@ -1,37 +1,51 @@
//! Manages registry connections and reading/writing to them
use std::borrow::Cow;
use std::convert::TryInto;
use std::future::Future;
use std::time::Duration;
use std::{io, str, u32};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, info};
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use crate::connect::Connector;
use crate::error::Error;
/// EPP Connection struct with some metadata for the connection
pub(crate) struct EppConnection<C: Connector> {
pub registry: String,
/// EPP Connection
/// This is the I/O half, returned when creating a new connection, that performs the actual I/O and thus
/// should be spawned in it's own task.
/// [`EppConnection`] provides a [`EppConnection::run`](EppConnection::run) method, which only resolves when the connection is closed,
/// either because a fatal error has occurred, or because its associated [`EppClient`](super::EppClient) has been dropped
/// and all outstanding work has been completed.
pub struct EppConnection<C: Connector> {
registry: Cow<'static, str>,
connector: C,
stream: C::Connection,
pub greeting: String,
greeting: String,
timeout: Duration,
/// A receiver for receiving requests from [`EppClients`](super::client::EppClient) for the underlying connection.
receiver: mpsc::UnboundedReceiver<Request>,
state: ConnectionState,
impl<C: Connector> EppConnection<C> {
pub(crate) async fn new(
connector: C,
registry: String,
timeout: Duration,
registry: Cow<'static, str>,
receiver: mpsc::UnboundedReceiver<Request>,
request_timeout: Duration,
) -> Result<Self, Error> {
let mut this = Self {
stream: connector.connect(timeout).await?,
stream: connector.connect(request_timeout).await?,
greeting: String::new(),
timeout: request_timeout,
state: Default::default(),
@ -40,7 +54,39 @@ impl<C: Connector> EppConnection<C> {
/// Constructs an EPP XML request in the required form and sends it to the server
/// Runs the connection
/// This will loops and awaits new requests from the client half and sends the request to the epp server
/// awaiting a response.
/// Spawn this in a task and await run to resolve.
/// This resolves when the connection to the epp server gets dropped.
/// # Examples
/// ```[no_compile]
/// let mut connection = <obtained via connect::connect()>
/// tokio::spawn(async move {
/// if let Err(err) = connection.run().await {
/// error!("connection failed: {err}")
/// }
/// });
pub async fn run(&mut self) -> Result<(), Error> {
while let Some(message) = self.message().await {
match message {
Ok(message) => info!("{message}"),
Err(err) => {
trace!("stopping EppConnection task");
/// Sends the given content to the used [`Connector::Connection`]
/// Returns an EOF error when writing to the stream results in 0 bytes written.
async fn write_epp_request(&mut self, content: &str) -> Result<(), Error> {
let len = content.len();
@ -54,12 +100,26 @@ impl<C: Connector> EppConnection<C> {
let wrote = timeout(self.timeout, self.stream.write(&buf)).await?;
debug!(registry = self.registry, "Wrote {} bytes", wrote);
// A write return value of 0 means the underlying socket
// does no longer accept any data.
if wrote == 0 {
warn!("Got EOF while writing");
self.state = ConnectionState::Closed;
return Err(io::Error::new(
format!("{}: unexpected eof", self.registry),
debug!(registry = %self.registry, "Wrote {} bytes", wrote);
/// Receives response from the socket and converts it into an EPP XML string
async fn read_epp_response(&mut self) -> Result<String, Error> {
// We're looking for the frame header which tells us how long the response will be.
// The frame header is a 32-bit (4-byte) big-endian unsigned integer.
let mut buf = [0u8; 4];
timeout(self.timeout, self.stream.read_exact(&mut buf)).await?;
@ -67,7 +127,7 @@ impl<C: Connector> EppConnection<C> {
let message_size = buf_size - 4;
registry = self.registry,
registry = %self.registry,
"Response buffer size: {}", message_size
@ -76,10 +136,10 @@ impl<C: Connector> EppConnection<C> {
loop {
let read = timeout(self.timeout, self.stream.read(&mut buf[read_size..])).await?;
debug!(registry = self.registry, "Read: {} bytes", read);
debug!(registry = %self.registry, "Read: {} bytes", read);
read_size += read;
debug!(registry = self.registry, "Total read: {} bytes", read_size);
debug!(registry = %self.registry, "Total read: {} bytes", read_size);
if read == 0 {
self.state = ConnectionState::Closed;
@ -96,8 +156,8 @@ impl<C: Connector> EppConnection<C> {
pub(crate) async fn reconnect(&mut self) -> Result<(), Error> {
debug!(registry = self.registry, "reconnecting");
async fn reconnect(&mut self) -> Result<(), Error> {
debug!(registry = %self.registry, "reconnecting");
self.state = ConnectionState::Opening;
self.stream = self.connector.connect(self.timeout).await?;
self.greeting = self.read_epp_response().await?;
@ -105,30 +165,68 @@ impl<C: Connector> EppConnection<C> {
/// Sends an EPP XML request to the registry and return the response
/// receieved to the request
pub(crate) async fn transact(&mut self, content: &str) -> Result<String, Error> {
if self.state != ConnectionState::Open {
debug!(registry = self.registry, " connection not ready");
async fn wait_for_shutdown(&mut self) -> Result<(), io::Error> {
self.state = ConnectionState::Closing;
match self.stream.shutdown().await {
Ok(_) => {
self.state = ConnectionState::Closed;
Err(err) => Err(err),
debug!(registry = self.registry, " request: {}", content);
let response = self.read_epp_response().await?;
debug!(registry = self.registry, " response: {}", response);
/// Closes the socket and shuts the connection
pub(crate) async fn shutdown(&mut self) -> Result<(), Error> {
info!(registry = self.registry, "Closing connection");
self.state = ConnectionState::Closing;
timeout(self.timeout, self.stream.shutdown()).await?;
self.state = ConnectionState::Closed;
/// This is the main method of the I/O tasks
/// It will try to get a request, write it to the wire and waits for the response.
/// Once this returns `None`, or `Ok(Err(_))`, the connection is expected to be closed.
async fn message(&mut self) -> Option<Result<Cow<'static, str>, Error>> {
// In theory this can be even speed up as the underlying stream is in our case bi-directional.
// But as the EPP RFC does not guarantee the order of responses we would need to
// match based on the transactions id. We can look into adding support for this in
// future.
loop {
if self.state == ConnectionState::Closed {
return None;
// Wait for new request
let request = self.receiver.recv().await;
let Some(request) = request else {
// The client got dropped. We can close the connection.
match self.wait_for_shutdown().await {
Ok(_) => return None,
Err(err) => return Some(Err(err.into())),
let response = match request.request {
RequestMessage::Greeting => Ok(self.greeting.clone()),
RequestMessage::Request(request) => {
if let Err(err) = self.write_epp_request(&request).await {
return Some(Err(err));
timeout(self.timeout, self.read_epp_response()).await
RequestMessage::Reconnect => match self.reconnect().await {
Ok(_) => Ok(self.greeting.clone()),
Err(err) => {
// In this case we are not sure if the connection is in tact. Best we error out.
let _ = request.sender.send(Err(Error::Reconnect)).await;
return Some(Err(err));
// Awaiting `send` should not block this I/O tasks unless we try to write multiple responses to the same bounded channel.
// As this crate is structured to create a new bounded channel for each request, this ok here.
if request.sender.send(response).await.is_err() {
// If the receive half of the sender is dropped, (i.e. the `Client`s `Future` is canceled)
// we can just ignore the err here and return to let `run` print something for this task.
return Some(Ok("request was canceled. Client dropped.".into()));
@ -151,3 +249,17 @@ enum ConnectionState {
pub(crate) struct Request {
pub(crate) request: RequestMessage,
pub(crate) sender: mpsc::Sender<Result<String, Error>>,
pub(crate) enum RequestMessage {
/// Request the stored server greeting
/// Reconnect the underlying [`Connector::Connection`]
/// Raw request to be sent to the connected EPP Server
@ -12,8 +12,10 @@ use crate::response::ResponseStatus;
/// Error enum holding the possible error types
pub enum Error {
Xml(Box<dyn StdError + Send + Sync>),
Other(Box<dyn StdError + Send + Sync>),
@ -24,10 +26,12 @@ impl StdError for Error {}
impl Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Closed => write!(f, "closed"),
Error::Command(e) => {
write!(f, "command error: {}", e.result.message)
Error::Io(e) => write!(f, "I/O error: {}", e),
Error::Reconnect => write!(f, "failed to reconnect"),
Error::Timeout => write!(f, "timeout"),
Error::Xml(e) => write!(f, "(de)serialization error: {}", e),
Error::Other(e) => write!(f, "error: {}", e),
@ -59,7 +59,7 @@
//! use std::net::ToSocketAddrs;
//! use std::time::Duration;
//! use epp_client::EppClient;
//! use epp_client::connect::connect;
//! use epp_client::domain::DomainCheck;
//! use epp_client::login::Login;
@ -67,11 +67,15 @@
//! async fn main() {
//! // Create an instance of EppClient
//! let timeout = Duration::from_secs(5);
//! let mut client = match EppClient::connect("registry_name".to_string(), ("example.com".to_owned(), 7000), None, timeout).await {
//! Ok(client) => client,
//! let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await {
//! Ok(c) => c,
//! Err(e) => panic!("Failed to create EppClient: {}", e)
//! };
//! tokio::spawn(async move {
//! connection.run().await.unwrap();
//! });
//! let login = Login::new("username", "password", None, None);
//! client.transact(&login, "transaction-id").await.unwrap();
@ -8,6 +8,7 @@ use epp_client::connect::connect_with_connector;
use regex::Regex;
use tokio::time::timeout;
use tokio_test::io::Builder;
use tracing::trace;
use epp_client::domain::{DomainCheck, DomainContact, DomainCreate, Period};
use epp_client::login::Login;
@ -99,11 +100,19 @@ async fn client() {
let mut client = connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
let (mut client, mut connection) =
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
assert_eq!(client.xml_greeting(), xml("response/greeting.xml"));
tokio::spawn(async move {
let rsp = client
@ -116,6 +125,7 @@ async fn client() {
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
@ -175,11 +185,20 @@ async fn dropped() {
let mut client = connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
let (mut client, mut connection) =
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
tokio::spawn(async move {
trace!("connection future resolved successfully")
assert_eq!(client.xml_greeting(), xml("response/greeting.xml"));
trace!("Trying to get greeting");
let rsp = client
@ -240,4 +259,40 @@ async fn dropped() {
let rsp = client.transact(&create, CLTRID).await.unwrap();
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
async fn drop_client() {
let _guard = log_to_stdout();
struct FakeConnector;
impl epp_client::client::Connector for FakeConnector {
type Connection = tokio_test::io::Mock;
async fn connect(&self, _: Duration) -> Result<Self::Connection, epp_client::Error> {
let (client, mut connection) =
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
let handle = tokio::spawn(async move {
trace!("connection future resolved successfully")
Reference in New Issue
Block a user