Implement streaming requests.

This commit is contained in:
Sergio Benitez 2016-10-09 04:29:02 -07:00
parent 07204a25dd
commit d8db812856
15 changed files with 271 additions and 67 deletions

View File

@ -25,4 +25,5 @@ members = [
"examples/handlebars_templates",
"examples/form_kitchen_sink",
"examples/config",
"examples/hello_alt_methods",
]

View File

@ -1,10 +1,8 @@
extern crate tera;
use std::path::PathBuf;
use self::tera::Renderer;
use super::serde::Serialize;
use super::serde_json;
use super::{TemplateInfo, TEMPLATE_DIR};
lazy_static! {
@ -19,17 +17,13 @@ pub const EXT: &'static str = "tera";
pub fn render<T>(name: &str, info: &TemplateInfo, context: &T) -> Option<String>
where T: Serialize
{
let template = match TERA.get_template(&info.path.to_string_lossy()) {
Ok(template) => template,
Err(_) => {
error_!("Tera template '{}' does not exist.", name);
let template_name = &info.path.to_string_lossy();
if TERA.get_template(template_name).is_err() {
error_!("Tera template '{}' does not exist.", template_name);
return None;
}
};
let value = serde_json::to_value(&context);
let mut renderer = Renderer::new_with_json(template, &TERA, value);
match renderer.render() {
match TERA.value_render(template_name, &context) {
Ok(string) => Some(string),
Err(e) => {
error_!("Error rendering Tera template '{}': {}", name, e);

View File

@ -0,0 +1,9 @@
[package]
name = "hello_alt_methods"
version = "0.0.1"
authors = ["Sergio Benitez <sb@sergio.bz>"]
workspace = "../../"
[dependencies]
rocket = { path = "../../lib" }
rocket_codegen = { path = "../../codegen" }

View File

@ -0,0 +1,24 @@
#![feature(plugin)]
#![plugin(rocket_codegen)]
extern crate rocket;
use std::io;
use rocket::response::NamedFile;
#[get("/")]
fn index() -> io::Result<NamedFile> {
NamedFile::open("static/index.html")
}
#[put("/")]
fn put() -> &'static str {
"Hello, PUT request!"
}
fn main() {
rocket::ignite()
.mount("/", routes![index, put])
.launch();
}

View File

@ -0,0 +1,14 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width" />
<title>Hello Alt Methods</title>
</head>
<body>
<form action="/" method="post" accept-charset="utf-8">
<input type="hidden" name="_method" id="_method" value="put" />
<input type="submit" name="Submit" id="Submit" value="submit" />
</form>
</body>
</html>

View File

@ -30,10 +30,10 @@ fn upload(req: &Request, data: Data) -> Response {
return Response::failed(StatusCode::BadRequest);
}
let file = File::create("upload.txt");
let file = File::create("/tmp/upload.txt");
if let Ok(mut file) = file {
if io::copy(&mut data.open(), &mut file).is_ok() {
return Response::complete("Upload successful.");
if let Ok(n) = io::copy(&mut data.open(), &mut file) {
return Response::complete(format!("OK: {} bytes uploaded.", n));
}
println!(" => Failed copying.");

View File

@ -6,10 +6,14 @@ authors = ["Sergio Benitez <sb@sergio.bz>"]
[dependencies]
term-painter = "^0.2"
log = "^0.3"
hyper = { version = "^0.9", default-features = false }
url = "^1"
# mime = "^0.2"
toml = "^0.2"
[dependencies.hyper]
git = "https://github.com/SergioBenitez/hyper"
default-features = false
branch = "0.9.x"
[dev-dependencies]
lazy_static = "*"

View File

@ -1,4 +1,4 @@
// TODO: Removed from Rocket in favor of a more flexible HTTP library.
// TODO: Remove from Rocket in favor of a more flexible HTTP library.
pub use hyper::server::Request as HyperRequest;
pub use hyper::server::Response as HyperResponse;
pub use hyper::server::Server as HyperServer;
@ -13,10 +13,19 @@ pub use hyper::header::SetCookie as HyperSetCookie;
pub use hyper::method::Method as HyperMethod;
pub use hyper::uri::RequestUri as HyperRequestUri;
pub use hyper::net::Fresh as HyperFresh;
pub use hyper::net::HttpStream as HyperHttpStream;
pub use hyper::net::NetworkStream as HyperNetworkStream;
pub use hyper::http::h1::HttpReader as HyperHttpReader;
pub use hyper::header;
// This is okay.
// This is okay for now.
pub use hyper::status::StatusCode;
// TODO: Removed from Rocket in favor of a more flexible HTTP library.
// TODO: Remove from Rocket in favor of a more flexible HTTP library.
pub type FreshHyperResponse<'a> = self::HyperResponse<'a, self::HyperFresh>;
// TODO: Remove from Rocket in favor of a more flexible HTTP library.
use hyper::buffer::BufReader;
pub type HyperBodyReader<'a, 'b> =
HyperHttpReader<&'a mut BufReader<&'b mut HyperNetworkStream>>;

View File

@ -74,6 +74,12 @@ impl Log for RocketLogger {
return;
}
// Don't print Hyper's messages unless Debug is enabled.
let from_hyper = record.location().module_path().starts_with("hyper::");
if from_hyper && self.0 != LoggingLevel::Debug {
return;
}
// In Rocket, we abuse target with value "_" to indicate indentation.
if record.target() == "_" && self.0 != LoggingLevel::Critical {
print!(" {} ", White.paint("=>"));

View File

@ -16,6 +16,15 @@ pub enum Outcome<T> {
}
impl<T> Outcome<T> {
pub fn of<A, B: fmt::Debug>(result: Result<A, B>) -> Outcome<T> {
if let Err(e) = result {
error_!("{:?}", e);
return Outcome::Failure;
}
Outcome::Success
}
pub fn as_str(&self) -> &'static str {
match *self {
Outcome::Success => "Success",

View File

@ -1,23 +1,133 @@
use std::io::{BufRead, Read, Cursor, BufReader};
use std::io::{self, BufRead, Read, Cursor, BufReader, Chain, Take};
use std::time::Duration;
use std::net::Shutdown;
use http::hyper::{HyperBodyReader, HyperHttpStream, HyperHttpReader};
use http::hyper::HyperNetworkStream;
use http::hyper::HyperHttpReader::*;
type StreamReader = HyperHttpReader<HyperHttpStream>;
pub struct DataStream {
stream: Chain<Take<Cursor<Vec<u8>>>, BufReader<StreamReader>>,
network: HyperHttpStream,
}
impl Read for DataStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}
impl BufRead for DataStream {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.stream.fill_buf()
}
fn consume(&mut self, amt: usize) {
self.stream.consume(amt)
}
}
fn try_sinking<N: HyperNetworkStream>(net: &mut N) -> bool {
warn_!("Data left unread. Sinking 1k bytes.");
io::copy(&mut net.take(1024), &mut io::sink()).expect("sink");
// If there are any more bytes, kill it.
let mut buf = [0];
if let Ok(n) = net.read(&mut buf) {
if n > 0 {
warn_!("Data still remains. Force closing network stream.");
return net.close(Shutdown::Both).is_ok();
}
}
false
}
impl Drop for DataStream {
// Be a bad citizen and close the TCP stream if there's unread data.
// Unfortunately, Hyper forces us to do this.
fn drop(&mut self) {
try_sinking(&mut self.network);
}
}
pub struct Data {
buffer: Vec<u8>,
stream: Cursor<Vec<u8>>,
stream: StreamReader,
position: usize,
capacity: usize,
}
impl Drop for Data {
fn drop(&mut self) {
try_sinking(self.stream.get_mut());
}
}
impl Data {
pub fn open(self) -> impl BufRead {
Cursor::new(self.buffer).chain(BufReader::new(self.stream))
pub fn open(mut self) -> impl BufRead {
// Swap out the buffer and stream for empty ones so we can move.
let mut buffer = vec![];
let mut stream = EmptyReader(self.stream.get_ref().clone());
::std::mem::swap(&mut buffer, &mut self.buffer);
::std::mem::swap(&mut stream, &mut self.stream);
// Setup the underlying reader at the correct pointers.
let mut cursor = Cursor::new(buffer);
cursor.set_position(self.position as u64);
let buffered = cursor.take((self.capacity - self.position) as u64);
// Create the actual DataSteam.
DataStream {
network: stream.get_ref().clone(),
stream: buffered.chain(BufReader::new(stream)),
}
}
#[doc(hidden)]
pub fn from_hyp(mut h_body: HyperBodyReader) -> Result<Data, &'static str> {
// FIXME: This is asolutely terrible, thanks to Hyper.
// Retrieve the underlying HTTPStream from Hyper.
let mut stream = match h_body.get_ref().get_ref()
.downcast_ref::<HyperHttpStream>() {
Some(s) => {
let owned_stream = s.clone();
let buf_len = h_body.get_ref().get_buf().len() as u64;
match h_body {
SizedReader(_, n) => SizedReader(owned_stream, n - buf_len),
EofReader(_) => EofReader(owned_stream),
EmptyReader(_) => EmptyReader(owned_stream),
ChunkedReader(_, n) =>
ChunkedReader(owned_stream, n.map(|k| k - buf_len)),
}
},
None => return Err("Stream is not an HTTP stream!"),
};
// Set the read timeout to 5 seconds.
stream.get_mut().set_read_timeout(Some(Duration::from_secs(5))).unwrap();
// Create the Data object from hyper's buffer.
let (vec, pos, cap) = h_body.get_mut().take_buf();
Ok(Data::new(vec, pos, cap, stream))
}
pub fn peek(&self) -> &[u8] {
&self.buffer
&self.buffer[self.position..self.capacity]
}
pub fn new() -> Data {
pub fn new(buf: Vec<u8>, pos: usize, cap: usize, stream: StreamReader) -> Data {
// TODO: Make sure we always try to get some number of bytes in the
// buffer so that peek actually does something.
// const PEEK_BYTES: usize = 4096;
Data {
stream: Cursor::new(vec![]),
buffer: vec![]
buffer: buf,
stream: stream,
position: pos,
capacity: cap,
}
}
}

View File

@ -31,7 +31,6 @@ impl<'a> Responder for Redirect {
res.headers_mut().set(header::ContentLength(0));
res.headers_mut().set(header::Location(self.1.clone()));
*(res.status_mut()) = self.0;
res.send(b"").unwrap();
Outcome::Success
Outcome::of(res.send(b""))
}
}

View File

@ -21,8 +21,7 @@ impl<'a> Responder for &'a str {
res.headers_mut().set(header::ContentType(mime));
}
res.send(self.as_bytes()).unwrap();
Outcome::Success
Outcome::of(res.send(self.as_bytes()))
}
}
@ -32,47 +31,53 @@ impl Responder for String {
let mime = Mime(TopLevel::Text, SubLevel::Html, vec![]);
res.headers_mut().set(header::ContentType(mime));
}
res.send(self.as_bytes()).unwrap();
Outcome::Success
Outcome::of(res.send(self.as_bytes()))
}
}
impl Responder for File {
fn respond<'a>(&mut self, mut res: FreshHyperResponse<'a>) -> ResponseOutcome<'a> {
let size = self.metadata().unwrap().len();
res.headers_mut().set(header::ContentLength(size));
*(res.status_mut()) = StatusCode::Ok;
let size = match self.metadata() {
Ok(md) => md.len(),
Err(e) => {
error_!("Failed to read file metadata: {:?}", e);
return Outcome::Forward((StatusCode::InternalServerError, res));
}
};
let mut v = Vec::new();
self.read_to_end(&mut v).unwrap();
if let Err(e) = self.read_to_end(&mut v) {
error_!("Failed to read file: {:?}", e);
return Outcome::Forward((StatusCode::InternalServerError, res));
}
let mut stream = res.start().unwrap();
stream.write_all(&v).unwrap();
Outcome::Success
res.headers_mut().set(header::ContentLength(size));
Outcome::of(res.start().and_then(|mut stream| stream.write_all(&v)))
}
}
impl<T: Responder> Responder for Option<T> {
fn respond<'a>(&mut self, res: FreshHyperResponse<'a>) -> ResponseOutcome<'a> {
if self.is_none() {
warn_!("response was `None`");
return Outcome::Forward((StatusCode::NotFound, res));
if let Some(ref mut val) = *self {
val.respond(res)
} else {
warn_!("Response was `None`.");
Outcome::Forward((StatusCode::NotFound, res))
}
self.as_mut().unwrap().respond(res)
}
}
impl<T: Responder, E: fmt::Debug> Responder for Result<T, E> {
// prepend with `default` when using impl specialization
default fn respond<'a>(&mut self, res: FreshHyperResponse<'a>) -> ResponseOutcome<'a> {
if self.is_err() {
error_!("{:?}", self.as_ref().err().unwrap());
return Outcome::Forward((StatusCode::InternalServerError, res));
match *self {
Ok(ref mut val) => val.respond(res),
Err(ref e) => {
error_!("{:?}", e);
Outcome::Forward((StatusCode::InternalServerError, res))
}
}
self.as_mut().unwrap().respond(res)
}
}

View File

@ -27,7 +27,14 @@ impl<T: Read> Stream<T> {
impl<T: Read> Responder for Stream<T> {
fn respond<'a>(&mut self, res: FreshHyperResponse<'a>) -> ResponseOutcome<'a> {
let mut stream = res.start().unwrap();
let mut stream = match res.start() {
Ok(s) => s,
Err(e) => {
error_!("Failed opening response stream: {:?}", e);
return Outcome::Failure;
}
};
let mut buffer = [0; CHUNK_SIZE];
let mut complete = false;
while !complete {

View File

@ -9,7 +9,7 @@ use term_painter::ToStyle;
use config;
use logger;
use request::{Request, Data, FormItems};
use response::{Response};
use response::Response;
use router::{Router, Route};
use catcher::{self, Catcher};
use outcome::Outcome;
@ -45,14 +45,11 @@ impl Rocket {
let uri = hyp_req.uri.to_string();
// Get all of the information from Hyper.
let (_, h_method, h_headers, h_uri, _, mut _body) = hyp_req.deconstruct();
let (_, h_method, h_headers, h_uri, _, h_body) = hyp_req.deconstruct();
// Try to create a Rocket request from the hyper request info.
let request = match Request::new(h_method, h_headers, h_uri) {
Ok(mut req) => {
self.preprocess_request(&mut req);
req
}
let mut request = match Request::new(h_method, h_headers, h_uri) {
Ok(req) => req,
Err(ref reason) => {
let mock_request = Request::mock(Method::Get, uri.as_str());
debug_!("Bad request: {}", reason);
@ -62,9 +59,20 @@ impl Rocket {
};
// Retrieve the data from the request.
let mut data = Data::new();
let mut data = match Data::from_hyp(h_body) {
Ok(data) => data,
Err(reason) => {
debug_!("Bad data in request: {}", reason);
return self.handle_error(StatusCode::InternalServerError,
&request, res);
}
};
// Preprocess the request.
self.preprocess_request(&mut request, &data);
info!("{}:", request);
info_!("Peek size: {} bytes", data.peek().len());
let matches = self.router.route(&request);
for route in matches {
// Retrieve and set the requests parameters.
@ -108,14 +116,14 @@ impl Rocket {
/// Preprocess the request for Rocket-specific things. At this time, we're
/// only checking for _method in forms.
fn preprocess_request(&self, req: &mut Request) {
fn preprocess_request(&self, req: &mut Request, data: &Data) {
// Check if this is a form and if the form contains the special _method
// field which we use to reinterpret the request's method.
let data_len = req.data.len();
let data_len = data.peek().len();
let (min_len, max_len) = ("_method=get".len(), "_method=delete".len());
if req.content_type().is_form() && data_len >= min_len {
let form = unsafe {
from_utf8_unchecked(&req.data.as_slice()[..min(data_len, max_len)])
from_utf8_unchecked(&data.peek()[..min(data_len, max_len)])
};
let mut form_items = FormItems(form);
@ -132,8 +140,13 @@ impl Rocket {
code: StatusCode,
req: &'r Request,
response: FreshHyperResponse) {
error_!("Dispatch failed: {}.", code);
let catcher = self.catchers.get(&code.to_u16()).unwrap();
// Find the catcher or use the one for internal server errors.
let catcher = self.catchers.get(&code.to_u16()).unwrap_or_else(|| {
error_!("No catcher found for {}.", code);
warn_!("Using internal server error catcher.");
self.catchers.get(&500).expect("500 Catcher")
});
if let Some(mut responder) = catcher.handle(Error::NoRoute, req).responder() {
if responder.respond(response) != Outcome::Success {
@ -144,7 +157,8 @@ impl Rocket {
} else {
error_!("Catcher returned an incomplete response.");
warn_!("Using default error response.");
let catcher = self.default_catchers.get(&code.to_u16()).unwrap();
let catcher = self.default_catchers.get(&code.to_u16())
.unwrap_or(self.default_catchers.get(&500).expect("500 default"));
let responder = catcher.handle(Error::Internal, req).responder();
responder.unwrap().respond(response).expect_success()
}
@ -166,8 +180,7 @@ impl Rocket {
pub fn catch(mut self, catchers: Vec<Catcher>) -> Self {
info!("👾 {}:", Magenta.paint("Catchers"));
for c in catchers {
if self.catchers.contains_key(&c.code) &&
!self.catchers.get(&c.code).unwrap().is_default() {
if self.catchers.get(&c.code).map_or(false, |e| e.is_default()) {
let msg = format!("warning: overrides {} catcher!", c.code);
warn!("{} ({})", c, Yellow.paint(msg.as_str()));
} else {