Prepare move to I/O task based client connection architecture

This commit is contained in:
Rudi Floren 2023-01-17 17:06:20 +01:00
parent 0c28b1db62
commit eb2ff138f5
No known key found for this signature in database
GPG Key ID: 3667D82FA1AA6CEB
3 changed files with 50 additions and 30 deletions

View File

@ -71,9 +71,12 @@ impl<C: Connector> EppClient<C> {
pub async fn hello(&mut self) -> Result<Greeting, Error> {
let xml = xml::serialize(&HelloDocument::default())?;
debug!("{}: hello: {}", self.connection.registry, &xml);
debug!(registry = self.connection.registry, "hello: {}", &xml);
let response = self.connection.transact(&xml).await?;
debug!("{}: greeting: {}", self.connection.registry, &response);
debug!(
registry = self.connection.registry,
"greeting: {}", &response
);
Ok(xml::deserialize::<GreetingDocument>(&response)?.data)
}
@ -91,9 +94,12 @@ impl<C: Connector> EppClient<C> {
let document = CommandDocument::new(data.command, data.extension, id);
let xml = xml::serialize(&document)?;
debug!("{}: request: {}", self.connection.registry, &xml);
debug!(registry = self.connection.registry, "request: {}", &xml);
let response = self.connection.transact(&xml).await?;
debug!("{}: response: {}", self.connection.registry, &response);
debug!(
registry = self.connection.registry,
"response: {}", &response
);
let rsp = xml::deserialize::<ResponseDocument<Cmd::Response, Ext::Response>>(&response)?;
if rsp.data.result.code.is_success() {
@ -105,7 +111,7 @@ impl<C: Connector> EppClient<C> {
tr_ids: rsp.data.tr_ids,
}));
error!(%response, "Failed to deserialize response for transaction: {}", err);
error!(registry=self.connection.registry, %response, "Failed to deserialize response for transaction: {}", err);
Err(err)
}

View File

@ -17,8 +17,7 @@ pub(crate) struct EppConnection<C: Connector> {
stream: C::Connection,
pub greeting: String,
timeout: Duration,
// Whether the connection is in a good state to start sending a request
ready: bool,
state: ConnectionState,
}
impl<C: Connector> EppConnection<C> {
@ -33,16 +32,16 @@ impl<C: Connector> EppConnection<C> {
connector,
greeting: String::new(),
timeout,
ready: false,
state: Default::default(),
};
this.greeting = this.get_epp_response().await?;
this.ready = true;
this.greeting = this.read_epp_response().await?;
this.state = ConnectionState::Open;
Ok(this)
}
/// Constructs an EPP XML request in the required form and sends it to the server
async fn send_epp_request(&mut self, content: &str) -> Result<(), Error> {
async fn write_epp_request(&mut self, content: &str) -> Result<(), Error> {
let len = content.len();
let buf_size = len + 4;
@ -55,31 +54,35 @@ impl<C: Connector> EppConnection<C> {
buf[4..].clone_from_slice(content.as_bytes());
let wrote = timeout(self.timeout, self.stream.write(&buf)).await?;
debug!("{}: Wrote {} bytes", self.registry, wrote);
debug!(registry = self.registry, "Wrote {} bytes", wrote);
Ok(())
}
/// Receives response from the socket and converts it into an EPP XML string
async fn get_epp_response(&mut self) -> Result<String, Error> {
async fn read_epp_response(&mut self) -> Result<String, Error> {
let mut buf = [0u8; 4];
timeout(self.timeout, self.stream.read_exact(&mut buf)).await?;
let buf_size: usize = u32::from_be_bytes(buf).try_into()?;
let message_size = buf_size - 4;
debug!("{}: Response buffer size: {}", self.registry, message_size);
debug!(
registry = self.registry,
"Response buffer size: {}", message_size
);
let mut buf = vec![0; message_size];
let mut read_size: usize = 0;
loop {
let read = timeout(self.timeout, self.stream.read(&mut buf[read_size..])).await?;
debug!("{}: Read: {} bytes", self.registry, read);
debug!(registry = self.registry, "Read: {} bytes", read);
read_size += read;
debug!("{}: Total read: {} bytes", self.registry, read_size);
debug!(registry = self.registry, "Total read: {} bytes", read_size);
if read == 0 {
self.state = ConnectionState::Closed;
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("{}: unexpected eof", self.registry),
@ -90,41 +93,41 @@ impl<C: Connector> EppConnection<C> {
}
}
self.ready = true;
Ok(String::from_utf8(buf)?)
}
pub(crate) async fn reconnect(&mut self) -> Result<(), Error> {
debug!("{}: reconnecting", self.registry);
self.ready = false;
debug!(registry = self.registry, "reconnecting");
self.state = ConnectionState::Opening;
self.stream = self.connector.connect(self.timeout).await?;
self.greeting = self.get_epp_response().await?;
self.ready = true;
self.greeting = self.read_epp_response().await?;
self.state = ConnectionState::Open;
Ok(())
}
/// Sends an EPP XML request to the registry and return the response
/// receieved to the request
pub(crate) async fn transact(&mut self, content: &str) -> Result<String, Error> {
if !self.ready {
debug!("{}: connection not ready", self.registry);
if self.state != ConnectionState::Open {
debug!(registry = self.registry, " connection not ready");
self.reconnect().await?;
}
debug!("{}: request: {}", self.registry, content);
self.send_epp_request(content).await?;
debug!(registry = self.registry, " request: {}", content);
self.write_epp_request(content).await?;
let response = self.get_epp_response().await?;
debug!("{}: response: {}", self.registry, response);
let response = self.read_epp_response().await?;
debug!(registry = self.registry, " response: {}", response);
Ok(response)
}
/// Closes the socket and shuts the connection
pub(crate) async fn shutdown(&mut self) -> Result<(), Error> {
info!("{}: Closing connection", self.registry);
self.ready = false;
info!(registry = self.registry, "Closing connection");
self.state = ConnectionState::Closing;
timeout(self.timeout, self.stream.shutdown()).await?;
self.state = ConnectionState::Closed;
Ok(())
}
}
@ -139,3 +142,12 @@ pub(crate) async fn timeout<T, E: Into<Error>>(
Err(_) => Err(Error::Timeout),
}
}
#[derive(Debug, Default, PartialEq, Eq)]
enum ConnectionState {
#[default]
Opening,
Open,
Closing,
Closed,
}

View File

@ -104,7 +104,7 @@ async fn client() {
.unwrap();
assert_eq!(client.xml_greeting(), xml("response/greeting.xml"));
client
let rsp = client
.transact(
&Login::new(
"username",
@ -117,6 +117,8 @@ async fn client() {
.await
.unwrap();
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
let rsp = client
.transact(
&DomainCheck {