diff --git a/src/http.rs b/src/http.rs index b13fbf3..07fadba 100644 --- a/src/http.rs +++ b/src/http.rs @@ -46,7 +46,7 @@ use parking_lot::{Once, RwLock}; use crate::buffer::{Buffer, WriteBuffer}; use crate::error::Describe; use crate::op::{self, Outcome}; -use crate::passport::{Passport, Uuid}; +use crate::passport::{Event, Passport, Uuid}; use crate::peer::Peers; use crate::storage::{Blob, BlobEntry}; use crate::{db, Key}; @@ -196,7 +196,9 @@ pub async fn actor( let start = Instant::now(); let response = match result { // Parsed a request, now route and process it. - Ok(true) => route_request(&mut ctx, &mut db_ref, &peers, &mut conn, &request).await?, + Ok(true) => { + route_request(&mut ctx, &mut db_ref, &peers, &mut conn, &mut request).await? + } // Read all requests on the stream, so this actor's work is done. Ok(false) => break, // Try operator returns the I/O errors. @@ -206,7 +208,7 @@ pub async fn actor( let result = Deadline::timeout( &mut ctx, TIMEOUT, - conn.write_response(&response, request.id()), + conn.write_response(&response, &mut request.passport), ) .await; match result { @@ -229,6 +231,8 @@ pub async fn actor( response.status_code().0, response.len(), ); + // Log the request passport to standard error. + info!("request passport: {}", request.passport); // In cases were we don't/can't read the (entire) body we need to close // the connection. @@ -276,6 +280,8 @@ impl Connection { /// Returns `true` if a request was read into `request`, `false` if there /// are no more requests on `stream` or an error otherwise. pub async fn read_request(&mut self, request: &mut Request) -> Result { + request.reset(); + let mut too_short = 0; loop { // At the start we likely don't have enough bytes to read the entire @@ -285,6 +291,7 @@ impl Connection { match request.parse(self.buf.as_bytes()) { Ok(httparse::Status::Complete(bytes_read)) => { self.buf.processed(bytes_read); + request.passport.mark(Event::ParsedHttpRequest); trace!("read HTTP request: {:?}", request); return Ok(true); } @@ -315,18 +322,22 @@ impl Connection { pub async fn write_response( &mut self, response: &Response, - request_id: &Uuid, + passport: &mut Passport, ) -> io::Result<()> { trace!("writing HTTP response: {:?}", response); // Create a write buffer a write the request headers to it. let (_, mut write_buf) = self.buf.split_write(Response::MAX_HEADERS_SIZE); - response.write_headers(&mut write_buf, request_id); + response.write_headers(&mut write_buf, passport.id()); let bufs = &mut [ IoSlice::new(write_buf.as_bytes()), IoSlice::new(response.body()), ]; - self.stream.write_all_vectored(bufs).await + // TODO: add a timeout to this. + self.stream + .write_all_vectored(bufs) + .await + .map(|()| passport.mark(Event::WrittenHttpResponse)) } } @@ -357,7 +368,7 @@ pub struct Request { impl Request { /// Create an empty `Request`. - pub const fn empty() -> Request { + pub fn empty() -> Request { Request { passport: Passport::empty(), method: Method::Get, @@ -389,13 +400,15 @@ impl Request { } /// Parse a request. + /// + /// # Notes + /// + /// Does not [`reset`] the request, use that before calling this method. fn parse(&mut self, bytes: &[u8]) -> Result, RequestError> { let mut headers = [EMPTY_HEADER; MAX_HEADERS]; let mut req = httparse::Request::new(&mut headers); match req.parse(bytes) { Ok(httparse::Status::Complete(bytes_read)) => { - self.reset(); - // TODO: check req.version? 
self.path.push_str(req.path.unwrap_or("")); @@ -554,14 +567,15 @@ async fn route_request( db_ref: &mut ActorRef, peers: &Peers, conn: &mut Connection, - request: &Request, + request: &mut Request, ) -> crate::Result { trace!("routing request: {:?}", request); use Method::*; match (request.method, &*request.path) { (Post, "/blob") | (Post, "/blob/") => match request.length { Some(length) => { - let (kind, should_close) = store_blob(ctx, db_ref, conn, peers, length).await?; + let (kind, should_close) = + store_blob(ctx, db_ref, conn, &mut request.passport, peers, length).await?; Ok(Response { is_head: false, should_close, @@ -579,7 +593,7 @@ async fn route_request( Ok(()) => Ok(Response { is_head: request.method.is_head(), should_close: false, - kind: health_check(ctx, db_ref).await, + kind: health_check(ctx, db_ref, &mut request.passport).await, }), Err(err) => Ok(err), } @@ -590,7 +604,7 @@ async fn route_request( Ok(key) => Ok(Response { is_head: false, should_close: false, - kind: retrieve_blob(ctx, db_ref, key, false).await, + kind: retrieve_blob(ctx, db_ref, &mut request.passport, key, false).await, }), Err(err) => Ok(Response { is_head: false, @@ -605,7 +619,7 @@ async fn route_request( Ok(key) => Ok(Response { is_head: true, should_close: false, - kind: retrieve_blob(ctx, db_ref, key, true).await, + kind: retrieve_blob(ctx, db_ref, &mut request.passport, key, true).await, }), Err(err) => Ok(Response { is_head: true, @@ -620,7 +634,7 @@ async fn route_request( Ok(key) => Ok(Response { is_head: false, should_close: false, - kind: remove_blob(ctx, db_ref, peers, key).await, + kind: remove_blob(ctx, db_ref, &mut request.passport, peers, key).await, }), Err(err) => Ok(Response { is_head: false, @@ -666,17 +680,18 @@ async fn store_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, conn: &mut Connection, + passport: &mut Passport, peers: &Peers, body_length: usize, ) -> crate::Result<(ResponseKind, bool)> { - match read_blob(ctx, conn, body_length).await { + match read_blob(ctx, conn, passport, body_length).await { Ok(Outcome::Continue(())) => {} Ok(Outcome::Done((response, should_close))) => return Ok((response, should_close)), Err(err) => return Err(err), } // NOTE: `store_blob` will advance the buffer for use. - match op::store_blob(ctx, db_ref, peers, &mut conn.buf, body_length).await { + match op::store_blob(ctx, db_ref, passport, peers, &mut conn.buf, body_length).await { Ok(key) => Ok((ResponseKind::Stored(key), false)), Err(()) => Ok((ResponseKind::ServerError, true)), } @@ -686,9 +701,13 @@ async fn store_blob( async fn read_blob( ctx: &mut actor::Context, conn: &mut Connection, + passport: &mut Passport, body_length: usize, ) -> crate::Result> { - trace!("reading blob from HTTP request body"); + trace!( + "reading blob from HTTP request body: request_id={}", + passport.id() + ); // TODO: get this from a configuration. const MAX_SIZE: usize = 1024 * 1024; // 1MB. 
@@ -721,6 +740,7 @@ async fn read_blob( "read blob from HTTP request body: length={}", conn.buf.len() ); + passport.mark(Event::ReadHttpBody); Ok(Outcome::Continue(())) } @@ -728,14 +748,19 @@ async fn read_blob( async fn retrieve_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, key: Key, is_head: bool, ) -> ResponseKind { - match op::retrieve_blob(ctx, db_ref, key).await { + match op::retrieve_blob(ctx, db_ref, passport, key).await { Ok(Some(BlobEntry::Stored(blob))) => { if !is_head { if let Err(err) = blob.prefetch() { - warn!("error prefetching blob, continuing: {}", err); + warn!( + "error prefetching blob, continuing: {}: request_id=\"{}\"", + err, + passport.id() + ); } } ResponseKind::Ok(blob) @@ -750,10 +775,11 @@ async fn retrieve_blob( async fn remove_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, peers: &Peers, key: Key, ) -> ResponseKind { - match op::remove_blob(ctx, db_ref, peers, key).await { + match op::remove_blob(ctx, db_ref, passport, peers, key).await { Ok(Some(removed_at)) => ResponseKind::Removed(removed_at), Ok(None) => ResponseKind::NotFound, Err(()) => ResponseKind::ServerError, @@ -764,8 +790,9 @@ async fn remove_blob( async fn health_check( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> ResponseKind { - match op::check_health(ctx, db_ref).await { + match op::check_health(ctx, db_ref, passport).await { Ok(..) => ResponseKind::HealthOk, Err(()) => ResponseKind::ServerError, } diff --git a/src/op/mod.rs b/src/op/mod.rs index 82f8622..524aa28 100644 --- a/src/op/mod.rs +++ b/src/op/mod.rs @@ -13,6 +13,7 @@ use log::{debug, error, warn}; use crate::buffer::BufView; use crate::db::{self, HealthCheck, HealthOk}; +use crate::passport::{Event, Passport, Uuid}; use crate::peer::coordinator::relay; use crate::peer::{ConsensusId, PeerRpc, Peers}; use crate::storage::{self, BlobEntry, Keys, UncommittedBlob}; @@ -51,15 +52,32 @@ pub(crate) enum Outcome { pub async fn retrieve_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, key: Key, ) -> Result, ()> where K: RuntimeAccess, { - debug!("running retrieve operation"); - match db_rpc(ctx, db_ref, key) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "running retrieve operation: request_id=\"{}\", key=\"{}\"", + passport.id(), + key + ); + match db_rpc(ctx, db_ref, *passport.id(), key) { + Ok(rpc) => match rpc.await { + Ok(blob) => { + passport.mark(Event::RetrievedBlob); + Ok(blob) + } + Err(()) => { + passport.mark(Event::FailedToRetrieveBlob); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::FailedToRetrieveBlob); + Err(()) + } } } @@ -69,15 +87,32 @@ where pub(crate) async fn retrieve_uncommitted_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, key: Key, ) -> Result>, ()> where K: RuntimeAccess, { - debug!("running uncommitted retrieve operation"); - match db_rpc(ctx, db_ref, key) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "running uncommitted retrieve operation: request_id=\"{}\", key=\"{}\"", + passport.id(), + key + ); + match db_rpc(ctx, db_ref, *passport.id(), key) { + Ok(rpc) => match rpc.await { + Ok(blob) => { + passport.mark(Event::RetrievedUncommittedBlob); + Ok(blob) + } + Err(()) => { + passport.mark(Event::FailedToRetrieveUncommittedBlob); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::FailedToRetrieveUncommittedBlob); + Err(()) + } } } @@ -87,14 +122,30 @@ where pub(crate) async fn retrieve_keys( ctx: 
&mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> Result where K: RuntimeAccess, { - debug!("running retrieve keys operation"); - match db_rpc(ctx, db_ref, ()) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "running retrieve keys operation: request_id=\"{}\"", + passport.id() + ); + match db_rpc(ctx, db_ref, *passport.id(), ()) { + Ok(rpc) => match rpc.await { + Ok(keys) => { + passport.mark(Event::RetrievedKeys); + Ok(keys) + } + Err(()) => { + passport.mark(Event::FailedToRetrieveKeys); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::FailedToRetrieveKeys); + Err(()) + } } } @@ -104,14 +155,27 @@ where pub async fn check_health( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> Result where K: RuntimeAccess, { - debug!("running health check"); - match db_rpc(ctx, db_ref, HealthCheck) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!("running health check: request_id=\"{}\"", passport.id()); + match db_rpc(ctx, db_ref, *passport.id(), HealthCheck) { + Ok(rpc) => match rpc.await { + Ok(result) => { + passport.mark(Event::HealthCheckComplete); + Ok(result) + } + Err(()) => { + passport.mark(Event::HealthCheckFailed); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::HealthCheckFailed); + Err(()) + } } } @@ -119,16 +183,33 @@ where pub(crate) async fn sync_stored_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, view: BufView, timestamp: SystemTime, ) -> Result where K: RuntimeAccess, { - debug!("syncing stored blob"); - match db_rpc(ctx, db_ref, (view, timestamp)) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "syncing stored blob: request_id=\"{}\", blob_length={}", + passport.id(), + view.len() + ); + match db_rpc(ctx, db_ref, *passport.id(), (view, timestamp)) { + Ok(rpc) => match rpc.await { + Ok(view) => { + passport.mark(Event::SyncedStoredBlob); + Ok(view) + } + Err(()) => { + passport.mark(Event::FailedToSyncStoredBlob); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::FailedToSyncStoredBlob); + Err(()) + } } } @@ -136,16 +217,33 @@ where pub(crate) async fn sync_removed_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, key: Key, timestamp: SystemTime, ) -> Result<(), ()> where K: RuntimeAccess, { - debug!("syncing removed blob"); - match db_rpc(ctx, db_ref, (key, timestamp)) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "syncing removed blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + key + ); + match db_rpc(ctx, db_ref, *passport.id(), (key, timestamp)) { + Ok(rpc) => match rpc.await { + Ok(()) => { + passport.mark(Event::SyncedRemovedBlob); + Ok(()) + } + Err(()) => { + passport.mark(Event::FailedToSyncRemovedBlob); + Err(()) + } + }, + Err(()) => { + passport.mark(Event::FailedToSyncRemovedBlob); + Err(()) + } } } @@ -157,6 +255,7 @@ const DB_TIMEOUT: Duration = Duration::from_secs(1); fn db_rpc( ctx: &mut actor::Context, db_ref: &mut ActorRef, + request_id: Uuid, request: Req, ) -> Result, ()> where @@ -167,9 +266,13 @@ where Ok(rpc) => Ok(DbRpc { rpc, timer: Timer::timeout(ctx, DB_TIMEOUT), + request_id, }), Err(err) => { - error!("error making RPC call to database: {}", err); + error!( + "error making RPC call to database: {}: request_id=\"{}\"", + err, request_id + ); Err(()) } } @@ -184,6 +287,7 @@ where struct DbRpc { rpc: Rpc, timer: Timer, + request_id: Uuid, } impl Future for DbRpc { @@ -194,11 +298,17 @@ impl Future for DbRpc { match Pin::new(&mut self.rpc).poll(ctx) { 
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res)), Poll::Ready(Err(err)) => { - error!("error waiting for RPC response from database: {}", err); + error!( + "error waiting for RPC response from database: {}: request_id=\"{}\"", + err, self.request_id + ); Poll::Ready(Err(())) } Poll::Pending => Pin::new(&mut self.timer).poll(ctx).map(|_| { - error!("timeout waiting for RPC response from database"); + error!( + "timeout waiting for RPC response from database: request_id=\"{}\"", + self.request_id + ); Err(()) }), } @@ -227,6 +337,7 @@ pub(crate) trait Query: storage::Query { &self, ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> Self::AlreadyDone; /// Start phase one of the 2PC protocol, asking the `peers` to prepare the @@ -237,6 +348,11 @@ pub(crate) trait Query: storage::Query { peers: &Peers, ) -> (ConsensusId, PeerRpc); + // Events in the request [`Passport`] regarding committing to a storage + // query. + const COMMITTED: Event; + const FAILED_TO_COMMIT: Event; + /// Second phase of the 2PC protocol, asking the `peers` to commit to the /// query. fn peers_commit( @@ -247,6 +363,10 @@ pub(crate) trait Query: storage::Query { timestamp: SystemTime, ) -> PeerRpc<()>; + // Events in the request [`Passport`] regarding aborting a storage query. + const ABORTED: Event; + const FAILED_TO_ABORT: Event; + /// Ask the `peers` to abort the query. fn peers_abort( &self, @@ -263,6 +383,7 @@ pub(crate) trait Query: storage::Query { async fn consensus( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, peers: &Peers, query: Q, ) -> Result @@ -273,6 +394,7 @@ where // The consensus id of a previous run, only used after we failed a consensus // run previously. let mut prev_consensus_id = None; + passport.mark(Event::StartingConsensus); // TODO: optimisation on a retry only let aborted/failed peers retry. @@ -281,11 +403,11 @@ where // It could be that one of the peers aborted because the operation // is already complete (i.e. the blob already stored/removed). Check // for that before proceeding. - match query.already_done(ctx, db_ref).await { + match query.already_done(ctx, db_ref, passport).await { Ok(Some(timestamp)) => { // Operation already completed by another actor. Abort the // old 2PC query (from the previous iteration). - return abort(ctx, db_ref, peers, consensus_id, query) + return abort(ctx, db_ref, passport, peers, consensus_id, query) .await .map(|()| timestamp); } @@ -301,12 +423,14 @@ where // storage layer. let (consensus_id, rpc) = query.peers_prepare(ctx, peers); debug!( - "requesting peers to prepare query: consensus_id={}, key={}", + "requesting peers to prepare query: request_id=\"{}\", consensus_id={}, key=\"{}\"", + passport.id(), consensus_id, query.key() ); // Wait for the results. let results = rpc.await; + passport.mark(Event::ConsensusPhaseOneResults); // If we failed a previous run we want to start aborting it now. // NOTE: we wait for the participants to prepare it first to ensure that @@ -318,13 +442,19 @@ where if aborted > 0 || failed > 0 { // TODO: allow some failure here. warn!( - "consensus algorithm failed: consensus_id={}, key={}, votes_commit={}, votes_abort={}, failed_votes={}", - consensus_id, query.key(), committed, aborted, failed + "consensus algorithm failed: request_id=\"{}\", consensus_id={}, key=\"{}\", votes_commit={}, votes_abort={}, failed_votes={}", + passport.id(), consensus_id, query.key(), committed, aborted, failed ); // Await aborting the previous run, if any. 
if let Some(abort_rpc) = abort_rpc { - abort_consensus(abort_rpc, prev_consensus_id.take().unwrap(), query.key()).await; + abort_consensus( + passport, + abort_rpc, + prev_consensus_id.take().unwrap(), + query.key(), + ) + .await; } // Try again, ensuring that this run is aborted in the next @@ -334,13 +464,14 @@ where } debug!( - "consensus algorithm succeeded: consensus_id={}, key={}, votes_commit={}, votes_abort={}, failed_votes={}", - consensus_id, query.key(), committed, aborted, failed + "consensus algorithm succeeded: request_id=\"{}\", consensus_id={}, key=\"{}\", votes_commit={}, votes_abort={}, failed_votes={}", + passport.id(), consensus_id, query.key(), committed, aborted, failed ); // Phase two of 2PC: ask the participants to commit. debug!( - "requesting peers to commit: consensus_id={}, key={}", + "requesting peers to commit: request_id=\"{}\", consensus_id={}, key=\"{}\"", + passport.id(), consensus_id, query.key() ); @@ -351,17 +482,24 @@ where // Await aborting the previous run, if any. if let Some(abort_rpc) = abort_rpc { - abort_consensus(abort_rpc, prev_consensus_id.take().unwrap(), query.key()).await; + abort_consensus( + passport, + abort_rpc, + prev_consensus_id.take().unwrap(), + query.key(), + ) + .await; } // Await the commit results. let results = rpc.await; + passport.mark(Event::ConsensusPhaseTwoResults); let (committed, aborted, failed) = count_consensus_votes(&results); if aborted > 0 || failed > 0 { // TODO: allow some failure here. warn!( - "consensus algorithm commitment failed: consensus_id={}, key={}, votes_commit={}, votes_abort={}, failed_votes={}", - consensus_id, query.key(), committed, aborted, failed + "consensus algorithm commitment failed: request_id=\"{}\", consensus_id={}, key=\"{}\", votes_commit={}, votes_abort={}, failed_votes={}", + passport.id(), consensus_id, query.key(), committed, aborted, failed ); // Try again, ensuring that this run is aborted in the next @@ -371,8 +509,8 @@ where } debug!( - "consensus algorithm commitment success: consensus_id={}, key={}, votes_commit={}, votes_abort={}, failed_votes={}", - consensus_id, query.key(), committed, aborted, failed + "consensus algorithm commitment success: request_id=\"{}\", consensus_id={}, key=\"{}\", votes_commit={}, votes_abort={}, failed_votes={}", + passport.id(), consensus_id, query.key(), committed, aborted, failed ); // NOTE: Its crucial here that at least a single peer received the @@ -386,23 +524,25 @@ // response and then start the commit process ourselves, then we can // wait for the peers and the storing concurrently. let key = query.key().to_owned(); - let timestamp = commit_query(ctx, db_ref, query, timestamp).await?; + let timestamp = commit_query(ctx, db_ref, passport, query, timestamp).await?; // Let the participants know the operation is complete. Q::committed(peers, consensus_id, key, timestamp); + passport.mark(Event::ConsensusCommitted); return Ok(timestamp); } // Failed too many times. error!( - "failed {} consensus algorithm runs: key={}", + "failed {1} consensus algorithm runs: request_id=\"{0}\", key=\"{2}\"", + passport.id(), MAX_CONSENSUS_TRIES, query.key() ); // Abort the last consensus run.
let consensus_id = prev_consensus_id.unwrap(); - abort(ctx, db_ref, peers, consensus_id, query) + abort(ctx, db_ref, passport, peers, consensus_id, query) .await .and(Err(())) } @@ -411,18 +551,35 @@ where pub(crate) async fn commit_query( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, query: Q, timestamp: SystemTime, ) -> Result where K: RuntimeAccess, - Q: storage::Query, + Q: Query, db::Message: From>, { - debug!("committing to query: key={}", query.key()); - match db_rpc(ctx, db_ref, (query, timestamp)) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "committing to query: request_id=\"{}\", key=\"{}\"", + passport.id(), + query.key() + ); + match db_rpc(ctx, db_ref, *passport.id(), (query, timestamp)) { + Ok(rpc) => match rpc.await { + Ok(time) => { + passport.mark(Q::COMMITTED); + Ok(time) + } + Err(()) => { + passport.mark(Q::FAILED_TO_COMMIT); + Err(()) + } + }, + Err(()) => { + passport.mark(Q::FAILED_TO_COMMIT); + Err(()) + } } } @@ -430,6 +587,7 @@ where async fn abort( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, peers: &Peers, consensus_id: ConsensusId, query: Q, @@ -439,16 +597,23 @@ where db::Message: From>, { let abort_rpc = query.peers_abort(ctx, peers, consensus_id); - abort_consensus(abort_rpc, consensus_id, query.key()).await; - abort_query(ctx, db_ref, query).await + abort_consensus(passport, abort_rpc, consensus_id, query.key()).await; + abort_query(ctx, db_ref, passport, query).await } /// Await the results in `abort_rpc`, logging the results. -async fn abort_consensus(abort_rpc: PeerRpc<()>, consensus_id: ConsensusId, key: &Key) { +async fn abort_consensus( + passport: &mut Passport, + abort_rpc: PeerRpc<()>, + consensus_id: ConsensusId, + key: &Key, +) { let results = abort_rpc.await; + passport.mark(Event::AbortedConsensusRun); let (committed, aborted, failed) = count_consensus_votes(&results); warn!( - "aborted consensus algorithm: consensus_id={}, key={}, success={}, failed={}", + "aborted consensus algorithm: request_id=\"{}\", consensus_id=\"{}\", key=\"{}\", success={}, failed={}", + passport.id(), consensus_id, key, committed, @@ -460,17 +625,34 @@ async fn abort_consensus(abort_rpc: PeerRpc<()>, consensus_id: ConsensusId, key: pub(crate) async fn abort_query( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, query: Q, ) -> Result<(), ()> where K: RuntimeAccess, - Q: storage::Query, + Q: Query, db::Message: From>, { - debug!("aborting query: key={}", query.key()); - match db_rpc(ctx, db_ref, query) { - Ok(rpc) => rpc.await, - Err(()) => Err(()), + debug!( + "aborting query: request_id=\"{}\", key=\"{}\"", + passport.id(), + query.key() + ); + match db_rpc(ctx, db_ref, *passport.id(), query) { + Ok(rpc) => match rpc.await { + Ok(()) => { + passport.mark(Q::ABORTED); + Ok(()) + } + Err(()) => { + passport.mark(Q::FAILED_TO_ABORT); + Err(()) + } + }, + Err(()) => { + passport.mark(Q::FAILED_TO_ABORT); + Err(()) + } } } diff --git a/src/op/remove.rs b/src/op/remove.rs index 94fb79a..b40c876 100644 --- a/src/op/remove.rs +++ b/src/op/remove.rs @@ -11,6 +11,7 @@ use log::debug; use crate::db::{self, RemoveBlobResponse}; use crate::op::{commit_query, consensus, db_rpc, DbRpc, Outcome}; +use crate::passport::{Event, Passport}; use crate::peer::{ConsensusId, PeerRpc, Peers}; use crate::storage::{BlobEntry, Query, RemoveBlob}; use crate::Key; @@ -21,12 +22,17 @@ use crate::Key; pub async fn remove_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut 
Passport, peers: &Peers, key: Key, ) -> Result, ()> { - debug!("running remove operation"); + debug!( + "running remove operation: request_id=\"{}\", key=\"{}\"", + passport.id(), + key, + ); - let query = match prep_remove_blob(ctx, db_ref, key.clone()).await { + let query = match prep_remove_blob(ctx, db_ref, passport, key.clone()).await { Ok(Outcome::Continue(query)) => query, // Already removed or never stored. Ok(Outcome::Done(timestamp)) => return Ok(timestamp), @@ -36,21 +42,25 @@ pub async fn remove_blob( if peers.is_empty() { // Easy mode! debug!( - "running in single mode, not running consensus algorithm to remove blob: key={}", - query.key() + "running in single mode, not running consensus algorithm to remove blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + query.key(), ); // We can directly commit to removing the blob, we're always in // agreement with ourselves. - commit_query(ctx, db_ref, query, SystemTime::now()) + commit_query(ctx, db_ref, passport, query, SystemTime::now()) .await .map(Some) } else { // Hard mode. debug!( - "running consensus algorithm to remove blob: key={}", - query.key() + "running consensus algorithm to remove blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + query.key(), ); - consensus(ctx, db_ref, peers, query).await.map(Some) + consensus(ctx, db_ref, passport, peers, query) + .await + .map(Some) } } @@ -58,20 +68,35 @@ pub async fn remove_blob( pub(crate) async fn prep_remove_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, key: Key, ) -> Result>, ()> where K: RuntimeAccess, { - match db_rpc(ctx, db_ref, key) { + debug!( + "prepping storage to remove blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + key, + ); + match db_rpc(ctx, db_ref, *passport.id(), key) { Ok(rpc) => match rpc.await { - Ok(result) => match result { - RemoveBlobResponse::Query(query) => Ok(Outcome::Continue(query)), - RemoveBlobResponse::NotStored(timestamp) => Ok(Outcome::Done(timestamp)), - }, - Err(()) => Err(()), + Ok(result) => { + passport.mark(Event::PreppedRemoveBlob); + match result { + RemoveBlobResponse::Query(query) => Ok(Outcome::Continue(query)), + RemoveBlobResponse::NotStored(timestamp) => Ok(Outcome::Done(timestamp)), + } + } + Err(()) => { + passport.mark(Event::FailedToPrepRemoveBlob); + Err(()) + } }, - Err(()) => Err(()), + Err(()) => { + passport.mark(Event::FailedToPrepRemoveBlob); + Err(()) + } } } @@ -82,10 +107,15 @@ impl super::Query for RemoveBlob { &self, ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> Self::AlreadyDone { - debug!("checking if blob is already removed: key={}", self.key()); + debug!( + "checking if blob is already removed: request_id=\"{}\", key=\"{}\"", + passport.id(), + self.key(), + ); AlreadyDone { - db_rpc: db_rpc(ctx, db_ref, self.key().clone()), + db_rpc: db_rpc(ctx, db_ref, *passport.id(), self.key().clone()), } } @@ -97,6 +127,9 @@ impl super::Query for RemoveBlob { peers.remove_blob(ctx, self.key().clone()) } + const COMMITTED: Event = Event::CommittedRemovingBlob; + const FAILED_TO_COMMIT: Event = Event::FailedToCommitRemovingBlob; + fn peers_commit( &self, ctx: &mut actor::Context, @@ -107,6 +140,9 @@ impl super::Query for RemoveBlob { peers.commit_to_remove_blob(ctx, id, self.key().clone(), timestamp) } + const ABORTED: Event = Event::AbortedRemovingBlob; + const FAILED_TO_ABORT: Event = Event::FailedToAbortRemovingBlob; + fn peers_abort( &self, ctx: &mut actor::Context, diff --git a/src/op/store.rs b/src/op/store.rs index
7b61e58..4a5e085 100644 --- a/src/op/store.rs +++ b/src/op/store.rs @@ -12,6 +12,7 @@ use log::debug; use crate::db::{self, AddBlobResponse}; use crate::op::{commit_query, consensus, db_rpc, DbRpc, Outcome}; +use crate::passport::{Event, Passport}; use crate::peer::{ConsensusId, PeerRpc, Peers}; use crate::storage::{BlobEntry, Query, StoreBlob}; use crate::{Buffer, Key}; @@ -22,13 +23,18 @@ use crate::{Buffer, Key}; pub async fn store_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, peers: &Peers, blob: &mut Buffer, blob_length: usize, ) -> Result { - debug!("running store operation"); + debug!( + "running store operation: request_id=\"{}\", blob_length={}", + passport.id(), + blob_length, + ); - let query = match add_blob(ctx, db_ref, blob, blob_length).await { + let query = match add_blob(ctx, db_ref, passport, blob, blob_length).await { Ok(Outcome::Continue(query)) => query, Ok(Outcome::Done(key)) => return Ok(key), Err(()) => return Err(()), @@ -38,18 +44,25 @@ pub async fn store_blob( if peers.is_empty() { // Easy mode! debug!( - "running in single mode, not running consensus algorithm to store blob: key={}", - key + "running in single mode, not running consensus algorithm to store blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + key, ); // We can directly commit to storing the blob, we're always in agreement // with ourselves. - commit_query(ctx, db_ref, query, SystemTime::now()) + commit_query(ctx, db_ref, passport, query, SystemTime::now()) .await .map(|_| key) } else { // Hard mode. - debug!("running consensus algorithm to store blob: key={}", key); - consensus(ctx, db_ref, peers, query).await.map(|_| key) + debug!( + "running consensus algorithm to store blob: request_id=\"{}\", key=\"{}\"", + passport.id(), + key, + ); + consensus(ctx, db_ref, passport, peers, query) + .await + .map(|_| key) } } @@ -57,6 +70,7 @@ pub async fn store_blob( pub(crate) async fn add_blob( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, buf: &mut Buffer, blob_length: usize, ) -> Result, ()> @@ -66,21 +80,33 @@ where // We need ownership of the `Buffer`, so temporarily replace it with an // empty one. let view = replace(buf, Buffer::empty()).view(blob_length); + debug!( + "adding blob to storage: request_id=\"{}\", blob_length={}", + passport.id(), + view.len() + ); - match db_rpc(ctx, db_ref, view) { + match db_rpc(ctx, db_ref, *passport.id(), view) { Ok(rpc) => match rpc.await { Ok((result, view)) => { // Mark the blob's bytes as processed and put back the buffer. 
*buf = view.processed(); + passport.mark(Event::AddedBlob); match result { AddBlobResponse::Query(query) => Ok(Outcome::Continue(query)), AddBlobResponse::AlreadyStored(key) => Ok(Outcome::Done(key)), } } - Err(()) => Err(()), + Err(()) => { + passport.mark(Event::FailedToAddBlob); + Err(()) + } }, - Err(()) => Err(()), + Err(()) => { + passport.mark(Event::FailedToAddBlob); + Err(()) + } } } @@ -91,10 +117,15 @@ impl super::Query for StoreBlob { &self, ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, ) -> Self::AlreadyDone { - debug!("checking if blob is already stored: key={}", self.key()); + debug!( + "checking if blob is already stored: request_id=\"{}\", key=\"{}\"", + passport.id(), + self.key(), + ); AlreadyDone { - db_rpc: db_rpc(ctx, db_ref, self.key().clone()), + db_rpc: db_rpc(ctx, db_ref, *passport.id(), self.key().clone()), } } @@ -106,6 +137,9 @@ impl super::Query for StoreBlob { peers.add_blob(ctx, self.key().clone()) } + const COMMITTED: Event = Event::CommittedStoringBlob; + const FAILED_TO_COMMIT: Event = Event::FailedToCommitStoringBlob; + fn peers_commit( &self, ctx: &mut actor::Context, @@ -116,6 +150,9 @@ impl super::Query for StoreBlob { peers.commit_to_store_blob(ctx, id, self.key().clone(), timestamp) } + const ABORTED: Event = Event::AbortedStoringBlob; + const FAILED_TO_ABORT: Event = Event::FailedToAbortStoringBlob; + fn peers_abort( &self, ctx: &mut actor::Context, diff --git a/src/op/sync.rs b/src/op/sync.rs index 3e58e3c..7921da3 100644 --- a/src/op/sync.rs +++ b/src/op/sync.rs @@ -19,11 +19,12 @@ use heph::rt::options::{ActorOptions, Priority}; use heph::rt::RuntimeAccess; use heph::timer::Deadline; use heph::{actor, Actor, NewActor, SupervisorStrategy}; -use log::{error, warn}; +use log::{debug, error, warn}; use crate::buffer::Buffer; use crate::db::{self, db_error}; use crate::op::{db_rpc, retrieve_blob, sync_removed_blob, sync_stored_blob}; +use crate::passport::{Passport, Uuid}; use crate::peer::server::{ BLOB_LENGTH_LEN, DATE_TIME_LEN, KEY_SET_SIZE_LEN, METADATA_LEN, REQUEST_BLOB, REQUEST_KEYS, STORE_BLOB, @@ -56,6 +57,7 @@ pub async fn full_sync( db_ref: &mut ActorRef, peers: &Peers, ) -> Result<(), ()> { + debug!("running full synchronisation"); if peers.is_empty() { return Ok(()); } @@ -66,7 +68,7 @@ pub async fn full_sync( ); // Keys stored locally. - let stored_keys: Keys = db_rpc(ctx, db_ref, ())?.await?; + let stored_keys: Keys = db_rpc(ctx, db_ref, Uuid::empty(), ())?.await?; // Keys missing locally. let mut missing_keys = Vec::new(); @@ -414,9 +416,12 @@ where .await .map_err(|err| err.describe("writing magic bytes"))?; + // FIXME: actually use the passport. + let mut passport = Passport::empty(); + passport.set_id(Uuid::new()); + let mut buf = Buffer::new(); // FIXME: this doesn't return. // Change this to `while let Some(msg) = ctx.receive_next()`. - let mut buf = Buffer::new(); loop { match ctx.receive_next().await { Message::GetKnownKeys(RpcMessage { response, .. }) => { @@ -427,7 +432,7 @@ where } } Message::ShareBlobs(RpcMessage { request, response }) => { - share_blobs(&mut ctx, &mut db_ref, &mut stream, request).await?; + share_blobs(&mut ctx, &mut db_ref, &mut passport, &mut stream, request).await?; if let Result::Err(err) = response.respond(()) { // TODO: better name for the actor? 
warn!("peer sync actor failed to send response to actor: {}", err); @@ -438,6 +443,7 @@ where let res = retrieve_blobs( &mut ctx, &mut db_ref, + &mut passport, &mut stream, &mut buf, &mut stored_keys, @@ -544,6 +550,7 @@ where async fn share_blobs( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, stream: &mut TcpStream, keys: Vec, ) -> crate::Result<()> @@ -551,7 +558,7 @@ where K: RuntimeAccess, { for key in keys { - match retrieve_blob(ctx, db_ref, key.clone()).await { + match retrieve_blob(ctx, db_ref, passport, key.clone()).await { Ok(Some(BlobEntry::Stored(blob))) => { write_blob( ctx, @@ -613,6 +620,7 @@ const RETRIEVE_MAX_KEYS: usize = 20; async fn retrieve_blobs( ctx: &mut actor::Context, db_ref: &mut ActorRef, + passport: &mut Passport, stream: &mut TcpStream, buf: &mut Buffer, stored_keys: &mut Vec, @@ -674,13 +682,13 @@ where match timestamp.into() { ModifiedTime::Created(timestamp) => { let view = replace(buf, Buffer::empty()).view(blob_length as usize); - match sync_stored_blob(ctx, db_ref, view, timestamp).await { + match sync_stored_blob(ctx, db_ref, passport, view, timestamp).await { Ok(view) => *buf = view.processed(), Err(()) => return Err(db_error()), } } ModifiedTime::Removed(timestamp) => { - match sync_removed_blob(ctx, db_ref, key.clone(), timestamp).await { + match sync_removed_blob(ctx, db_ref, passport, key.clone(), timestamp).await { Ok(()) => {} Err(()) => return Err(db_error()), } diff --git a/src/passport.rs b/src/passport.rs index b1aa20c..8473b13 100644 --- a/src/passport.rs +++ b/src/passport.rs @@ -7,24 +7,34 @@ use std::iter::FusedIterator; use std::ops::Range; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; use std::{array, fmt}; -use chrono::{DateTime, Utc}; - /// Request passport. /// /// Collection of events and the time at which they occurred. #[derive(Debug)] pub struct Passport { id: Uuid, + start: Instant, marks: Vec, } impl Passport { - /// Create an empty `Passport` with an initial `capacity`. - pub const fn empty() -> Passport { + /// Create an empty `Passport` with an zero id. + pub fn empty() -> Passport { Passport { id: Uuid::empty(), + start: Instant::now(), + marks: Vec::new(), + } + } + + /// Create a new `Passport` with unique id. + pub fn new() -> Passport { + Passport { + id: Uuid::new(), + start: Instant::now(), marks: Vec::new(), } } @@ -42,7 +52,7 @@ impl Passport { /// Mark the passport with a new `event`. pub fn mark(&mut self, event: Event) { let mark = Mark { - timestamp: Utc::now(), + timestamp: Instant::now(), event, }; self.marks.push(mark); @@ -59,10 +69,35 @@ impl Passport { /// Reuse the passport for another request. pub fn reset(&mut self) { self.id = Uuid::empty(); + self.start = Instant::now(); self.marks.clear(); } } +impl fmt::Display for Passport { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{{request_id=\"{}\", events=[", self.id)?; + let mut last_time = self.start; + let mut first = true; + for mark in self.marks.iter() { + if first { + first = false; + } else { + write!(f, ", ")?; + } + let timestamp = mark.timestamp(); + write!( + f, + "{{event=\"{}\", duration=\"{:?}\"}}", + mark.event(), + timestamp - last_time, + )?; + last_time = timestamp; + } + write!(f, "]}}") + } +} + /// Universally Unique Identifier. /// /// # Notes @@ -70,7 +105,7 @@ impl Passport { /// Loosely follows [RFC4122]. 
/// /// [RFC4122]: http://tools.ietf.org/html/rfc4122 -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone)] pub struct Uuid { bytes: [u8; 16], // 128 bits. } @@ -119,6 +154,12 @@ impl Uuid { } } +impl fmt::Debug for Uuid { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(self, f) + } +} + impl fmt::Display for Uuid { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Always force a length of 32. @@ -205,13 +246,13 @@ fn from_hex_byte(b: u8) -> Result { /// [time]: Mark::timestamp #[derive(Copy, Clone, Debug)] pub struct Mark { - timestamp: DateTime, + timestamp: Instant, event: Event, } impl Mark { /// Returns the time at which the event took place. - pub fn timestamp(self) -> DateTime { + pub fn timestamp(self) -> Instant { self.timestamp } @@ -247,36 +288,63 @@ macro_rules! events { } events!( - // HTTP. - ReadFirstByte => "read first byte", - ParsedRequest => "parsed request heading", - ReadBody => "read body", - WrittenResponse => "written response", - - // Ops. - RetrievingBlob => "retrieving blob", + // # HTTP. + ParsedHttpRequest => "read and parsed HTTP request heading", + ReadHttpBody => "read HTTP body", + WrittenHttpResponse => "written HTTP response", + + // # Peer interaction. + // Server. + ReadingPeerRequest => "reading peer request header", + ReadPeerRequest => "read peer request header", + ReadingPeerKey => "reading key from peer", + ReadPeerKey => "read key from peer", + ReadingPeerMetadata => "reading blob metadata from peer", + ReadPeerMetadata => "read blob metadata from peer", + ReadingPeerBlob => "reading blob from peer", + ReadPeerBlob => "read blob from peer", + WritingPeerResponse => "writing peer response", + WrittenPeerResponse => "written peer response", + + // # Ops. + // Store blob. + AddedBlob => "added blob to storage", + FailedToAddBlob => "failed to add blob to storage", + CommittedStoringBlob => "committed to storing blob query", + FailedToCommitStoringBlob => "failed to commit storing blob query", + AbortedStoringBlob => "aborted store blob query", + FailedToAbortStoringBlob => "failed to abort store blob query", + // Retrieve blob. RetrievedBlob => "retrieved blob", - RetrievingKeys => "retrieving keys", + FailedToRetrieveBlob => "failed to retrieve blob", + // Remove blob. + PreppedRemoveBlob => "prepared storage for removing blob", + FailedToPrepRemoveBlob => "failed to prepare storage for removing blob", + CommittedRemovingBlob => "committed to removing blob query", + FailedToCommitRemovingBlob => "failed to commit removing blob query", + AbortedRemovingBlob => "aborted remove blob query", + FailedToAbortRemovingBlob => "failed to abort remove blob query", + // Retrieve uncommitted blob. + RetrievedUncommittedBlob => "retrieved uncommitted blob", + FailedToRetrieveUncommittedBlob => "failed to retrieve uncommitted blob", + // Retrieve keys. RetrievedKeys => "retrieved keys", - HealthCheck => "running health check", + FailedToRetrieveKeys => "failed to retrieve keys", + // Health check. HealthCheckComplete => "health check complete", - SyncStoredBlob => "syncing stored blob", + HealthCheckFailed => "health check failed", + // Sync stored blob. SyncedStoredBlob => "synced stored blob", - SyncRemovedBlob => "syncing removed blob", + FailedToSyncStoredBlob => "failed to sync stored blob", + // Sync removed blob. SyncedRemovedBlob => "synced removed blob", - - // TODO: add more events. - // Storing blob, in ops module: - // TODO: log failed attempts. - // * Add blob. - // * Blob added. - // * Peers adding blob.
// Only in consensus. - // * Peers added blob. // Only in consensus. - // * Peers storing blob. // Only in consensus. - // * Peers stored blob. // Only in consensus. - // * Store blob. - // * Stored blob. - // * Inform peers blob stored. // Only in consensus. + FailedToSyncRemovedBlob => "failed to sync removed blob", + // Consensus. + StartingConsensus => "starting consensus", + ConsensusPhaseOneResults => "got consensus phase one results", + ConsensusPhaseTwoResults => "got consensus phase two results", + ConsensusCommitted => "coordinator and participants committed", + AbortedConsensusRun => "aborted a consensus run", ); #[cfg(test)] @@ -296,7 +364,6 @@ mod tests { assert_eq!(size_of::(), 1); assert_eq!(size_of::(), 16); - assert_eq!(size_of::(), 16); } #[test] diff --git a/src/peer/participant.rs b/src/peer/participant.rs index c579e3d..b8f4373 100644 --- a/src/peer/participant.rs +++ b/src/peer/participant.rs @@ -611,6 +611,7 @@ pub mod consensus { use std::collections::HashMap; use std::convert::TryInto; + use std::io::IoSlice; use std::net::SocketAddr; use std::time::{Duration, SystemTime}; @@ -626,6 +627,7 @@ pub mod consensus { use crate::error::Describe; use crate::op::{abort_query, add_blob, commit_query, prep_remove_blob, Outcome}; + use crate::passport::{Passport, Uuid}; use crate::peer::participant::RpcResponder; use crate::peer::server::{ BLOB_LENGTH_LEN, DATE_TIME_LEN, METADATA_LEN, NO_BLOB, REQUEST_BLOB, @@ -668,43 +670,46 @@ pub mod consensus { key: Key, mut responder: RpcResponder, ) -> crate::Result<()> { + // TODO: use and log the passport properly. + let mut passport = Passport::new(); debug!( - "store blob consensus actor started: key={}, remote_address={}", - key, remote + "store blob consensus actor started: request_id=\"{}\", key=\"{}\", remote_address=\"{}\"", + passport.id(), key, remote ); // TODO: reuse stream and buffer. // TODO: stream large blob to a file directly? Preallocating disk space in // the data file? 
debug!( - "connecting to coordinator server: remote_address={}", + "connecting to coordinator server: request_id=\"{}\", remote_address={}", + passport.id(), remote ); let mut stream = TcpStream::connect(&mut ctx, remote) .map_err(|err| err.describe("creating connect to peer server"))?; let mut buf = Buffer::new(); - trace!("writing connection magic"); + trace!("writing connection magic: request_id=\"{}\"", passport.id()); match Deadline::timeout(&mut ctx, IO_TIMEOUT, stream.write_all(COORDINATOR_MAGIC)).await { Ok(()) => {} Err(err) => return Err(err.describe("writing connection magic")), } - let read_blob = read_blob(&mut ctx, &mut stream, &mut buf, &key); + let read_blob = read_blob(&mut ctx, &mut stream, &mut passport, &mut buf, &key); let blob_len = match read_blob.await { Ok(Some((blob_key, blob))) if key == blob_key => blob.len(), Ok(Some((blob_key, ..))) => { error!( - "coordinator server responded with incorrect blob, voting to abort consensus: want_key={}, got_blob_key={}", - key, blob_key + "coordinator server responded with incorrect blob, voting to abort consensus: request_id=\"{}\", want_key=\"{}\", got_blob_key=\"{}\"", + passport.id(), key, blob_key ); responder.respond(ConsensusVote::Abort); return Ok(()); } Ok(None) => { error!( - "couldn't get blob from coordinator server, voting to abort consensus: key={}", - key + "couldn't get blob from coordinator server, voting to abort consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); responder.respond(ConsensusVote::Abort); return Ok(()); @@ -713,14 +718,15 @@ pub mod consensus { }; // Phase one: storing the blob, readying it to be added to the database. - let query = match add_blob(&mut ctx, &mut db_ref, &mut buf, blob_len).await { + let query = match add_blob(&mut ctx, &mut db_ref, &mut passport, &mut buf, blob_len).await { Ok(Outcome::Continue(query)) => { responder.respond(ConsensusVote::Commit(SystemTime::now())); query } Ok(Outcome::Done(..)) => { info!( - "blob already stored, voting to abort consensus: key={}", + "blob already stored, voting to abort consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); responder.respond(ConsensusVote::Abort); @@ -728,7 +734,8 @@ } Err(()) => { warn!( - "failed to add blob to storage, voting to abort consensus: key={}", + "failed to add blob to storage, voting to abort consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); responder.respond(ConsensusVote::Abort); @@ -750,8 +757,8 @@ } Either::Right(..) => { warn!( - "failed to get consensus result in time, running peer conensus: key={}", - query.key() + "failed to get consensus result in time, running peer consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), query.key() ); peers.uncommitted_stored(query); return Ok(()); @@ -760,13 +767,15 @@ let response = match vote_result.result { ConsensusVote::Commit(timestamp) => { - commit_query(&mut ctx, &mut db_ref, query, timestamp) + commit_query(&mut ctx, &mut db_ref, &mut passport, query, timestamp) .await .map(Some) } - ConsensusVote::Abort | ConsensusVote::Fail => abort_query(&mut ctx, &mut db_ref, query) - .await - .map(|_| None), + ConsensusVote::Abort | ConsensusVote::Fail => { + abort_query(&mut ctx, &mut db_ref, &mut passport, query) + .await + .map(|_| None) + } }; let timestamp = match response { @@ -797,7 +806,8 @@ // use peer consensus to get us back into an ok state.
warn!( "failed to get committed message from coordinator, \ - running peer conensus: key={}", + running peer consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); // We're committed, let all other participants know. @@ -812,6 +822,7 @@ async fn read_blob<'b, M, K>( ctx: &mut actor::Context, stream: &mut TcpStream, + passport: &mut Passport, buf: &'b mut Buffer, request_key: &Key, ) -> crate::Result> @@ -819,21 +830,20 @@ where K: RuntimeAccess, { trace!( - "requesting key from coordinator server: key={}", + "requesting key from coordinator server: request_id=\"{}\", key=\"{}\"", + passport.id(), request_key ); debug_assert!(buf.is_empty()); - // TODO: use vectored I/O here. See - // https://github.com/rust-lang/futures-rs/pull/2181. // Write the request for the key. - match Deadline::timeout(ctx, IO_TIMEOUT, stream.write_all(&[REQUEST_BLOB])).await { + let bufs = &mut [ + IoSlice::new(&[REQUEST_BLOB]), + IoSlice::new(request_key.as_bytes()), + ]; + match Deadline::timeout(ctx, IO_TIMEOUT, stream.write_all_vectored(bufs)).await { Ok(()) => {} - Err(err) => return Err(err.describe("writing request key")), - } - match Deadline::timeout(ctx, IO_TIMEOUT, stream.write_all(request_key.as_bytes())).await { - Ok(()) => {} - Err(err) => return Err(err.describe("writing request key")), + Err(err) => return Err(err.describe("writing request")), } // Don't want to include the length of the blob in the key calculation. @@ -853,7 +863,8 @@ let blob_length = u64::from_be_bytes(blob_length_bytes); buf.processed(BLOB_LENGTH_LEN); trace!( - "read blob length from coordinator server: length={}", + "read blob length from coordinator server: request_id=\"{}\", blob_length={}", + passport.id(), blob_length ); @@ -873,7 +884,8 @@ let key = calc.finish(); trace!( - "read blob from coordinator server: key={}, length={}", + "read blob from coordinator server: request_id=\"{}\", key=\"{}\", blob_length={}", + passport.id(), key, blob_length ); @@ -892,13 +904,16 @@ key: Key, mut responder: RpcResponder, ) -> crate::Result<()> { + // TODO: use and log the passport properly. + let mut passport = Passport::new(); debug!( - "remove blob consensus actor started: key={}, remote_address={}", - key, remote + "remove blob consensus actor started: request_id=\"{}\", key=\"{}\", remote_address=\"{}\"", + passport.id(), key, remote ); // Phase one: removing the blob, readying it to be removed from the database. - let query = match prep_remove_blob(&mut ctx, &mut db_ref, key.clone()).await { + let query = match prep_remove_blob(&mut ctx, &mut db_ref, &mut passport, key.clone()).await + { Ok(Outcome::Continue(query)) => { responder.respond(ConsensusVote::Commit(SystemTime::now())); query } Ok(Outcome::Done(..)) => { // FIXME: if we're not synced this can happen, but we should // continue. info!( - "blob already removed/not stored, voting to abort consensus: key={}", + "blob already removed/not stored, voting to abort consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); responder.respond(ConsensusVote::Abort); @@ -915,7 +931,8 @@ } Err(()) => { warn!( - "failed to prepare storage to remove blob, voting to abort consensus: key={}", + "failed to prepare storage to remove blob, voting to abort consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); responder.respond(ConsensusVote::Abort); @@ -936,7 +953,8 @@ } Either::Right(..)
=> { warn!( - "failed to get consensus result in time, running peer conensus: key={}", + "failed to get consensus result in time, running peer consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), query.key() ); peers.uncommitted_removed(query); @@ -946,13 +964,15 @@ let response = match vote_result.result { ConsensusVote::Commit(timestamp) => { - commit_query(&mut ctx, &mut db_ref, query, timestamp) + commit_query(&mut ctx, &mut db_ref, &mut passport, query, timestamp) .await .map(Some) } - ConsensusVote::Abort | ConsensusVote::Fail => abort_query(&mut ctx, &mut db_ref, query) - .await - .map(|()| None), + ConsensusVote::Abort | ConsensusVote::Fail => { + abort_query(&mut ctx, &mut db_ref, &mut passport, query) + .await + .map(|()| None) + } }; let timestamp = match response { @@ -983,7 +1003,8 @@ // use peer consensus to get us back into an ok state. warn!( "failed to get committed message from coordinator, \ - running peer conensus: key={}", + running peer consensus: request_id=\"{}\", key=\"{}\"", + passport.id(), key ); // We're committed, let all other participants know. @@ -1032,8 +1053,12 @@ let mut peer_results: HashMap<_, PeerResult, _> = HashMap::with_hasher(FxBuildHasher::default()); + // TODO: use and log the passport properly. + let mut passport = Passport::empty(); loop { + passport.reset(); let msg = ctx.receive_next().await; + passport.set_id(Uuid::new()); debug!("participant consensus received a message: {:?}", msg); match msg { @@ -1042,7 +1067,14 @@ if let Some(timestamp) = result.committed { // Don't care about the result, can't handle it // here. - let _ = commit_query(&mut ctx, &mut db_ref, query, timestamp).await; + let _ = commit_query( + &mut ctx, + &mut db_ref, + &mut passport, + query, + timestamp, + ) + .await; continue; } } @@ -1055,7 +1087,14 @@ if let Some(timestamp) = result.committed { // Don't care about the result, can't handle it // here. - let _ = commit_query(&mut ctx, &mut db_ref, query, timestamp).await; + let _ = commit_query( + &mut ctx, + &mut db_ref, + &mut passport, + query, + timestamp, + ) + .await; continue; } } @@ -1097,14 +1136,26 @@ StorageQuery::Store(query) => { // Don't care about the result, can't handle // it here. - let _ = - commit_query(&mut ctx, &mut db_ref, query, timestamp).await; + let _ = commit_query( + &mut ctx, + &mut db_ref, + &mut passport, + query, + timestamp, + ) + .await; } StorageQuery::Remove(query) => { // Don't care about the result, can't handle // it here. - let _ = - commit_query(&mut ctx, &mut db_ref, query, timestamp).await; + let _ = commit_query( + &mut ctx, + &mut db_ref, + &mut passport, + query, + timestamp, + ) + .await; } } } diff --git a/src/peer/server.rs b/src/peer/server.rs index c6c7482..971e879 100644 --- a/src/peer/server.rs +++ b/src/peer/server.rs @@ -21,6 +21,7 @@ use crate::buffer::Buffer; use crate::db::{self, db_error}; use crate::error::Describe; use crate::op::{self, sync_removed_blob, sync_stored_blob}; +use crate::passport::{Event, Passport, Uuid}; use crate::storage::{self, BlobEntry, DateTime, ModifiedTime}; use crate::Key; @@ -111,23 +112,29 @@ pub async fn actor( warn!("failed to set no delay, continuing: {}", err); } + // TODO: read request-id from peer?
+ let mut passport = Passport::new(); + passport.mark(Event::ReadingPeerRequest); loop { // NOTE: we don't create `buf` ourselves so it could be that it already // contains a request, so check it first and only after read some more // bytes. while let Some(request_byte) = buf.next_byte() { + passport.mark(Event::ReadPeerRequest); match request_byte { REQUEST_BLOB => { buf.processed(1); - retrieve_blob(&mut ctx, &mut stream, &mut buf, &mut db_ref).await?; + retrieve_blob(&mut ctx, &mut stream, &mut passport, &mut buf, &mut db_ref) + .await?; } REQUEST_KEYS => { buf.processed(1); - retrieve_keys(&mut ctx, &mut stream, &mut buf, &mut db_ref).await?; + retrieve_keys(&mut ctx, &mut stream, &mut passport, &mut buf, &mut db_ref) + .await?; } STORE_BLOB => { buf.processed(1); - store_blob(&mut ctx, &mut stream, &mut buf, &mut db_ref).await?; + store_blob(&mut ctx, &mut stream, &mut passport, &mut buf, &mut db_ref).await?; } byte => { // Invalid byte. Forcefully close the connection, letting @@ -139,6 +146,9 @@ pub async fn actor( return Ok(()); } } + passport.reset(); + passport.set_id(Uuid::new()); + passport.mark(Event::ReadingPeerRequest); } // Read some more bytes. @@ -182,13 +192,15 @@ impl Blob { async fn retrieve_blob( ctx: &mut actor::Context, stream: &mut TcpStream, + passport: &mut Passport, buf: &mut Buffer, db_ref: &mut ActorRef, ) -> crate::Result<()> { + passport.mark(Event::ReadingPeerKey); if buf.len() < Key::LENGTH { let n = Key::LENGTH - buf.len(); match Deadline::timeout(ctx, IO_TIMEOUT, buf.read_n_from(&mut *stream, n)).await { - Ok(..) => {} + Ok(..) => passport.mark(Event::ReadPeerKey), Err(err) => return Err(err.describe("reading key of blob to retrieve")), } } @@ -196,18 +208,30 @@ async fn retrieve_blob( // SAFETY: checked length above, so indexing is safe. 
let key = Key::from_bytes(&buf.as_bytes()[..Key::LENGTH]).to_owned(); buf.processed(Key::LENGTH); - debug!("got peer request for blob: key={}", key); + debug!( + "retrieving blob for peer: request_id=\"{}\", key=\"{}\"", + passport.id(), + key + ); - let (blob, timestamp) = match op::retrieve_uncommitted_blob(ctx, db_ref, key).await { + let (blob, timestamp) = match op::retrieve_uncommitted_blob(ctx, db_ref, passport, key).await { Ok(Ok(blob)) => { if let Err(err) = blob.prefetch() { - warn!("error prefetching uncommitted blob, continuing: {}", err); + warn!( + "error prefetching uncommitted blob, continuing: {}: request_id=\"{}\"", + err, + passport.id() + ); } (Blob::Uncommitted(blob), DateTime::INVALID) } Ok(Err(Some(BlobEntry::Stored(blob)))) => { if let Err(err) = blob.prefetch() { - warn!("error prefetching committed blob, continuing: {}", err); + warn!( + "error prefetching committed blob, continuing: {}: request_id=\"{}\"", + err, + passport.id() + ); } let created_at = blob.created_at().into(); (Blob::Committed(blob), created_at) @@ -217,6 +241,7 @@ async fn retrieve_blob( Err(()) => return Err(db_error()), }; + passport.mark(Event::WritingPeerResponse); let length: [u8; BLOB_LENGTH_LEN] = (blob.len() as u64).to_be_bytes(); let bufs = &mut [ IoSlice::new(timestamp.as_bytes()), @@ -225,6 +250,7 @@ async fn retrieve_blob( ]; Deadline::timeout(ctx, IO_TIMEOUT, stream.write_all_vectored(bufs)) .await + .map(|()| passport.mark(Event::WrittenPeerResponse)) .map_err(|err| err.describe("writing blob")) } @@ -233,12 +259,13 @@ async fn retrieve_blob( async fn retrieve_keys( ctx: &mut actor::Context, stream: &mut TcpStream, + passport: &mut Passport, buf: &mut Buffer, db_ref: &mut ActorRef, ) -> crate::Result<()> { - debug!("retrieving keys"); + debug!("retrieving keys for peer: request_id=\"{}\"", passport.id()); - let keys = crate::op::retrieve_keys(ctx, db_ref) + let keys = crate::op::retrieve_keys(ctx, db_ref, passport) .await .map_err(|()| db_error())?; let mut iter = keys.into_iter(); @@ -248,6 +275,7 @@ async fn retrieve_keys( // TODO: benchmark with larger sizes. const N_KEYS: usize = 100; + passport.mark(Event::WritingPeerResponse); let mut first = true; loop { let mut wbuf = buf.split_write(BLOB_LENGTH_LEN + (N_KEYS * Key::LENGTH)).1; @@ -267,6 +295,7 @@ async fn retrieve_keys( // Wrote all keys. if wbuf.is_empty() { + passport.mark(Event::WrittenPeerResponse); return Ok(()); } @@ -283,14 +312,16 @@ async fn retrieve_keys( async fn store_blob( ctx: &mut actor::Context, stream: &mut TcpStream, + passport: &mut Passport, buf: &mut Buffer, db_ref: &mut ActorRef, ) -> crate::Result<()> { + passport.mark(Event::ReadingPeerMetadata); // Read at least the metadata of the blob to store. if buf.len() < Key::LENGTH + METADATA_LEN { let n = (Key::LENGTH + METADATA_LEN) - buf.len(); match Deadline::timeout(ctx, IO_TIMEOUT, buf.read_n_from(&mut *stream, n)).await { - Ok(..) => {} + Ok(..) => passport.mark(Event::ReadPeerMetadata), Err(err) => return Err(err.describe("reading metadata from socket")), } } @@ -311,15 +342,18 @@ async fn store_blob( buf.processed(Key::LENGTH + METADATA_LEN); debug!( - "storing blob: key={}, length={}, timestamp={:?}", - key, blob_length, timestamp + "storing blob for peer: request_id=\"{}\", key=\"{}\", blob_length={}", + passport.id(), + key, + blob_length, ); // Read the entire blob. 
+ passport.mark(Event::ReadingPeerBlob); if buf.len() < (blob_length as usize) { let n = (blob_length as usize) - buf.len(); match Deadline::timeout(ctx, IO_TIMEOUT, buf.read_n_from(&mut *stream, n)).await { - Ok(..) => {} + Ok(..) => passport.mark(Event::ReadPeerBlob), Err(err) => return Err(err.describe("reading blob from socket")), } } @@ -327,7 +361,7 @@ async fn store_blob( match timestamp.into() { ModifiedTime::Created(timestamp) => { let view = replace(buf, Buffer::empty()).view(blob_length as usize); - match sync_stored_blob(ctx, db_ref, view, timestamp).await { + match sync_stored_blob(ctx, db_ref, passport, view, timestamp).await { Ok(view) => { *buf = view.processed(); Ok(()) @@ -336,7 +370,7 @@ async fn store_blob( } } ModifiedTime::Removed(timestamp) => { - match sync_removed_blob(ctx, db_ref, key, timestamp).await { + match sync_removed_blob(ctx, db_ref, passport, key, timestamp).await { Ok(()) => Ok(()), Err(()) => Err(db_error()), } @@ -347,4 +381,6 @@ async fn store_blob( Ok(()) } } + + // TODO: add a response to the peer. } diff --git a/src/storage/validate.rs b/src/storage/validate.rs index 0ec3993..446cc82 100644 --- a/src/storage/validate.rs +++ b/src/storage/validate.rs @@ -6,6 +6,7 @@ use std::path::Path; use std::{io, thread}; use crossbeam_channel::{Receiver, Sender}; +use log::error; use crate::storage::{Blob, BlobEntry, EntryIndex, Storage}; use crate::Key; @@ -86,7 +87,7 @@ fn validate_blobs(entries: Receiver<(Key, EntryIndex, Blob)>, corruptions: Sende let got = Key::for_blob(blob.bytes()); if got != key { if let Err(err) = corruptions.send(Corruption::for_key(key)) { - log::error!( + error!( "failed to send corruption for blob: {}", err.into_inner().key() );
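The pattern this diff applies throughout — thread a `Passport` through every operation, `mark` a named `Event` as each step completes or fails, and log the accumulated timings once per request — can be sketched standalone. Below is a minimal, self-contained model of the `Passport` machinery added in `src/passport.rs`; it is an illustration, not the crate's actual API: it omits the request `Uuid`, the `reset` that reuses a passport across requests on one connection, and the `events!` macro that generates the real `Event` type.

```rust
use std::fmt;
use std::time::Instant;

// A few of the events from the diff's `events!` macro; the real type
// covers HTTP, peer-interaction, ops, and consensus events.
#[derive(Copy, Clone, Debug)]
enum Event {
    ParsedHttpRequest,
    ReadHttpBody,
    WrittenHttpResponse,
}

// An event plus the instant at which it was marked.
struct Mark {
    timestamp: Instant,
    event: Event,
}

// Simplified passport: a start time plus an ordered list of marks.
struct Passport {
    start: Instant,
    marks: Vec<Mark>,
}

impl Passport {
    fn new() -> Passport {
        Passport {
            start: Instant::now(),
            marks: Vec::new(),
        }
    }

    // Record that `event` happened now.
    fn mark(&mut self, event: Event) {
        self.marks.push(Mark {
            timestamp: Instant::now(),
            event,
        });
    }
}

impl fmt::Display for Passport {
    // Print each event with the duration since the previous mark, mirroring
    // the `{event=..., duration=...}` format of the diff's `Display` impl.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "events=[")?;
        let mut last_time = self.start;
        for (i, mark) in self.marks.iter().enumerate() {
            if i != 0 {
                write!(f, ", ")?;
            }
            write!(
                f,
                "{{event=\"{:?}\", duration=\"{:?}\"}}",
                mark.event,
                mark.timestamp - last_time, // `Instant` is monotonic, never negative.
            )?;
            last_time = mark.timestamp;
        }
        write!(f, "]")
    }
}

fn main() {
    let mut passport = Passport::new();
    // A request handler marks events as the request progresses...
    passport.mark(Event::ParsedHttpRequest);
    passport.mark(Event::ReadHttpBody);
    passport.mark(Event::WrittenHttpResponse);
    // ...and the timings are logged once per request, as the HTTP actor
    // does with `info!("request passport: {}", request.passport)`.
    println!("request passport: {}", passport);
}
```

Two design points the diff itself makes: switching `Mark` from `chrono::DateTime<Utc>` to `std::time::Instant` makes the printed durations monotonic (and drops this module's `chrono` dependency), and giving failures their own events (`FailedToAddBlob`, `HealthCheckFailed`, and so on) means a logged passport shows where a request went wrong, not just how far it got.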