Enable TCP_NODELAY on TCP-based connections.

We may want a more fine-grained approach to immediately transmitting
application data, but Hyper does not yet expose a suitable API.

Resolves #2062.
This commit is contained in:
Sergio Benitez 2022-04-27 19:30:17 -07:00
parent 613366f4bb
commit bf84b1cdb5
4 changed files with 34 additions and 3 deletions

View File

@ -46,6 +46,11 @@ pub trait Connection: AsyncRead + AsyncWrite {
/// The remote address, i.e. the client's socket address, if it is known. /// The remote address, i.e. the client's socket address, if it is known.
fn peer_address(&self) -> Option<SocketAddr>; fn peer_address(&self) -> Option<SocketAddr>;
/// Requests that the connection not delay reading or writing data as much
/// as possible. For connections backed by TCP, this corresponds to setting
/// `TCP_NODELAY`.
fn enable_nodelay(&self) -> io::Result<()>;
/// DER-encoded X.509 certificate chain presented by the client, if any. /// DER-encoded X.509 certificate chain presented by the client, if any.
/// ///
/// The certificate order must be as it appears in the TLS protocol: the /// The certificate order must be as it appears in the TLS protocol: the
@ -65,6 +70,7 @@ pin_project_lite::pin_project! {
#[must_use = "streams do nothing unless polled"] #[must_use = "streams do nothing unless polled"]
pub struct Incoming<L> { pub struct Incoming<L> {
sleep_on_errors: Option<Duration>, sleep_on_errors: Option<Duration>,
nodelay: bool,
#[pin] #[pin]
pending_error_delay: Option<Sleep>, pending_error_delay: Option<Sleep>,
#[pin] #[pin]
@ -79,10 +85,11 @@ impl<L: Listener> Incoming<L> {
listener, listener,
sleep_on_errors: Some(Duration::from_millis(250)), sleep_on_errors: Some(Duration::from_millis(250)),
pending_error_delay: None, pending_error_delay: None,
nodelay: false,
} }
} }
/// Set whether to sleep on accept errors. /// Set whether and how long to sleep on accept errors.
/// ///
/// A possible scenario is that the process has hit the max open files /// A possible scenario is that the process has hit the max open files
/// allowed, and so trying to accept a new connection will fail with /// allowed, and so trying to accept a new connection will fail with
@ -97,8 +104,16 @@ impl<L: Listener> Incoming<L> {
/// this option to `None` will allow that. /// this option to `None` will allow that.
/// ///
/// Default is 1 second. /// Default is 1 second.
pub fn set_sleep_on_errors(&mut self, val: Option<Duration>) { pub fn sleep_on_errors(mut self, val: Option<Duration>) -> Self {
self.sleep_on_errors = val; self.sleep_on_errors = val;
self
}
/// Set whether to request no delay on all incoming connections. The default
/// is `false`. See [`Connection::enable_nodelay()`] for details.
pub fn nodelay(mut self, nodelay: bool) -> Self {
self.nodelay = nodelay;
self
} }
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<L::Connection>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<L::Connection>> {
@ -123,6 +138,10 @@ impl<L: Listener> Incoming<L> {
match me.listener.as_mut().poll_accept(cx) { match me.listener.as_mut().poll_accept(cx) {
Poll::Ready(Ok(stream)) => { Poll::Ready(Ok(stream)) => {
if *me.nodelay {
let _ = stream.enable_nodelay();
}
return Poll::Ready(Ok(stream)); return Poll::Ready(Ok(stream));
}, },
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
@ -205,4 +224,8 @@ impl Connection for TcpStream {
fn peer_address(&self) -> Option<SocketAddr> { fn peer_address(&self) -> Option<SocketAddr> {
self.peer_addr().ok() self.peer_addr().ok()
} }
fn enable_nodelay(&self) -> io::Result<()> {
self.set_nodelay(true)
}
} }

View File

@ -125,4 +125,8 @@ impl Connection for TlsStream<TcpStream> {
fn peer_certificates(&self) -> Option<&[RawCertificate]> { fn peer_certificates(&self) -> Option<&[RawCertificate]> {
self.get_ref().1.peer_certificates() self.get_ref().1.peer_certificates()
} }
fn enable_nodelay(&self) -> io::Result<()> {
self.get_ref().0.enable_nodelay()
}
} }

View File

@ -301,6 +301,10 @@ impl<F: Future, C: Connection> Connection for CancellableIo<F, C> {
fn peer_certificates(&self) -> Option<&[RawCertificate]> { fn peer_certificates(&self) -> Option<&[RawCertificate]> {
self.io.peer_certificates() self.io.peer_certificates()
} }
fn enable_nodelay(&self) -> io::Result<()> {
self.io.enable_nodelay()
}
} }
pin_project! { pin_project! {

View File

@ -452,7 +452,7 @@ impl Rocket<Orbit> {
// NOTE: `hyper` uses `tokio::spawn()` as the default executor. // NOTE: `hyper` uses `tokio::spawn()` as the default executor.
let listener = CancellableListener::new(shutdown.clone(), listener, grace, mercy); let listener = CancellableListener::new(shutdown.clone(), listener, grace, mercy);
let builder = hyper::Server::builder(Incoming::new(listener)); let builder = hyper::Server::builder(Incoming::new(listener).nodelay(true));
#[cfg(feature = "http2")] #[cfg(feature = "http2")]
let builder = builder.http2_keep_alive_interval(match keep_alive { let builder = builder.http2_keep_alive_interval(match keep_alive {