diff --git a/core/lib/src/data/data_stream.rs b/core/lib/src/data/data_stream.rs index a9460079..b3355476 100644 --- a/core/lib/src/data/data_stream.rs +++ b/core/lib/src/data/data_stream.rs @@ -65,7 +65,7 @@ enum StreamKind<'r> { impl<'r> DataStream<'r> { pub(crate) fn new(buf: Vec, stream: StreamReader<'r>, limit: u64) -> Self { - let chain = Chain::new(Cursor::new(buf), stream).take(limit); + let chain = Chain::new(Cursor::new(buf), stream).take(limit).into(); Self { chain } } @@ -73,6 +73,7 @@ impl<'r> DataStream<'r> { async fn limit_exceeded(&mut self) -> io::Result { #[cold] async fn _limit_exceeded(stream: &mut DataStream<'_>) -> io::Result { + // Read one more byte after reaching limit to see if we cut early. stream.chain.set_limit(1); let mut buf = [0u8; 1]; Ok(stream.read(&mut buf).await? != 0) @@ -252,6 +253,18 @@ impl AsyncRead for DataStream<'_> { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + if self.chain.limit() == 0 { + let stream: &StreamReader<'_> = &self.chain.get_ref().get_ref().1; + let kind = match stream.inner { + StreamKind::Empty => "an empty stream (vacuous)", + StreamKind::Body(_) => "the request body", + StreamKind::Multipart(_) => "a multipart form field", + }; + + let msg = yansi::Paint::default(kind).bold(); + warn_!("Data limit reached while reading {}.", msg); + } + Pin::new(&mut self.chain).poll_read(cx, buf) } }