Skip to content

Commit

Permalink
Use request passport in HTTP and ops modules
Browse files Browse the repository at this point in the history
The peer module doesn't use it yet, only what is required to make it
compile.
  • Loading branch information
Thomasdezeeuw committed Aug 20, 2020
1 parent ff20a35 commit 7001a61
Show file tree
Hide file tree
Showing 9 changed files with 659 additions and 214 deletions.
71 changes: 49 additions & 22 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use parking_lot::{Once, RwLock};
use crate::buffer::{Buffer, WriteBuffer};
use crate::error::Describe;
use crate::op::{self, Outcome};
use crate::passport::{Passport, Uuid};
use crate::passport::{Event, Passport, Uuid};
use crate::peer::Peers;
use crate::storage::{Blob, BlobEntry};
use crate::{db, Key};
Expand Down Expand Up @@ -196,7 +196,9 @@ pub async fn actor(
let start = Instant::now();
let response = match result {
// Parsed a request, now route and process it.
Ok(true) => route_request(&mut ctx, &mut db_ref, &peers, &mut conn, &request).await?,
Ok(true) => {
route_request(&mut ctx, &mut db_ref, &peers, &mut conn, &mut request).await?
}
// Read all requests on the stream, so this actor's work is done.
Ok(false) => break,
// Try operator returns the I/O errors.
Expand All @@ -206,7 +208,7 @@ pub async fn actor(
let result = Deadline::timeout(
&mut ctx,
TIMEOUT,
conn.write_response(&response, request.id()),
conn.write_response(&response, &mut request.passport),
)
.await;
match result {
Expand All @@ -229,6 +231,8 @@ pub async fn actor(
response.status_code().0,
response.len(),
);
// Log the request passport to standard error.
info!("request passport: {}", request.passport);

// In cases were we don't/can't read the (entire) body we need to close
// the connection.
Expand Down Expand Up @@ -276,6 +280,8 @@ impl Connection {
/// Returns `true` if a request was read into `request`, `false` if there
/// are no more requests on `stream` or an error otherwise.
pub async fn read_request(&mut self, request: &mut Request) -> Result<bool, RequestError> {
request.reset();

let mut too_short = 0;
loop {
// At the start we likely don't have enough bytes to read the entire
Expand All @@ -285,6 +291,7 @@ impl Connection {
match request.parse(self.buf.as_bytes()) {
Ok(httparse::Status::Complete(bytes_read)) => {
self.buf.processed(bytes_read);
request.passport.mark(Event::ParsedHttpRequest);
trace!("read HTTP request: {:?}", request);
return Ok(true);
}
Expand Down Expand Up @@ -315,18 +322,22 @@ impl Connection {
pub async fn write_response(
&mut self,
response: &Response,
request_id: &Uuid,
passport: &mut Passport,
) -> io::Result<()> {
trace!("writing HTTP response: {:?}", response);
// Create a write buffer a write the request headers to it.
let (_, mut write_buf) = self.buf.split_write(Response::MAX_HEADERS_SIZE);
response.write_headers(&mut write_buf, request_id);
response.write_headers(&mut write_buf, passport.id());

let bufs = &mut [
IoSlice::new(write_buf.as_bytes()),
IoSlice::new(response.body()),
];
self.stream.write_all_vectored(bufs).await
// TODO: add a timeout to this.
self.stream
.write_all_vectored(bufs)
.await
.map(|()| passport.mark(Event::WrittenHttpResponse))
}
}

Expand Down Expand Up @@ -357,7 +368,7 @@ pub struct Request {

impl Request {
/// Create an empty `Request`.
pub const fn empty() -> Request {
pub fn empty() -> Request {
Request {
passport: Passport::empty(),
method: Method::Get,
Expand Down Expand Up @@ -389,13 +400,15 @@ impl Request {
}

/// Parse a request.
///
/// # Notes
///
/// Does not [`reset`] the request, use that before calling this method.
fn parse(&mut self, bytes: &[u8]) -> Result<httparse::Status<usize>, RequestError> {
let mut headers = [EMPTY_HEADER; MAX_HEADERS];
let mut req = httparse::Request::new(&mut headers);
match req.parse(bytes) {
Ok(httparse::Status::Complete(bytes_read)) => {
self.reset();

// TODO: check req.version?

self.path.push_str(req.path.unwrap_or(""));
Expand Down Expand Up @@ -554,14 +567,15 @@ async fn route_request(
db_ref: &mut ActorRef<db::Message>,
peers: &Peers,
conn: &mut Connection,
request: &Request,
request: &mut Request,
) -> crate::Result<Response> {
trace!("routing request: {:?}", request);
use Method::*;
match (request.method, &*request.path) {
(Post, "/blob") | (Post, "/blob/") => match request.length {
Some(length) => {
let (kind, should_close) = store_blob(ctx, db_ref, conn, peers, length).await?;
let (kind, should_close) =
store_blob(ctx, db_ref, conn, &mut request.passport, peers, length).await?;
Ok(Response {
is_head: false,
should_close,
Expand All @@ -579,7 +593,7 @@ async fn route_request(
Ok(()) => Ok(Response {
is_head: request.method.is_head(),
should_close: false,
kind: health_check(ctx, db_ref).await,
kind: health_check(ctx, db_ref, &mut request.passport).await,
}),
Err(err) => Ok(err),
}
Expand All @@ -590,7 +604,7 @@ async fn route_request(
Ok(key) => Ok(Response {
is_head: false,
should_close: false,
kind: retrieve_blob(ctx, db_ref, key, false).await,
kind: retrieve_blob(ctx, db_ref, &mut request.passport, key, false).await,
}),
Err(err) => Ok(Response {
is_head: false,
Expand All @@ -605,7 +619,7 @@ async fn route_request(
Ok(key) => Ok(Response {
is_head: true,
should_close: false,
kind: retrieve_blob(ctx, db_ref, key, true).await,
kind: retrieve_blob(ctx, db_ref, &mut request.passport, key, true).await,
}),
Err(err) => Ok(Response {
is_head: true,
Expand All @@ -620,7 +634,7 @@ async fn route_request(
Ok(key) => Ok(Response {
is_head: false,
should_close: false,
kind: remove_blob(ctx, db_ref, peers, key).await,
kind: remove_blob(ctx, db_ref, &mut request.passport, peers, key).await,
}),
Err(err) => Ok(Response {
is_head: false,
Expand Down Expand Up @@ -666,17 +680,18 @@ async fn store_blob(
ctx: &mut actor::Context<!>,
db_ref: &mut ActorRef<db::Message>,
conn: &mut Connection,
passport: &mut Passport,
peers: &Peers,
body_length: usize,
) -> crate::Result<(ResponseKind, bool)> {
match read_blob(ctx, conn, body_length).await {
match read_blob(ctx, conn, passport, body_length).await {
Ok(Outcome::Continue(())) => {}
Ok(Outcome::Done((response, should_close))) => return Ok((response, should_close)),
Err(err) => return Err(err),
}

// NOTE: `store_blob` will advance the buffer for use.
match op::store_blob(ctx, db_ref, peers, &mut conn.buf, body_length).await {
match op::store_blob(ctx, db_ref, passport, peers, &mut conn.buf, body_length).await {
Ok(key) => Ok((ResponseKind::Stored(key), false)),
Err(()) => Ok((ResponseKind::ServerError, true)),
}
Expand All @@ -686,9 +701,13 @@ async fn store_blob(
async fn read_blob(
ctx: &mut actor::Context<!>,
conn: &mut Connection,
passport: &mut Passport,
body_length: usize,
) -> crate::Result<Outcome<(), (ResponseKind, bool)>> {
trace!("reading blob from HTTP request body");
trace!(
"reading blob from HTTP request body: request_id={}",
passport.id()
);
// TODO: get this from a configuration.
const MAX_SIZE: usize = 1024 * 1024; // 1MB.

Expand Down Expand Up @@ -721,21 +740,27 @@ async fn read_blob(
"read blob from HTTP request body: length={}",
conn.buf.len()
);
passport.mark(Event::ReadHttpBody);
Ok(Outcome::Continue(()))
}

/// Retrieve the blob associated with `key` from the actor behind the `db_ref`.
async fn retrieve_blob(
ctx: &mut actor::Context<!>,
db_ref: &mut ActorRef<db::Message>,
passport: &mut Passport,
key: Key,
is_head: bool,
) -> ResponseKind {
match op::retrieve_blob(ctx, db_ref, key).await {
match op::retrieve_blob(ctx, db_ref, passport, key).await {
Ok(Some(BlobEntry::Stored(blob))) => {
if !is_head {
if let Err(err) = blob.prefetch() {
warn!("error prefetching blob, continuing: {}", err);
warn!(
"error prefetching blob, continuing: {}: request_id=\"{}\"",
err,
passport.id()
);
}
}
ResponseKind::Ok(blob)
Expand All @@ -750,10 +775,11 @@ async fn retrieve_blob(
async fn remove_blob(
ctx: &mut actor::Context<!>,
db_ref: &mut ActorRef<db::Message>,
passport: &mut Passport,
peers: &Peers,
key: Key,
) -> ResponseKind {
match op::remove_blob(ctx, db_ref, peers, key).await {
match op::remove_blob(ctx, db_ref, passport, peers, key).await {
Ok(Some(removed_at)) => ResponseKind::Removed(removed_at),
Ok(None) => ResponseKind::NotFound,
Err(()) => ResponseKind::ServerError,
Expand All @@ -764,8 +790,9 @@ async fn remove_blob(
async fn health_check(
ctx: &mut actor::Context<!>,
db_ref: &mut ActorRef<db::Message>,
passport: &mut Passport,
) -> ResponseKind {
match op::check_health(ctx, db_ref).await {
match op::check_health(ctx, db_ref, passport).await {
Ok(..) => ResponseKind::HealthOk,
Err(()) => ResponseKind::ServerError,
}
Expand Down
Loading

0 comments on commit 7001a61

Please sign in to comment.