Add streaming responder and example.

This commit is contained in:
Sergio Benitez 2016-09-12 02:43:34 -07:00
parent 4e03bb6107
commit 8824d498d1
6 changed files with 96 additions and 15 deletions

View File

@ -17,4 +17,5 @@ members = [
"examples/hello_ranks",
"examples/testing",
"examples/from_request",
"examples/stream",
]

View File

@ -0,0 +1,9 @@
[package]
name = "stream"
version = "0.0.1"
authors = ["Sergio Benitez <sb@sergio.bz>"]
workspace = "../../"
[dependencies]
rocket = { path = "../../lib" }
rocket_codegen = { path = "../../codegen" }

View File

@ -0,0 +1,20 @@
#![feature(plugin)]
#![plugin(rocket_codegen)]
extern crate rocket;
use rocket::Rocket;
use rocket::response::{Plain, Stream};
use std::io::{repeat, Repeat, Read, Take};
type LimitedRepeat = Take<Repeat>;
#[get("/")]
fn root() -> Plain<Stream<LimitedRepeat>> {
Plain(Stream::from(repeat('a' as u8).take(25000)))
}
fn main() {
Rocket::new("localhost", 8000).mount_and_launch("/", routes![root]);
}

View File

@ -6,6 +6,7 @@ mod outcome;
mod flash;
mod data_type;
mod named_file;
mod stream;
pub use hyper::server::Response as HyperResponse;
pub use hyper::net::Fresh as HyperFresh;
@ -21,6 +22,7 @@ pub use self::with_status::StatusResponse;
pub use self::outcome::Outcome;
pub use self::flash::Flash;
pub use self::named_file::NamedFile;
pub use self::stream::Stream;
use std::ops::{Deref, DerefMut};

View File

@ -3,7 +3,9 @@ use std::fs::File;
use std::path::{Path, PathBuf};
use response::mime::{Mime, TopLevel, SubLevel};
use std::io;
use std::ops::{Deref, DerefMut};
#[derive(Debug)]
pub struct NamedFile(PathBuf, File);
impl NamedFile {
@ -46,3 +48,17 @@ impl Responder for NamedFile {
}
}
impl Deref for NamedFile {
type Target = File;
fn deref(&self) -> &File {
&self.1
}
}
impl DerefMut for NamedFile {
fn deref_mut(&mut self) -> &mut File {
&mut self.1
}
}

View File

@ -1,16 +1,49 @@
// const CHUNK_SIZE: u32 = 4096;
// pub struct Stream<T: Read>(T);
// impl<T> Responder for Stream<T> {
// fn respond<'a>(&self, mut r: HypResponse<'a, HypFresh>) {
// r.headers_mut().set(header::TransferEncoding(vec![Encoding::Chunked]));
// *(r.status_mut()) = StatusCode::Ok;
// let mut stream = r.start();
use response::*;
use std::io::{Read, Write, ErrorKind};
// r.write()
// Response {
// status: StatusCode::Ok,
// headers: headers,
// body: Body::Stream(r)
// }
// }
// }
/// The size of each chunk in the streamed response.
pub const CHUNK_SIZE: usize = 4096;
pub struct Stream<T: Read>(pub Box<T>);
impl<T: Read> Stream<T> {
pub fn from(reader: T) -> Stream<T> {
Stream(Box::new(reader))
}
}
impl<T: Read> Responder for Stream<T> {
fn respond<'a>(&mut self, mut res: FreshHyperResponse<'a>) -> Outcome<'a> {
let mut stream = res.start().unwrap();
let mut buffer = [0; CHUNK_SIZE];
let mut complete = false;
while !complete {
let mut left = CHUNK_SIZE;
while left > 0 && !complete {
match self.0.read(&mut buffer[..left]) {
Ok(n) if n == 0 => complete = true,
Ok(n) if n < left => left -= n,
Ok(n) if n == left => left = CHUNK_SIZE,
Ok(n) => unreachable!("Impossible byte count {}/{}!", n, left),
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) => {
error_!("Error streaming response: {:?}", e);
return Outcome::FailStop;
}
}
}
if let Err(e) = stream.write_all(&buffer) {
error_!("Stream write_all() failed: {:?}", e);
return Outcome::FailStop;
}
}
if let Err(e) = stream.end() {
error_!("Stream end() failed: {:?}", e);
return Outcome::FailStop;
}
Outcome::Complete
}
}