Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for HTTP upgrades in intproxy (HTTP filter) #2226

Merged
merged 12 commits into from
Feb 12, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2224.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Changed internal proxy to allow for HTTP upgrades with filtered HTTP steal.
1 change: 1 addition & 0 deletions mirrord/intproxy/src/background_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub enum TaskError<Err> {
}

/// An update received from a [`BackgroundTask`] registered in the [`BackgroundTasks`] struct.
#[derive(Debug)]
pub enum TaskUpdate<MOut, Err> {
/// The task produced a message.
Message(MOut),
Expand Down
183 changes: 67 additions & 116 deletions mirrord/intproxy/src/proxies/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,20 @@ use std::{
net::{IpAddr, SocketAddr},
};

use hyper::Version;
use mirrord_intproxy_protocol::{
ConnMetadataRequest, ConnMetadataResponse, IncomingRequest, IncomingResponse, LayerId,
MessageId, PortSubscribe, PortSubscription, PortUnsubscribe, ProxyToLayerMessage,
};
use mirrord_protocol::{
tcp::{DaemonTcp, HttpRequestFallback, HttpResponseFallback, NewTcpConnection},
tcp::{DaemonTcp, HttpRequestFallback, NewTcpConnection},
ConnectionId, ResponseError,
};
use thiserror::Error;
use tokio::net::TcpSocket;

use self::{
http_interceptor::{HttpInterceptor, HttpInterceptorError},
interceptor::{Interceptor, InterceptorError, MessageOut},
port_subscription_ext::PortSubscriptionExt,
raw_interceptor::RawInterceptor,
subscriptions::SubscriptionsManager,
};
use crate::{
Expand All @@ -31,28 +29,29 @@ use crate::{
};

mod http;
mod http_interceptor;
mod interceptor;
mod port_subscription_ext;
mod raw_interceptor;
mod subscriptions;

/// Common type for errors of the [`RawInterceptor`] and the [`HttpInterceptor`].
#[derive(Error, Debug)]
enum InterceptorError {
#[error("{0}")]
Raw(#[from] io::Error),
#[error("{0}")]
Http(#[from] HttpInterceptorError),
}

/// Common type for messages produced by the [`RawInterceptor`] and the [`HttpInterceptor`].
pub enum InterceptorMessageOut {
Bytes(Vec<u8>),
Http(HttpResponseFallback),
/// Creates and binds a new [`TcpSocket`].
/// The socket has the same IP version and address as the given `addr`.
fn bind_similar(addr: SocketAddr) -> io::Result<TcpSocket> {
match addr.ip() {
addr @ IpAddr::V4(..) => {
let socket = TcpSocket::new_v4()?;
socket.bind(SocketAddr::new(addr, 0))?;
Ok(socket)
}
addr @ IpAddr::V6(..) => {
let socket = TcpSocket::new_v6()?;
socket.bind(SocketAddr::new(addr, 0))?;
Ok(socket)
}
}
}

/// Id of a single interceptor task. Used to manage interceptor tasks with the [`BackgroundTasks`]
/// struct.
/// Id of a single [`Interceptor`] task. Used to manage interceptor tasks with the
/// [`BackgroundTasks`] struct.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct InterceptorId(pub ConnectionId);

Expand All @@ -65,19 +64,7 @@ impl fmt::Display for InterceptorId {
/// Errors that can occur when handling the `incoming` feature.
#[derive(Error, Debug)]
pub enum IncomingProxyError {
/// The proxy received from the agent a message incompatible with the `steal` feature, but it
/// operates in the `steal` mode. This should never happen.
#[error("received TCP mirror message while in steal mode: {0:?}")]
ReceivedMirrorMessage(DaemonTcp),
/// The proxy received from the agent a message related to the `steal` feature, but it does not
/// operate in the `steal` mode. This should never happen.
#[error("received TCP steal message while in mirror mode: {0:?}")]
ReceivedStealMessage(DaemonTcp),
/// The agent sent an HTTP request with unsupported [`Version`].
/// [`Version::HTTP_3`] is currently not supported.
#[error("{0:?} is not supported")]
UnsupportedHttpVersion(Version),
#[error("{0}")]
#[error(transparent)]
Io(#[from] io::Error),
#[error("subscribing port failed: {0}")]
SubscriptionFailed(ResponseError),
Expand All @@ -92,11 +79,15 @@ pub enum IncomingProxyMessage {
AgentSteal(DaemonTcp),
}

struct InterceptorHandle<I: BackgroundTask> {
tx: TaskSender<I>,
/// Handle for an [`Interceptor`].
struct InterceptorHandle {
/// A channel for sending messages to the [`Interceptor`] task.
tx: TaskSender<Interceptor>,
/// Port subscription that the intercepted connection belongs to.
subscription: PortSubscription,
}

/// Store for mapping [`Interceptor`] socket addresses to addresses of the original peers.
#[derive(Default)]
struct MetadataStore {
prepared_responses: HashMap<ConnMetadataRequest, ConnMetadataResponse>,
Expand Down Expand Up @@ -129,45 +120,21 @@ impl MetadataStore {
/// Handles logic and state of the `incoming` feature.
/// Run as a [`BackgroundTask`].
///
/// Handles two types of communication: raw TCP and HTTP.
///
/// # TCP flow
///
/// 1. Proxy receives a [`PortSubscribe`] request from the layer and sends a corresponding request
/// to the agent.
/// 2. Proxy receives a confirmation from the agent and responds to the layer.
/// 3. Proxy receives [`NewTcpConnection`](mirrord_protocol::tcp::NewTcpConnection) messages from
/// the agent. For each connection, it creates a new [`RawInterceptor`] task.
/// 4. The interceptor connects to the socket specified in the original [`PortSubscribe`] request.
/// 5. The proxy passes the data between the agent and the [`RawInterceptor`] task. If the proxy
/// does not operate in the `steal` mode, data coming from the interceptor is discarded.
/// 6. If the layer closes the connection, the [`RawInterceptor`] exits and the proxy notifies the
/// agent. If the agent closes the connection, the proxy shuts down the [`RawInterceptor`].
/// Handles port subscriptions state of the connected layers. Utilizes multiple background tasks
/// ([`Interceptor`]s) to handle incoming connections. Each connection is managed by a single
/// [`Interceptor`], that establishes a TCP connection with the user application's port and proxies
/// data.
///
/// # HTTP flow
///
/// 1. Proxy receives a [`PortSubscribe`] request from the layer and sends a corresponding request
/// to the agent.
/// 2. Proxy receives a confirmation from the agent and responds to the layer.
/// 3. Proxy receives [`HttpRequest`](mirrord_protocol::tcp::HttpRequest)s from the agent. If there
/// is no registered [`HttpInterceptor`] task for the [`ConnectionId`] specified in the request,
/// the proxy creates one.
/// 4. The interceptor connects to the socket specified in the original [`PortSubscribe`] request.
/// 5. The proxy passes the requests and the responses between the agent and the [`HttpInterceptor`]
/// task. If the proxy does not operate in the `steal` mode, responses coming from the
/// interceptor are discarded.
/// 6. If the layer closes the connection, the [`HttpInterceptor`] exits and the proxy notifies the
/// agent. If the agent closes the connection, the proxy shuts down the [`HttpInterceptor`].
/// Incoming connections are created by the agent either explicitly ([`NewTcpConnection`] message)
/// or implicitly ([`HttpRequest`](mirrord_protocol::tcp::HttpRequest)).
#[derive(Default)]
pub struct IncomingProxy {
/// Active port subscriptions for all layers.
subscriptions: SubscriptionsManager,
/// [`TaskSender`]s for active [`RawInterceptor`]s.
interceptors_raw: HashMap<InterceptorId, InterceptorHandle<RawInterceptor>>,
/// [`TaskSender`]s for active [`HttpInterceptor`]s.
interceptors_http: HashMap<InterceptorId, InterceptorHandle<HttpInterceptor>>,
/// For receiving updates from both [`RawInterceptor`]s and [`HttpInterceptor`]s.
background_tasks: BackgroundTasks<InterceptorId, InterceptorMessageOut, InterceptorError>,
/// [`TaskSender`]s for active [`Interceptor`]s.
interceptors: HashMap<InterceptorId, InterceptorHandle>,
/// For receiving updates from [`Interceptor`]s.
background_tasks: BackgroundTasks<InterceptorId, MessageOut, InterceptorError>,
/// For managing intercepted connections metadata.
metadata_store: MetadataStore,
}
Expand Down Expand Up @@ -210,38 +177,40 @@ impl IncomingProxy {
}
}

/// Retrieves or creates [`HttpInterceptor`] for the given [`HttpRequestFallback`].
/// The request may or may not belong to an existing connection (unlike [`RawInterceptor`]s,
/// [`HttpInterceptor`]s are created lazily).
/// Retrieves or creates an [`Interceptor`] for the given [`HttpRequestFallback`].
/// The request may or may not belong to an existing connection (when stealing with an http
/// filter, connections are created implicitly).
#[tracing::instrument(level = "trace", skip(self))]
fn get_or_create_http_interceptor(
fn get_interceptor_for_http_request(
&mut self,
request: &HttpRequestFallback,
) -> Result<Option<&TaskSender<HttpInterceptor>>, IncomingProxyError> {
) -> Result<Option<&TaskSender<Interceptor>>, IncomingProxyError> {
let id: InterceptorId = InterceptorId(request.connection_id());

let interceptor = match self.interceptors_http.entry(id) {
let interceptor = match self.interceptors.entry(id) {
Entry::Occupied(e) => e.into_mut(),

Entry::Vacant(e) => {
let Some(sub) = self.subscriptions.get(request.port()) else {
let Some(subscription) = self.subscriptions.get(request.port()) else {
tracing::trace!(
"received a new http request for port {} that is no longer mirrored",
request.port()
"received a new connection for port {} that is no longer mirrored",
request.port(),
);

return Ok(None);
};

let version = request.version();
let interceptor_socket = bind_similar(subscription.listening_on)?;

let interceptor = self.background_tasks.register(
HttpInterceptor::new(sub.listening_on, version),
InterceptorId(request.connection_id()),
Interceptor::new(interceptor_socket, subscription.listening_on),
id,
Self::CHANNEL_SIZE,
);

e.insert(InterceptorHandle {
tx: interceptor,
subscription: sub.subscription.clone(),
subscription: subscription.subscription.clone(),
})
}
};
Expand All @@ -258,15 +227,11 @@ impl IncomingProxy {
) -> Result<(), IncomingProxyError> {
match message {
DaemonTcp::Close(close) => {
self.interceptors_raw
.remove(&InterceptorId(close.connection_id));
self.interceptors_http
self.interceptors
.remove(&InterceptorId(close.connection_id));
}
DaemonTcp::Data(data) => {
if let Some(interceptor) = self
.interceptors_raw
.get(&InterceptorId(data.connection_id))
if let Some(interceptor) = self.interceptors.get(&InterceptorId(data.connection_id))
{
interceptor.tx.send(data.bytes).await;
} else {
Expand All @@ -278,14 +243,14 @@ impl IncomingProxy {
}
DaemonTcp::HttpRequest(req) => {
let req = HttpRequestFallback::Fallback(req);
let interceptor = self.get_or_create_http_interceptor(&req)?;
let interceptor = self.get_interceptor_for_http_request(&req)?;
if let Some(interceptor) = interceptor {
interceptor.send(req).await;
}
}
DaemonTcp::HttpRequestFramed(req) => {
let req = HttpRequestFallback::Framed(req);
let interceptor = self.get_or_create_http_interceptor(&req)?;
let interceptor = self.get_interceptor_for_http_request(&req)?;
if let Some(interceptor) = interceptor {
interceptor.send(req).await;
}
Expand All @@ -297,29 +262,18 @@ impl IncomingProxy {
source_port,
local_address,
}) => {
let Some(sub) = self.subscriptions.get(destination_port) else {
let Some(subscription) = self.subscriptions.get(destination_port) else {
tracing::trace!("received a new connection for port {destination_port} that is no longer mirrored");
return Ok(());
};

let interceptor_socket = match sub.listening_on.ip() {
addr @ IpAddr::V4(..) => {
let socket = TcpSocket::new_v4()?;
socket.bind(SocketAddr::new(addr, 0))?;
socket
}
addr @ IpAddr::V6(..) => {
let socket = TcpSocket::new_v6()?;
socket.bind(SocketAddr::new(addr, 0))?;
socket
}
};
let interceptor_socket = bind_similar(subscription.listening_on)?;

let id = InterceptorId(connection_id);

self.metadata_store.expect(
ConnMetadataRequest {
listener_address: sub.listening_on,
listener_address: subscription.listening_on,
peer_address: interceptor_socket.local_addr()?,
},
id,
Expand All @@ -330,15 +284,16 @@ impl IncomingProxy {
);

let interceptor = self.background_tasks.register(
RawInterceptor::new(sub.listening_on, interceptor_socket),
Interceptor::new(interceptor_socket, subscription.listening_on),
id,
Self::CHANNEL_SIZE,
);
self.interceptors_raw.insert(

self.interceptors.insert(
id,
InterceptorHandle {
tx: interceptor,
subscription: sub.subscription.clone(),
subscription: subscription.subscription.clone(),
},
);
}
Expand Down Expand Up @@ -368,13 +323,9 @@ impl IncomingProxy {
}

fn get_subscription(&self, interceptor_id: InterceptorId) -> Option<&PortSubscription> {
if let Some(handle) = self.interceptors_raw.get(&interceptor_id) {
Some(&handle.subscription)
} else if let Some(handle) = self.interceptors_http.get(&interceptor_id) {
Some(&handle.subscription)
} else {
None
}
self.interceptors
.get(&interceptor_id)
.map(|handle| &handle.subscription)
}
}

Expand Down Expand Up @@ -417,7 +368,7 @@ impl BackgroundTask for IncomingProxy {

let msg = self.get_subscription(id).map(|s| s.wrap_agent_unsubscribe_connection(id.0));
if let Some(msg) = msg {
message_bus.send(ProxyMessage::ToAgent(msg)).await;
message_bus.send(msg).await;
}
},

Expand Down
Loading
Loading