Add support for idle keepalive
Using the idle_timeout paramter during connection creation, the connection is prevented from being closed server-side due to beeing idle for too long. Internally, while waiting for an API request, if idle_timeout reaches zero before a new request comes in, a hello request is send as a keepalive measure.
This commit is contained in:
parent
065210b8e8
commit
4922c4e370
|
@ -37,7 +37,7 @@ use crate::xml;
|
||||||
/// # async fn main() {
|
/// # async fn main() {
|
||||||
/// // Create an instance of EppClient
|
/// // Create an instance of EppClient
|
||||||
/// let timeout = Duration::from_secs(5);
|
/// let timeout = Duration::from_secs(5);
|
||||||
/// let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await {
|
/// let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout, None).await {
|
||||||
/// Ok(client) => client,
|
/// Ok(client) => client,
|
||||||
/// Err(e) => panic!("Failed to create EppClient: {}", e)
|
/// Err(e) => panic!("Failed to create EppClient: {}", e)
|
||||||
/// };
|
/// };
|
||||||
|
|
|
@ -26,10 +26,12 @@ use crate::error::Error;
|
||||||
|
|
||||||
/// Connect to the specified `server` and `hostname` over TLS
|
/// Connect to the specified `server` and `hostname` over TLS
|
||||||
///
|
///
|
||||||
/// The `registry` is used as a name in internal logging; `addr` provides the address to
|
/// The `registry` is used as a name in internal logging; `server` provides the hostname and port
|
||||||
/// connect to, `hostname` is sent as the TLS server name indication and `identity` provides
|
/// to connect to, and `identity` provides optional TLS client authentication (using) rustls as
|
||||||
/// optional TLS client authentication (using) rustls as the TLS implementation.
|
/// the TLS implementation.
|
||||||
/// The `timeout` limits the time spent on any underlying network operations.
|
/// The `request_timeout` limits the time spent on any underlying network operation.
|
||||||
|
/// The `idle_timeout` prevents the connection to be closed server-side due to being idle. (See
|
||||||
|
/// [`EppConnection`] Keepalive)
|
||||||
///
|
///
|
||||||
/// This returns two halves, a cloneable client and the underlying connection.
|
/// This returns two halves, a cloneable client and the underlying connection.
|
||||||
///
|
///
|
||||||
|
@ -40,22 +42,25 @@ pub async fn connect(
|
||||||
server: (Cow<'static, str>, u16),
|
server: (Cow<'static, str>, u16),
|
||||||
identity: Option<(Vec<Certificate>, PrivateKey)>,
|
identity: Option<(Vec<Certificate>, PrivateKey)>,
|
||||||
request_timeout: Duration,
|
request_timeout: Duration,
|
||||||
|
idle_timeout: Option<Duration>,
|
||||||
) -> Result<(EppClient, EppConnection<RustlsConnector>), Error> {
|
) -> Result<(EppClient, EppConnection<RustlsConnector>), Error> {
|
||||||
let connector = RustlsConnector::new(server, identity).await?;
|
let connector = RustlsConnector::new(server, identity).await?;
|
||||||
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
let client = EppClient::new(sender, registry.clone());
|
let client = EppClient::new(sender, registry.clone());
|
||||||
let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?;
|
let connection =
|
||||||
|
EppConnection::new(connector, registry, receiver, request_timeout, idle_timeout).await?;
|
||||||
|
|
||||||
Ok((client, connection))
|
Ok((client, connection))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connect to the specified `server` and `hostname` via the passed connector.
|
/// Connect to the specified `server` and `hostname` via the passed connector.
|
||||||
///
|
///
|
||||||
/// The `registry` is used as a name in internal logging; `addr` provides the address to
|
/// The `registry` is used as a name in internal logging; `connector` provides a way to
|
||||||
/// connect to, `hostname` is sent as the TLS server name indication and `identity` provides
|
/// plug in various network connections.
|
||||||
/// optional TLS client authentication (using) rustls as the TLS implementation.
|
/// The `request_timeout` limits the time spent on any underlying network operations.
|
||||||
/// The `timeout` limits the time spent on any underlying network operations.
|
/// The `idle_timeout` prevents the connection to be closed server-side due to being idle. (See
|
||||||
|
/// [`EppConnection`] Keepalive)
|
||||||
///
|
///
|
||||||
/// This returns two halves, a cloneable client and the underlying connection.
|
/// This returns two halves, a cloneable client and the underlying connection.
|
||||||
///
|
///
|
||||||
|
@ -64,13 +69,15 @@ pub async fn connect_with_connector<C>(
|
||||||
connector: C,
|
connector: C,
|
||||||
registry: Cow<'static, str>,
|
registry: Cow<'static, str>,
|
||||||
request_timeout: Duration,
|
request_timeout: Duration,
|
||||||
|
idle_timeout: Option<Duration>,
|
||||||
) -> Result<(EppClient, EppConnection<C>), Error>
|
) -> Result<(EppClient, EppConnection<C>), Error>
|
||||||
where
|
where
|
||||||
C: Connector,
|
C: Connector,
|
||||||
{
|
{
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
let client = EppClient::new(sender, registry.clone());
|
let client = EppClient::new(sender, registry.clone());
|
||||||
let connection = EppConnection::new(connector, registry, receiver, request_timeout).await?;
|
let connection =
|
||||||
|
EppConnection::new(connector, registry, receiver, request_timeout, idle_timeout).await?;
|
||||||
|
|
||||||
Ok((client, connection))
|
Ok((client, connection))
|
||||||
}
|
}
|
||||||
|
@ -160,3 +167,7 @@ pub trait Connector {
|
||||||
|
|
||||||
async fn connect(&self, timeout: Duration) -> Result<Self::Connection, Error>;
|
async fn connect(&self, timeout: Duration) -> Result<Self::Connection, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Per default try to send a keep alive every 8 minutes.
|
||||||
|
/// Verisign has an idle timeout of 10 minutes.
|
||||||
|
pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60 * 8);
|
||||||
|
|
|
@ -12,6 +12,8 @@ use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
use crate::connect::Connector;
|
use crate::connect::Connector;
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
|
use crate::hello::HelloDocument;
|
||||||
|
use crate::xml;
|
||||||
|
|
||||||
/// EPP Connection
|
/// EPP Connection
|
||||||
///
|
///
|
||||||
|
@ -21,12 +23,27 @@ use crate::error::Error;
|
||||||
/// [`EppConnection`] provides a [`EppConnection::run`](EppConnection::run) method, which only resolves when the connection is closed,
|
/// [`EppConnection`] provides a [`EppConnection::run`](EppConnection::run) method, which only resolves when the connection is closed,
|
||||||
/// either because a fatal error has occurred, or because its associated [`EppClient`](super::EppClient) has been dropped
|
/// either because a fatal error has occurred, or because its associated [`EppClient`](super::EppClient) has been dropped
|
||||||
/// and all outstanding work has been completed.
|
/// and all outstanding work has been completed.
|
||||||
|
///
|
||||||
|
/// # Keepalive (Idle Timeout)
|
||||||
|
///
|
||||||
|
/// EppConnection supports a keepalive mechanism.
|
||||||
|
/// When `idle_timeout` is set, every time the timeout reaches zero while waiting for a new request from the
|
||||||
|
/// [`EppClient`](super::EppClient), a `<hello>` request is sent to the epp server.
|
||||||
|
/// This is in line with VeriSign's guidelines. VeriSign uses an idle timeout of 10 minutes and an absolute timeout of 24h.
|
||||||
|
/// Choosing an `idle_timeout` of 8 minutes should be sufficient to not run into VeriSign's idle timeout.
|
||||||
|
/// Other registry operators might need other values.
|
||||||
|
///
|
||||||
|
/// # Reconnect (Absolute Timeout)
|
||||||
|
///
|
||||||
|
/// Reconnecting, to gracefully allow a [`EppConnection`] to be "active", is currently not implemented. But a reconnect
|
||||||
|
/// command is present to initiate the reconnect from the outside
|
||||||
pub struct EppConnection<C: Connector> {
|
pub struct EppConnection<C: Connector> {
|
||||||
registry: Cow<'static, str>,
|
registry: Cow<'static, str>,
|
||||||
connector: C,
|
connector: C,
|
||||||
stream: C::Connection,
|
stream: C::Connection,
|
||||||
greeting: String,
|
greeting: String,
|
||||||
timeout: Duration,
|
timeout: Duration,
|
||||||
|
idle_timeout: Option<Duration>,
|
||||||
/// A receiver for receiving requests from [`EppClients`](super::client::EppClient) for the underlying connection.
|
/// A receiver for receiving requests from [`EppClients`](super::client::EppClient) for the underlying connection.
|
||||||
receiver: mpsc::UnboundedReceiver<Request>,
|
receiver: mpsc::UnboundedReceiver<Request>,
|
||||||
state: ConnectionState,
|
state: ConnectionState,
|
||||||
|
@ -38,6 +55,7 @@ impl<C: Connector> EppConnection<C> {
|
||||||
registry: Cow<'static, str>,
|
registry: Cow<'static, str>,
|
||||||
receiver: mpsc::UnboundedReceiver<Request>,
|
receiver: mpsc::UnboundedReceiver<Request>,
|
||||||
request_timeout: Duration,
|
request_timeout: Duration,
|
||||||
|
idle_timeout: Option<Duration>,
|
||||||
) -> Result<Self, Error> {
|
) -> Result<Self, Error> {
|
||||||
let mut this = Self {
|
let mut this = Self {
|
||||||
registry,
|
registry,
|
||||||
|
@ -46,6 +64,7 @@ impl<C: Connector> EppConnection<C> {
|
||||||
receiver,
|
receiver,
|
||||||
greeting: String::new(),
|
greeting: String::new(),
|
||||||
timeout: request_timeout,
|
timeout: request_timeout,
|
||||||
|
idle_timeout,
|
||||||
state: Default::default(),
|
state: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -176,6 +195,35 @@ impl<C: Connector> EppConnection<C> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn request_or_keepalive(&mut self) -> Result<Option<Request>, Error> {
|
||||||
|
loop {
|
||||||
|
let Some(idle_timeout) = self.idle_timeout else {
|
||||||
|
// We do not have any keep alive set, just forward to waiting for a request.
|
||||||
|
return Ok(self.receiver.recv().await);
|
||||||
|
};
|
||||||
|
trace!(registry = %self.registry, "Waiting for {idle_timeout:?} for new request until keepalive");
|
||||||
|
match tokio::time::timeout(idle_timeout, self.receiver.recv()).await {
|
||||||
|
Ok(request) => return Ok(request),
|
||||||
|
Err(_) => {
|
||||||
|
self.keepalive().await?;
|
||||||
|
// We sent the keepalive. Go back to wait for requests.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn keepalive(&mut self) -> Result<(), Error> {
|
||||||
|
trace!(registry = %self.registry, "Sending keepalive hello");
|
||||||
|
// Send hello
|
||||||
|
let request = xml::serialize(&HelloDocument::default())?;
|
||||||
|
self.write_epp_request(&request).await?;
|
||||||
|
|
||||||
|
// Await new greeting
|
||||||
|
self.greeting = self.read_epp_response().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// This is the main method of the I/O tasks
|
/// This is the main method of the I/O tasks
|
||||||
///
|
///
|
||||||
/// It will try to get a request, write it to the wire and waits for the response.
|
/// It will try to get a request, write it to the wire and waits for the response.
|
||||||
|
@ -191,8 +239,11 @@ impl<C: Connector> EppConnection<C> {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for new request
|
// Wait for new request or send a keepalive
|
||||||
let request = self.receiver.recv().await;
|
let request = match self.request_or_keepalive().await {
|
||||||
|
Ok(request) => request,
|
||||||
|
Err(err) => return Some(Err(err)),
|
||||||
|
};
|
||||||
let Some(request) = request else {
|
let Some(request) = request else {
|
||||||
// The client got dropped. We can close the connection.
|
// The client got dropped. We can close the connection.
|
||||||
match self.wait_for_shutdown().await {
|
match self.wait_for_shutdown().await {
|
||||||
|
|
|
@ -59,7 +59,7 @@
|
||||||
//! use std::net::ToSocketAddrs;
|
//! use std::net::ToSocketAddrs;
|
||||||
//! use std::time::Duration;
|
//! use std::time::Duration;
|
||||||
//!
|
//!
|
||||||
//! use epp_client::connect::connect;
|
//! use epp_client::connect::{connect, DEFAULT_IDLE_TIMEOUT};
|
||||||
//! use epp_client::domain::DomainCheck;
|
//! use epp_client::domain::DomainCheck;
|
||||||
//! use epp_client::login::Login;
|
//! use epp_client::login::Login;
|
||||||
//!
|
//!
|
||||||
|
@ -67,7 +67,7 @@
|
||||||
//! async fn main() {
|
//! async fn main() {
|
||||||
//! // Create an instance of EppClient
|
//! // Create an instance of EppClient
|
||||||
//! let timeout = Duration::from_secs(5);
|
//! let timeout = Duration::from_secs(5);
|
||||||
//! let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout).await {
|
//! let (mut client, mut connection) = match connect("registry_name".into(), ("example.com".into(), 7000), None, timeout, Some(DEFAULT_IDLE_TIMEOUT)).await {
|
||||||
//! Ok(c) => c,
|
//! Ok(c) => c,
|
||||||
//! Err(e) => panic!("Failed to create EppClient: {}", e)
|
//! Err(e) => panic!("Failed to create EppClient: {}", e)
|
||||||
//! };
|
//! };
|
||||||
|
|
|
@ -101,7 +101,7 @@ async fn client() {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (mut client, mut connection) =
|
let (mut client, mut connection) =
|
||||||
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
|
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -186,7 +186,7 @@ async fn dropped() {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (mut client, mut connection) =
|
let (mut client, mut connection) =
|
||||||
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
|
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
@ -279,7 +279,7 @@ async fn drop_client() {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (client, mut connection) =
|
let (client, mut connection) =
|
||||||
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5))
|
connect_with_connector(FakeConnector, "test".into(), Duration::from_secs(5), None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -296,3 +296,93 @@ async fn drop_client() {
|
||||||
drop(client);
|
drop(client);
|
||||||
assert!(handle.await.is_ok());
|
assert!(handle.await.is_ok());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn keepalive() {
|
||||||
|
let _guard = log_to_stdout();
|
||||||
|
|
||||||
|
struct FakeConnector;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl epp_client::client::Connector for FakeConnector {
|
||||||
|
type Connection = tokio_test::io::Mock;
|
||||||
|
|
||||||
|
async fn connect(&self, _: Duration) -> Result<Self::Connection, epp_client::Error> {
|
||||||
|
Ok(build_stream(&[
|
||||||
|
"response/greeting.xml",
|
||||||
|
"request/login.xml",
|
||||||
|
"response/login.xml",
|
||||||
|
// The keepalive should kick in.
|
||||||
|
// We set the keepalive to 100ms and wait 250ms which should yield two hello requests
|
||||||
|
"request/hello.xml",
|
||||||
|
"response/greeting.xml",
|
||||||
|
"request/hello.xml",
|
||||||
|
"response/greeting.xml",
|
||||||
|
"request/domain/create.xml",
|
||||||
|
"response/domain/create.xml",
|
||||||
|
])
|
||||||
|
.build())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (mut client, mut connection) = connect_with_connector(
|
||||||
|
FakeConnector,
|
||||||
|
"test".into(),
|
||||||
|
Duration::from_secs(5),
|
||||||
|
Some(Duration::from_millis(100)),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
connection.run().await.unwrap();
|
||||||
|
trace!("connection future resolved successfully")
|
||||||
|
});
|
||||||
|
|
||||||
|
trace!("Trying to get greeting");
|
||||||
|
assert_eq!(
|
||||||
|
client.xml_greeting().await.unwrap(),
|
||||||
|
xml("response/greeting.xml")
|
||||||
|
);
|
||||||
|
|
||||||
|
let rsp = client
|
||||||
|
.transact(
|
||||||
|
&Login::new(
|
||||||
|
"username",
|
||||||
|
"password",
|
||||||
|
Some("new-password"),
|
||||||
|
Some(&["http://schema.ispapi.net/epp/xml/keyvalue-1.0"]),
|
||||||
|
),
|
||||||
|
CLTRID,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
|
||||||
|
trace!("Waiting");
|
||||||
|
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||||
|
|
||||||
|
let contacts = &[
|
||||||
|
DomainContact {
|
||||||
|
contact_type: "admin".into(),
|
||||||
|
id: "eppdev-contact-3".into(),
|
||||||
|
},
|
||||||
|
DomainContact {
|
||||||
|
contact_type: "tech".into(),
|
||||||
|
id: "eppdev-contact-3".into(),
|
||||||
|
},
|
||||||
|
DomainContact {
|
||||||
|
contact_type: "billing".into(),
|
||||||
|
id: "eppdev-contact-3".into(),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
let create = DomainCreate::new(
|
||||||
|
"eppdev-1.com",
|
||||||
|
Period::years(1).unwrap(),
|
||||||
|
None,
|
||||||
|
Some("eppdev-contact-3"),
|
||||||
|
"epP4uthd#v",
|
||||||
|
Some(contacts),
|
||||||
|
);
|
||||||
|
trace!("Trying to create domains");
|
||||||
|
let rsp = client.transact(&create, CLTRID).await.unwrap();
|
||||||
|
assert_eq!(rsp.result.code, ResultCode::CommandCompletedSuccessfully);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue