From 8824d498d1370e341b52e1efe654b61fbb771d18 Mon Sep 17 00:00:00 2001 From: Sergio Benitez Date: Mon, 12 Sep 2016 02:43:34 -0700 Subject: [PATCH] Add streaming responder and example. --- Cargo.toml | 1 + examples/stream/Cargo.toml | 9 +++++ examples/stream/src/main.rs | 20 +++++++++++ lib/src/response/mod.rs | 2 ++ lib/src/response/named_file.rs | 16 +++++++++ lib/src/response/stream.rs | 63 ++++++++++++++++++++++++++-------- 6 files changed, 96 insertions(+), 15 deletions(-) create mode 100644 examples/stream/Cargo.toml create mode 100644 examples/stream/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 55ed61ba..c4bf6de3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,5 @@ members = [ "examples/hello_ranks", "examples/testing", "examples/from_request", + "examples/stream", ] diff --git a/examples/stream/Cargo.toml b/examples/stream/Cargo.toml new file mode 100644 index 00000000..8f1e148e --- /dev/null +++ b/examples/stream/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "stream" +version = "0.0.1" +authors = ["Sergio Benitez "] +workspace = "../../" + +[dependencies] +rocket = { path = "../../lib" } +rocket_codegen = { path = "../../codegen" } diff --git a/examples/stream/src/main.rs b/examples/stream/src/main.rs new file mode 100644 index 00000000..8e4b56d3 --- /dev/null +++ b/examples/stream/src/main.rs @@ -0,0 +1,20 @@ +#![feature(plugin)] +#![plugin(rocket_codegen)] + +extern crate rocket; + +use rocket::Rocket; +use rocket::response::{Plain, Stream}; + +use std::io::{repeat, Repeat, Read, Take}; + +type LimitedRepeat = Take; + +#[get("/")] +fn root() -> Plain> { + Plain(Stream::from(repeat('a' as u8).take(25000))) +} + +fn main() { + Rocket::new("localhost", 8000).mount_and_launch("/", routes![root]); +} diff --git a/lib/src/response/mod.rs b/lib/src/response/mod.rs index b03d793a..807f3a21 100644 --- a/lib/src/response/mod.rs +++ b/lib/src/response/mod.rs @@ -6,6 +6,7 @@ mod outcome; mod flash; mod data_type; mod named_file; +mod stream; pub use hyper::server::Response as HyperResponse; pub use hyper::net::Fresh as HyperFresh; @@ -21,6 +22,7 @@ pub use self::with_status::StatusResponse; pub use self::outcome::Outcome; pub use self::flash::Flash; pub use self::named_file::NamedFile; +pub use self::stream::Stream; use std::ops::{Deref, DerefMut}; diff --git a/lib/src/response/named_file.rs b/lib/src/response/named_file.rs index f8803839..d14975d8 100644 --- a/lib/src/response/named_file.rs +++ b/lib/src/response/named_file.rs @@ -3,7 +3,9 @@ use std::fs::File; use std::path::{Path, PathBuf}; use response::mime::{Mime, TopLevel, SubLevel}; use std::io; +use std::ops::{Deref, DerefMut}; +#[derive(Debug)] pub struct NamedFile(PathBuf, File); impl NamedFile { @@ -46,3 +48,17 @@ impl Responder for NamedFile { } } +impl Deref for NamedFile { + type Target = File; + + fn deref(&self) -> &File { + &self.1 + } +} + +impl DerefMut for NamedFile { + fn deref_mut(&mut self) -> &mut File { + &mut self.1 + } +} + diff --git a/lib/src/response/stream.rs b/lib/src/response/stream.rs index 92523712..20635510 100644 --- a/lib/src/response/stream.rs +++ b/lib/src/response/stream.rs @@ -1,16 +1,49 @@ -// const CHUNK_SIZE: u32 = 4096; -// pub struct Stream(T); -// impl Responder for Stream { -// fn respond<'a>(&self, mut r: HypResponse<'a, HypFresh>) { -// r.headers_mut().set(header::TransferEncoding(vec![Encoding::Chunked])); -// *(r.status_mut()) = StatusCode::Ok; -// let mut stream = r.start(); +use response::*; +use std::io::{Read, Write, ErrorKind}; -// r.write() -// Response { -// status: StatusCode::Ok, -// headers: headers, -// body: Body::Stream(r) -// } -// } -// } +/// The size of each chunk in the streamed response. +pub const CHUNK_SIZE: usize = 4096; + +pub struct Stream(pub Box); + +impl Stream { + pub fn from(reader: T) -> Stream { + Stream(Box::new(reader)) + } +} + +impl Responder for Stream { + fn respond<'a>(&mut self, mut res: FreshHyperResponse<'a>) -> Outcome<'a> { + let mut stream = res.start().unwrap(); + let mut buffer = [0; CHUNK_SIZE]; + let mut complete = false; + while !complete { + let mut left = CHUNK_SIZE; + while left > 0 && !complete { + match self.0.read(&mut buffer[..left]) { + Ok(n) if n == 0 => complete = true, + Ok(n) if n < left => left -= n, + Ok(n) if n == left => left = CHUNK_SIZE, + Ok(n) => unreachable!("Impossible byte count {}/{}!", n, left), + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(ref e) => { + error_!("Error streaming response: {:?}", e); + return Outcome::FailStop; + } + } + } + + if let Err(e) = stream.write_all(&buffer) { + error_!("Stream write_all() failed: {:?}", e); + return Outcome::FailStop; + } + } + + if let Err(e) = stream.end() { + error_!("Stream end() failed: {:?}", e); + return Outcome::FailStop; + } + + Outcome::Complete + } +}