Track connection state and reconnect on invalid state

The external client/connection interface expects that to complete
full request/response cycles. However, at await points the stack
could simply be dropped, meaning the connection is left in an
inconsistent state. One relatively likely scenario is that a
transaction might be dropped while waiting for a response from the
server. For example, this might happen if the connection was
initiated by a HTTP request which was canceled/aborted.

There are different failure modes which can result from similar
scenarios depending on during what await point the future was
dropped. Since it's relatively difficult to protect against
these scenarios and some of them might manifest in indirect ways
(for example, a deserialization error might happen because the
incoming response was for a different kind of request), this PR
takes the approach of tracking in the connection whether we're
(supposedly) at a point where the connection is ready to send
another request. If transact() is called while the connection is
not in such a state, the connection will transparently attempt
to reconnect to clean up any erroneous state.
This commit is contained in:
Dirkjan Ochtman 2022-03-04 15:41:41 +01:00
parent 7e0a51bebb
commit eed3a075eb
1 changed files with 12 additions and 1 deletions

View File

@ -18,6 +18,8 @@ pub(crate) struct EppConnection<C: Connector> {
stream: C::Connection,
pub greeting: String,
timeout: Duration,
// Whether the connection is in a good state to start sending a request
ready: bool,
}
impl<C: Connector> EppConnection<C> {
@ -32,9 +34,11 @@ impl<C: Connector> EppConnection<C> {
connector,
greeting: String::new(),
timeout,
ready: false,
};
this.greeting = this.get_epp_response().await?;
this.ready = true;
Ok(this)
}
@ -87,18 +91,25 @@ impl<C: Connector> EppConnection<C> {
}
}
self.ready = true;
Ok(String::from_utf8(buf)?)
}
pub(crate) async fn reconnect(&mut self) -> Result<(), Error> {
self.ready = false;
self.stream = self.connector.connect(self.timeout).await?;
self.greeting = self.get_epp_response().await?;
self.ready = true;
Ok(())
}
/// Sends an EPP XML request to the registry and return the response
/// receieved to the request
pub(crate) async fn transact(&mut self, content: &str) -> Result<String, Error> {
if !self.ready {
self.reconnect().await?;
}
debug!("{}: request: {}", self.registry, content);
self.send_epp_request(content).await?;
@ -111,7 +122,7 @@ impl<C: Connector> EppConnection<C> {
/// Closes the socket and shuts the connection
pub(crate) async fn shutdown(&mut self) -> Result<(), Error> {
info!("{}: Closing connection", self.registry);
self.ready = false;
timeout(self.timeout, self.stream.shutdown()).await?;
Ok(())
}