Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e test for filtered WS stealing #2214

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"tests/rust-unix-socket-client",
"tests/rust-bypassed-unix-socket",
"tests/issue1317",
"tests/rust-websockets",
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions changelog.d/+steal-ws-test.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prepared an e2e test for stealing WebSockets connections with an HTTP filter set.
10 changes: 10 additions & 0 deletions tests/rust-websockets/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "rust-websockets"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
axum = { version = "0.6", features = ["ws"] }
tokio = { version = "1", features = ["rt", "macros"]}
79 changes: 79 additions & 0 deletions tests/rust-websockets/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//! Test app that opens an HTTP server at port `80`. It accepts WebSockets connections on the root
//! path and echoes all [`Message::Text`] and [`Message::Binary`] messages. The app exists
//! successfully after any WebSockets connection closes.

use std::net::SocketAddr;

use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
routing::get,
Router,
};
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
let addr: SocketAddr = "0.0.0.0:80".parse().unwrap();

let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel::<()>();

let app = Router::new()
.route(
"/",
get(
|ws: WebSocketUpgrade, shutdown: State<mpsc::UnboundedSender<()>>| async move {
ws.on_upgrade(move |socket| handle_socket(socket, shutdown.0))
},
),
)
.with_state(shutdown_tx);

axum::Server::bind(&addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async move {
shutdown_rx.recv().await;
})
.await
.unwrap();
}

async fn handle_socket(mut socket: WebSocket, shutdown: mpsc::UnboundedSender<()>) {
loop {
let msg = match socket.recv().await {
Some(Ok(msg)) => msg,
Some(Err(err)) => {
eprintln!("Error while receiving message: {err:?}");
break;
}
None => {
eprintln!("Connection broke");
break;
}
};

let res = match msg {
Message::Binary(data) => socket.send(Message::Binary(data)).await,
Message::Text(text) => socket.send(Message::Text(text)).await,
Message::Close(close_frame) => {
eprintln!(
"Client closed connection with reason: {:?}",
close_frame.map(|frame| frame.reason)
);
shutdown.send(()).ok();
break;
}
Message::Ping(..) | Message::Pong(..) => {
// ping pong is handled by axum framework
Ok(())
}
};

if let Err(err) = res {
eprintln!("Error while sending message: {err:?}");
break;
}
}
}
94 changes: 87 additions & 7 deletions tests/src/traffic/steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ mod steal {
time::Duration,
};

use futures_util::{SinkExt, StreamExt};
use kube::Client;
use reqwest::{header::HeaderMap, Url};
use rstest::*;
use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, Message},
};

use crate::utils::{
config_dir, get_service_host_and_port, get_service_url, http2_service, kube_client,
Expand Down Expand Up @@ -596,16 +600,17 @@ mod steal {
application.assert(&mirrorded_process).await;
}

/// Test the case where running with `steal` set and an http header filter, we get an HTTP
/// upgrade request, and this should not reach the local app.
/// Test the case where we're running with `steal` set and an http header filter, the target
/// gets an HTTP upgrade request, but the request does not match the filter and should not
/// reach the local app.
///
/// We verify that the traffic is forwarded to- and handled by the deployed app, and the local
/// We verify that the traffic is handled by the deployed app, and the local
/// app does not see the traffic.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[timeout(Duration::from_secs(60))]
async fn websocket_upgrade(
async fn websocket_upgrade_no_filter_match(
#[future] websocket_service: KubeService,
#[future] kube_client: Client,
#[values(Application::PythonFastApiHTTP, Application::NodeHTTP)] application: Application,
Expand All @@ -615,8 +620,6 @@ mod steal {
)]
write_data: String,
) {
use futures_util::{SinkExt, StreamExt};

let service = websocket_service.await;
let kube_client = kube_client.await;
let (host, port) = get_service_host_and_port(kube_client.clone(), &service).await;
Expand Down Expand Up @@ -679,4 +682,81 @@ mod steal {

application.assert(&mirrorded_process).await;
}

/// Test the case where we're running with `steal` set and an http header filter, the target
/// gets an HTTP upgrade request, the request matches the filter and the whole websocket
/// connection should be handled by the local app.
///
/// We verify that the traffic is handled by the local app.
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[timeout(Duration::from_secs(60))]
async fn websocket_upgrade_filter_match(
#[future] websocket_service: KubeService,
#[future] kube_client: Client,
#[values(Application::RustWebsockets)] application: Application,
) {
let service = websocket_service.await;
let kube_client = kube_client.await;
let (host, port) = get_service_host_and_port(kube_client.clone(), &service).await;

let mut mirrorded_process = application
.run(
&service.target,
Some(&service.namespace),
Some(vec!["--steal"]),
Some(vec![("MIRRORD_HTTP_HEADER_FILTER", "x-filter: yes")]),
)
.await;

mirrorded_process
.wait_for_line(Duration::from_secs(40), "daemon subscribed")
.await;

// Create a websocket connection to test the HTTP upgrade steal.
// Add a header so that the request matches the filter.
let mut request = format!("ws://{}:{port}", host.trim())
.into_client_request()
.unwrap();
request
.headers_mut()
.append("x-filter", "yes".try_into().unwrap());
let (mut stream, _) = connect_async(request)
.await
.expect("failed to create connection");

let messages = [
Message::Text("local: hello_1".to_string()),
Message::Binary("local: hello_2".as_bytes().to_vec()),
];
for message in &messages {
stream
.send(message.clone())
.await
.expect("failed to send message");
loop {
let response = stream
.next()
.await
.expect("connection broke")
.expect("failed to read message");
match response {
Message::Ping(data) => stream
.send(Message::Pong(data))
.await
.expect("failed to send message"),
response if &response == message => break,
other => panic!("unexpected message received: {other:?}"),
}
}
}

stream
.close(None)
.await
.expect("failed to close connection");

let status = mirrorded_process.wait().await;
assert!(status.success(), "test process failed");
}
}
2 changes: 2 additions & 0 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub enum Application {
CurlToKubeApi,
PythonCloseSocket,
PythonCloseSocketKeepConnection,
RustWebsockets,
}

#[derive(Debug)]
Expand Down Expand Up @@ -333,6 +334,7 @@ impl Application {
Application::CurlToKubeApi => {
vec!["curl", "https://kubernetes/api", "--insecure"]
}
Application::RustWebsockets => vec!["../target/debug/rust-websockets"],
}
}

Expand Down
Loading