From c8b8e3a745159874d6ba0289bc0823612bc235ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Smolarek?= <34063647+Razz4780@users.noreply.github.com> Date: Tue, 8 Aug 2023 16:39:06 +0200 Subject: [PATCH] Bump hyper (#1775) * tokio-compat crate * Fixed http1 upgrade implementation * Changelog entry * Removed tokio-compat, using hyper-util pinned to a commit --- Cargo.lock | 120 +++++++++++------- Cargo.toml | 9 +- changelog.d/1774.internal.md | 1 + mirrord/agent/Cargo.toml | 1 + mirrord/agent/src/runtime/crio.rs | 3 +- mirrord/agent/src/steal/http/hyper_handler.rs | 7 +- mirrord/agent/src/steal/http/v1.rs | 40 +++--- mirrord/agent/src/steal/http/v2.rs | 18 ++- mirrord/layer/Cargo.toml | 1 + mirrord/layer/src/tcp_steal/http/v1.rs | 5 +- mirrord/layer/src/tcp_steal/http/v2.rs | 21 +-- 11 files changed, 138 insertions(+), 88 deletions(-) create mode 100644 changelog.d/1774.internal.md diff --git a/Cargo.lock b/Cargo.lock index cd1cbbbd375..397f1d947ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1073,9 +1073,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c6b2562119bf28c3439f7f02db99faf0aa1a8cdfe5772a2ee155d32227239f0" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" dependencies = [ "libc", ] @@ -1134,9 +1134,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd304a20bff958a57f04c4e96a2e7594cc4490a0e809cbd48bb6437edaa452d" +checksum = "c27cdf28c0f604ba3f512b0c9a409f8de8513e4816705deb0498b627e7c3a3fd" dependencies = [ "clap_builder", "clap_derive", @@ -1145,9 +1145,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.19" +version = "4.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01c6a3f08f1fe5662a35cfe393aec09c4df95f60ee93b7556505260f75eee9e1" +checksum = "08a9f1ab5e9f01a9b81f202e8562eb9a10de70abf9eaeac1be465c28b75aa4aa" dependencies = [ "anstream", "anstyle", @@ -1510,9 +1510,9 @@ dependencies = [ [[package]] name = "der" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c7ed52955ce76b1554f509074bb357d3fb8ac9b51288a65a3fd480d1dfba946" +checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ "const-oid", "zeroize", @@ -1942,13 +1942,13 @@ dependencies = [ [[package]] name = "filetime" -version = "0.2.21" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cbc844cecaee9d4443931972e1289c8ff485cb4cc2767cb03ca139ed6885153" +checksum = "d4029edd3e734da6fe05b6cd7bd2960760a616bd2ddd0d59a0124746d6272af0" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.2.16", + "redox_syscall 0.3.5", "windows-sys 0.48.0", ] @@ -2352,7 +2352,8 @@ dependencies = [ [[package]] name = "http-body" version = "1.0.0-rc.2" -source = "git+https://github.com/meowjesty/http-body?branch=issue/922/http2#d4809041116a097d6d9bece681d0842b79b26170" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" dependencies = [ "bytes", "http", @@ -2360,8 +2361,9 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.0-rc.2" -source = "git+https://github.com/meowjesty/http-body?branch=issue/922/http2#d4809041116a097d6d9bece681d0842b79b26170" +version = "0.1.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ef12f041acdd397010e5fb6433270c147d3b8b2d0a840cd7fff8e531dca5c8" dependencies = [ "bytes", "futures-util", @@ -2430,12 +2432,12 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.0-rc.3" -source = "git+https://github.com/meowjesty/hyper?branch=issue/922/http2#1b7c2bf95f70425954e34df2b2f757daa9b0e020" +version = "1.0.0-rc.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d280a71f348bcc670fc55b02b63c53a04ac0bf2daff2980795aeaf53edae10e6" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", "h2", "http", @@ -2493,6 +2495,24 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-util" +version = "0.0.0" +source = "git+https://github.com/hyperium/hyper-util#f898015fc9eca9f459ddac521db278d904099e89" +dependencies = [ + "futures-channel", + "futures-util", + "http", + "hyper 1.0.0-rc.4", + "once_cell", + "pin-project-lite", + "socket2 0.4.9", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "hyperlocal" version = "0.8.0" @@ -3202,7 +3222,8 @@ dependencies = [ "http", "http-body-util", "httparse", - "hyper 1.0.0-rc.3", + "hyper 1.0.0-rc.4", + "hyper-util", "iptables", "k8s-cri", "libc", @@ -3330,7 +3351,7 @@ dependencies = [ "mirrord-progress", "mirrord-protocol", "rand", - "rstest 0.17.0", + "rstest 0.18.1", "serde", "serde_json", "shellexpand", @@ -3360,7 +3381,8 @@ dependencies = [ "futures", "http-body 1.0.0-rc.2", "http-body-util", - "hyper 1.0.0-rc.3", + "hyper 1.0.0-rc.4", + "hyper-util", "itertools", "k8s-openapi", "libc", @@ -3375,7 +3397,7 @@ dependencies = [ "os_info", "rand", "regex", - "rstest 0.17.0", + "rstest 0.18.1", "serde", "serde_json", "socket2 0.5.3", @@ -3465,7 +3487,7 @@ dependencies = [ "fancy-regex", "http-body-util", "http-serde", - "hyper 1.0.0-rc.3", + "hyper 1.0.0-rc.4", "libc", "mirrord-macros", "nix 0.24.3", @@ -3928,18 +3950,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "030ad2bc4db10a8944cb0d837f158bdfec4d4a4873ab701a95046770d11f8842" +checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" +checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", @@ -3948,9 +3970,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c" [[package]] name = "pin-utils" @@ -4454,13 +4476,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.4", + "regex-automata 0.3.6", "regex-syntax 0.7.4", ] @@ -4475,9 +4497,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -4695,9 +4717,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.6" +version = "0.38.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee020b1716f0a80e2ace9b03441a749e402e86712f15f16fe8a8f75afac732f" +checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399" dependencies = [ "bitflags 2.3.3", "errno 0.3.2", @@ -4940,9 +4962,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.181" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d3e73c93c3240c0bda063c239298e633114c69a888c3e37ca8bb33f343e9890" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] @@ -4971,9 +4993,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.181" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be02f6cb0cd3a5ec20bbcfbcbd749f57daddb1a0882dc2e46a6c236c90b977ed" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -5275,7 +5297,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" dependencies = [ "base64ct", - "der 0.7.7", + "der 0.7.8", ] [[package]] @@ -5381,9 +5403,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tar" -version = "0.4.39" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96d2ffad078296368d46ff1cb309be1c23c513b4ab0e22a45de0185275ac96" +checksum = "b16afcea1f22891c49a00c751c7b63b2233284064f11a200fc624137c51e2ddb" dependencies = [ "filetime", "libc", @@ -5392,9 +5414,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand 2.0.0", @@ -6512,9 +6534,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807" +checksum = "acaaa1190073b2b101e15083c38ee8ec891b5e05cbee516521e94ec008f61e64" dependencies = [ "memchr", ] @@ -6590,7 +6612,7 @@ dependencies = [ "bcder", "bytes", "chrono", - "der 0.7.7", + "der 0.7.8", "hex", "pem 1.1.1", "ring", @@ -6620,9 +6642,9 @@ dependencies = [ [[package]] name = "xattr" -version = "0.2.3" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d1526bbe5aaeb5eb06885f4d987bcdfa5e23187055de9b83fe00156a821fabc" +checksum = "f4686009f71ff3e5c4dbcf1a282d0a44db3f021ba69350cd42086b3e5f1c6985" dependencies = [ "libc", ] diff --git a/Cargo.toml b/Cargo.toml index 1d169d98746..0045d76acc2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,10 +57,11 @@ regex = { version = "1", features = ["unicode-case"] } miette = "5" fancy-regex = { version = "0.10" } -# TODO(alex): Change when https://github.com/hyperium/http-body/issues/88 is solved. -hyper = { git = "https://github.com/meowjesty/hyper", branch = "issue/922/http2", features = ["full"] } -http-body = { git = "https://github.com/meowjesty/http-body", branch = "issue/922/http2" } -http-body-util = { git = "https://github.com/meowjesty/http-body", branch = "issue/922/http2" } +hyper = { version = "1.0.0-rc.4", features = ["full"] } +# TODO switch to release when available +hyper-util = { git = "https://github.com/hyperium/hyper-util", commit = "f898015fc9eca9f459ddac521db278d904099e89" } +http-body = "1.0.0-rc.2" +http-body-util = "0.1.0-rc.3" libc = "0.2" bimap = "0.6.2" socket2 = { version = "0.5.3", features = ["all"]} diff --git a/changelog.d/1774.internal.md b/changelog.d/1774.internal.md new file mode 100644 index 00000000000..898e09b53e0 --- /dev/null +++ b/changelog.d/1774.internal.md @@ -0,0 +1 @@ +Updated `hyper` version. \ No newline at end of file diff --git a/mirrord/agent/Cargo.toml b/mirrord/agent/Cargo.toml index 88d819c3f0b..74cadb7759d 100644 --- a/mirrord/agent/Cargo.toml +++ b/mirrord/agent/Cargo.toml @@ -44,6 +44,7 @@ wildmatch = "2" enum_dispatch = "0.3" http-body-util = { workspace = true } hyper = { workspace = true, features = ["full"] } +hyper-util.workspace = true httparse = "1" fancy-regex = { workspace = true } dashmap = { version = "5" } diff --git a/mirrord/agent/src/runtime/crio.rs b/mirrord/agent/src/runtime/crio.rs index cd9e7696b7e..4ca356dc1b8 100644 --- a/mirrord/agent/src/runtime/crio.rs +++ b/mirrord/agent/src/runtime/crio.rs @@ -3,6 +3,7 @@ use futures::TryFutureExt; use http::{Request, Response}; use http_body_util::{BodyExt, Empty}; use hyper::{body::Incoming, client::conn}; +use hyper_util::rt::TokioIo; use k8s_cri::v1alpha2::{runtime_service_client::RuntimeServiceClient, ContainerStatusRequest}; use serde::Deserialize; use tokio::net::UnixStream; @@ -34,7 +35,7 @@ impl CriOContainer { async fn api_get(path: &str) -> Result> { let stream = UnixStream::connect(CRIO_DEFAULT_SOCK_PATH).await?; - let (mut request_sender, connection) = conn::http1::handshake(stream).await?; + let (mut request_sender, connection) = conn::http1::handshake(TokioIo::new(stream)).await?; tokio::spawn(async move { if let Err(e) = connection.await { diff --git a/mirrord/agent/src/steal/http/hyper_handler.rs b/mirrord/agent/src/steal/http/hyper_handler.rs index 3ed3eb68426..1fd0af0aa46 100644 --- a/mirrord/agent/src/steal/http/hyper_handler.rs +++ b/mirrord/agent/src/steal/http/hyper_handler.rs @@ -4,7 +4,10 @@ //! //! # [`RawHyperConnection`] use core::fmt::Debug; -use std::{net::SocketAddr, sync::Arc}; +use std::{ + net::SocketAddr, + sync::{atomic::AtomicU16, Arc}, +}; use bytes::Bytes; use dashmap::DashMap; @@ -59,7 +62,7 @@ where pub(crate) original_destination: SocketAddr, /// Keeps track of which HTTP request we're dealing with, so we don't mix up [`Request`]s. - pub(crate) request_id: RequestId, + pub(crate) next_request_id: AtomicU16, pub(super) handle_version: V, } diff --git a/mirrord/agent/src/steal/http/v1.rs b/mirrord/agent/src/steal/http/v1.rs index 94461dfd5a1..11297942a5f 100644 --- a/mirrord/agent/src/steal/http/v1.rs +++ b/mirrord/agent/src/steal/http/v1.rs @@ -4,7 +4,10 @@ //! //! Handles HTTP/1 requests (with support for upgrades). use core::{fmt::Debug, future::Future, pin::Pin}; -use std::{net::SocketAddr, sync::Arc}; +use std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc, Mutex}, +}; use dashmap::DashMap; use futures::TryFutureExt; @@ -17,6 +20,7 @@ use hyper::{ service::Service, Request, }; +use hyper_util::rt::TokioIo; use mirrord_protocol::{ConnectionId, Port}; use tokio::{ io::{copy_bidirectional, AsyncWriteExt}, @@ -43,11 +47,15 @@ use crate::{ /// /// We use this channel to take control of the upgraded connection back from hyper. #[derive(Debug)] -pub(crate) struct HttpV1(Option>); +pub(crate) struct HttpV1(Mutex>>); impl HttpV1 { - fn take_upgrade_tx(&mut self) -> Option> { - self.0.take() + fn new(upgrate_tx: Option>) -> Self { + Self(Mutex::new(upgrate_tx)) + } + + fn take_upgrade_tx(&self) -> Option> { + self.0.lock().expect("poisoned lock").take() } } @@ -68,13 +76,13 @@ impl HttpV for HttpV1 { // We have to keep the connection alive to handle a possible upgrade request // manually. let server::conn::http1::Parts { - io: mut client_agent, // i.e. browser-agent connection + io: client_agent, // i.e. browser-agent connection read_buf: agent_unprocessed, .. } = http1::Builder::new() .preserve_header_case(true) .serve_connection( - stream, + TokioIo::new(stream), HyperHandler::::new( filters, matched_tx, @@ -96,6 +104,7 @@ impl HttpV for HttpV1 { // HTTP, to the original destination. agent_remote.write_all(&agent_unprocessed).await?; + let mut client_agent = client_agent.into_inner(); // Send the data we received from the original destination, and have not // processed as HTTP, to the client. client_agent.write_all(&client_unprocessed).await?; @@ -113,9 +122,10 @@ impl HttpV for HttpV1 { target_stream: TcpStream, upgrade_tx: Option>, ) -> Result { - let (request_sender, mut connection) = client::conn::http1::handshake(target_stream) - .await - .inspect_err(|fail| error!("Handshake failed with {fail:#?}"))?; + let (request_sender, mut connection) = + client::conn::http1::handshake(TokioIo::new(target_stream)) + .await + .inspect_err(|fail| error!("Handshake failed with {fail:#?}"))?; // We need this to progress the connection forward (hyper thing). tokio::spawn(async move { @@ -132,7 +142,7 @@ impl HttpV for HttpV1 { let _ = sender .send(RawHyperConnection { - stream: io, + stream: io.into_inner(), unprocessed_bytes: read_buf, }) .inspect_err(|_| error!("Failed sending interceptor connection!")); @@ -178,8 +188,8 @@ impl HyperHandler { connection_id, port, original_destination, - request_id: 0, - handle_version: HttpV1(upgrade_tx), + next_request_id: Default::default(), + handle_version: HttpV1::new(upgrade_tx), } } } @@ -192,8 +202,8 @@ impl Service> for HyperHandler { type Future = Pin> + Send>>; #[tracing::instrument(level = "trace", skip(self))] - fn call(&mut self, request: Request) -> Self::Future { - self.request_id += 1; + fn call(&self, request: Request) -> Self::Future { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); Box::pin(HyperHandler::::handle_request( request, @@ -202,7 +212,7 @@ impl Service> for HyperHandler { self.filters.clone(), self.port, self.connection_id, - self.request_id, + request_id, self.matched_tx.clone(), )) } diff --git a/mirrord/agent/src/steal/http/v2.rs b/mirrord/agent/src/steal/http/v2.rs index 953d833859c..8783831e208 100644 --- a/mirrord/agent/src/steal/http/v2.rs +++ b/mirrord/agent/src/steal/http/v2.rs @@ -4,7 +4,10 @@ //! //! Handles HTTP/2 requests. use core::{fmt::Debug, future::Future, pin::Pin}; -use std::{net::SocketAddr, sync::Arc}; +use std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc}, +}; use dashmap::DashMap; use futures::TryFutureExt; @@ -16,6 +19,7 @@ use hyper::{ service::Service, Request, }; +use hyper_util::rt::TokioIo; use mirrord_protocol::{ConnectionId, Port}; use tokio::{ net::TcpStream, @@ -52,7 +56,7 @@ impl HttpV for HttpV2 { ) -> Result<(), HttpTrafficError> { http2::Builder::new(TokioExecutor::default()) .serve_connection( - stream, + TokioIo::new(stream), HyperHandler::::new( filters, matched_tx, @@ -72,7 +76,7 @@ impl HttpV for HttpV2 { _: Option>, ) -> Result { let (request_sender, connection) = - client::conn::http2::handshake(TokioExecutor::default(), target_stream) + client::conn::http2::handshake(TokioExecutor::default(), TokioIo::new(target_stream)) .await .inspect_err(|fail| error!("Handshake failed with {fail:#?}"))?; @@ -121,7 +125,7 @@ impl HyperHandler { connection_id, port, original_destination, - request_id: 0, + next_request_id: Default::default(), handle_version: HttpV2, } } @@ -135,8 +139,8 @@ impl Service> for HyperHandler { type Future = Pin> + Send>>; #[tracing::instrument(level = "trace", skip(self))] - fn call(&mut self, request: Request) -> Self::Future { - self.request_id += 1; + fn call(&self, request: Request) -> Self::Future { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); Box::pin(HyperHandler::::handle_request( request, @@ -145,7 +149,7 @@ impl Service> for HyperHandler { self.filters.clone(), self.port, self.connection_id, - self.request_id, + request_id, self.matched_tx.clone(), )) } diff --git a/mirrord/layer/Cargo.toml b/mirrord/layer/Cargo.toml index 92fab71e5d3..f8cb75e343c 100644 --- a/mirrord/layer/Cargo.toml +++ b/mirrord/layer/Cargo.toml @@ -53,6 +53,7 @@ itertools = "0.10" os_info = "3" bytemuck = "1" hyper = { workspace = true, features = ["client", "http1", "http2"] } +hyper-util.workspace = true http-body-util = { workspace = true } bimap.workspace = true dashmap = "5.4" diff --git a/mirrord/layer/src/tcp_steal/http/v1.rs b/mirrord/layer/src/tcp_steal/http/v1.rs index 55b18a5bec3..bc64c5bc420 100644 --- a/mirrord/layer/src/tcp_steal/http/v1.rs +++ b/mirrord/layer/src/tcp_steal/http/v1.rs @@ -6,6 +6,7 @@ use std::{convert::Infallible, future}; use bytes::Bytes; use http_body_util::combinators::BoxBody; use hyper::client::conn::http1::{self, Connection, SendRequest}; +use hyper_util::rt::TokioIo; use mirrord_protocol::tcp::HttpRequestFallback; use tokio::net::TcpStream; @@ -22,7 +23,7 @@ pub(crate) struct HttpV1(http1::SendRequest>); impl HttpV for HttpV1 { type Sender = SendRequest>; - type Connection = Connection>; + type Connection = Connection, BoxBody>; #[tracing::instrument(level = "trace")] fn new(http_request_sender: Self::Sender) -> Self { @@ -33,7 +34,7 @@ impl HttpV for HttpV1 { async fn handshake( target_stream: TcpStream, ) -> Result<(Self::Sender, Self::Connection), HttpForwarderError> { - Ok(http1::handshake(target_stream).await?) + Ok(http1::handshake(TokioIo::new(target_stream)).await?) } #[tracing::instrument(level = "trace", skip(self))] diff --git a/mirrord/layer/src/tcp_steal/http/v2.rs b/mirrord/layer/src/tcp_steal/http/v2.rs index e58b8ee342f..a9cec45256c 100644 --- a/mirrord/layer/src/tcp_steal/http/v2.rs +++ b/mirrord/layer/src/tcp_steal/http/v2.rs @@ -12,6 +12,7 @@ use hyper::{ client::conn::http2::{self, Connection, SendRequest}, rt::Executor, }; +use hyper_util::rt::{TokioExecutor, TokioIo}; use mirrord_protocol::tcp::HttpRequestFallback; use tokio::net::TcpStream; use tracing::trace; @@ -19,21 +20,24 @@ use tracing::trace; use super::HttpV; use crate::{detour::DetourGuard, tcp_steal::http_forwarding::HttpForwarderError}; -// TODO(alex): Import this from `hyper-util` when the crate is actually published. -/// Future executor that utilises `tokio` threads. +/// Thin wrapper over [`TokioExecutor`]. +/// Makes sure that detours are bypassed when executing futures. #[non_exhaustive] #[derive(Default, Debug, Clone)] -pub struct TokioExecutor; +pub struct DetourGuardExecutor { + inner: TokioExecutor, +} -impl Executor for TokioExecutor +impl Executor for DetourGuardExecutor where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { fn execute(&self, fut: Fut) { trace!("starting tokio executor for hyper HTTP/2"); - tokio::spawn(async move { - let _ = DetourGuard::new(); + + self.inner.execute(async move { + let _guard = DetourGuard::new(); fut.await }); } @@ -49,7 +53,8 @@ pub(crate) struct HttpV2(http2::SendRequest>); impl HttpV for HttpV2 { type Sender = SendRequest>; - type Connection = Connection>; + type Connection = + Connection, BoxBody, DetourGuardExecutor>; #[tracing::instrument(level = "trace")] fn new(http_request_sender: Self::Sender) -> Self { @@ -60,7 +65,7 @@ impl HttpV for HttpV2 { async fn handshake( target_stream: TcpStream, ) -> Result<(Self::Sender, Self::Connection), HttpForwarderError> { - Ok(http2::handshake(TokioExecutor::default(), target_stream).await?) + Ok(http2::handshake(DetourGuardExecutor::default(), TokioIo::new(target_stream)).await?) } #[tracing::instrument(level = "trace", skip(self))]