Skip to content

Commit

Permalink
Change tests to not require multi threaded runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
aviramha committed Feb 13, 2024
1 parent 0486c40 commit 8baeced
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 42 deletions.
1 change: 1 addition & 0 deletions changelog.d/+refactor-tests.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change tests to not require multi threaded runtime
2 changes: 1 addition & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ k8s-openapi.workspace = true
kube.workspace = true
reqwest.workspace = true
regex.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net", "macros", "process"] }
tokio = { workspace = true, features = ["rt", "net", "macros", "process"] }
serde_json.workspace = true
mirrord = { artifact = "bin", path = "../mirrord/cli" }
mirrord-operator = { path = "../mirrord/operator" }
Expand Down
2 changes: 1 addition & 1 deletion tests/issue1317/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["rt", "rt-multi-thread", "net", "macros", "process"] }
tokio = { version = "1", features = ["rt", "net", "macros", "process"] }
hyper = { git = "https://github.com/meowjesty/hyper", branch = "master", features = ["full"] }
tracing = { git = "https://github.com/metalbear-co/tracing", package = "tracing", branch = "worker_options_non_blocking_v1" }
actix-web = "4"
Expand Down
2 changes: 1 addition & 1 deletion tests/rust-bypassed-unix-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "macros", "net", "io-util"] }
tokio = { version = "1.26", features = ["rt", "macros", "net", "io-util"] }
2 changes: 1 addition & 1 deletion tests/rust-unix-socket-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.26", features = ["rt", "rt-multi-thread", "macros", "net", "io-util"] }
tokio = { version = "1.26", features = ["rt", "macros", "net", "io-util"] }
44 changes: 22 additions & 22 deletions tests/src/traffic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
pub async fn remote_dns_enabled_works(#[future] service: KubeService) {
let service = service.await;
Expand All @@ -35,7 +35,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
pub async fn remote_dns_lookup_google(#[future] service: KubeService) {
let service = service.await;
Expand All @@ -55,7 +55,7 @@ mod traffic {
// directly sent out from the local application).
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn outgoing_traffic_single_request_enabled(#[future] service: KubeService) {
let service = service.await;
let node_command = vec![
Expand All @@ -71,7 +71,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[should_panic]
pub async fn outgoing_traffic_single_request_ipv6(#[future] service: KubeService) {
let service = service.await;
Expand All @@ -88,7 +88,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn outgoing_traffic_single_request_disabled(#[future] service: KubeService) {
let service = service.await;
let node_command = vec![
Expand All @@ -111,7 +111,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn outgoing_traffic_make_request_after_listen(#[future] service: KubeService) {
let service = service.await;
let node_command = vec![
Expand All @@ -126,7 +126,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn outgoing_traffic_make_request_localhost(#[future] service: KubeService) {
let service = service.await;
let node_command = vec![
Expand All @@ -144,7 +144,7 @@ mod traffic {
/// that and verifies that mirrord intercepts and forwards the outgoing udp message.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
pub async fn outgoing_traffic_udp_with_connect(
#[future] udp_logger_service: KubeService,
Expand Down Expand Up @@ -229,7 +229,7 @@ mod traffic {
/// filter to resolve the remote host names.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
pub async fn outgoing_traffic_filter_udp_with_connect(
config_dir: &std::path::PathBuf,
Expand Down Expand Up @@ -347,7 +347,7 @@ mod traffic {
/// application calls `connect` on a UDP socket with outgoing traffic disabled on mirrord.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(30))]
pub async fn outgoing_disabled_udp(#[future] service: KubeService) {
let service = service.await;
Expand Down Expand Up @@ -390,15 +390,15 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn go19_outgoing_traffic_single_request_enabled(#[future] service: KubeService) {
let command = vec!["go-e2e-outgoing/19.go_test_app"];
test_go(service, command).await;
}

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
pub async fn go20_outgoing_traffic_single_request_enabled(#[future] service: KubeService) {
let command = vec!["go-e2e-outgoing/20.go_test_app"];
Expand All @@ -407,15 +407,15 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn go21_outgoing_traffic_single_request_enabled(#[future] service: KubeService) {
let command = vec!["go-e2e-outgoing/21.go_test_app"];
test_go(service, command).await;
}

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
pub async fn go19_dns_lookup(#[future] service: KubeService) {
let command = vec!["go-e2e-dns/19.go_test_app"];
Expand All @@ -424,7 +424,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
pub async fn go20_dns_lookup(#[future] service: KubeService) {
let command = vec!["go-e2e-dns/20.go_test_app"];
Expand All @@ -433,7 +433,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
pub async fn go21_dns_lookup(#[future] service: KubeService) {
let command = vec!["go-e2e-dns/21.go_test_app"];
Expand All @@ -442,7 +442,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
pub async fn listen_localhost(#[future] service: KubeService) {
let service = service.await;
let node_command = vec!["node", "node-e2e/listen/test_listen_localhost.mjs"];
Expand All @@ -454,7 +454,7 @@ mod traffic {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
pub async fn gethostname_remote_result(#[future] hostname_service: KubeService) {
let service = hostname_service.await;
Expand All @@ -475,7 +475,7 @@ mod traffic {
/// 3. Verify the client app did not panic.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
pub async fn outgoing_unix_stream_pathname(
#[future]
Expand Down Expand Up @@ -512,7 +512,7 @@ mod traffic {
/// case of connections to unix sockets).
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
pub async fn outgoing_bypassed_unix_stream_pathname(#[future] service: KubeService) {
let service = service.await;
Expand All @@ -531,7 +531,7 @@ mod traffic {
}

#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)]
pub async fn test_outgoing_traffic_many_requests_enabled(#[future] service: KubeService) {
let service = service.await;
Expand All @@ -548,7 +548,7 @@ mod traffic {
}

#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)]
pub async fn test_outgoing_traffic_many_requests_disabled(#[future] service: KubeService) {
let service = service.await;
Expand Down
32 changes: 16 additions & 16 deletions tests/src/traffic/steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@
#[cfg(test)]
mod steal {
use std::{
io::{BufRead, BufReader, Read, Write},
net::{SocketAddr, TcpStream},
io::{BufRead, Read, Write},
net::SocketAddr,
time::Duration,
};

use kube::Client;
use reqwest::{header::HeaderMap, Url};
use rstest::*;
use tokio::time::sleep;
use tokio::{io::BufReader, net::TcpStream, time::sleep};
use tokio_tungstenite::connect_async;

use crate::utils::{
Expand All @@ -22,7 +22,7 @@ mod steal {
#[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)]
#[cfg(target_os = "linux")]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
async fn steal_http_traffic(
#[future] service: KubeService,
Expand Down Expand Up @@ -61,7 +61,7 @@ mod steal {
#[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)]
#[cfg(target_os = "linux")]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
async fn steal_http_traffic_with_flush_connections(
#[future] service: KubeService,
Expand Down Expand Up @@ -106,7 +106,7 @@ mod steal {
/// closes a socket.
#[cfg_attr(not(any(feature = "ephemeral", feature = "job")), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
async fn close_socket(#[future] service: KubeService, #[future] kube_client: Client) {
let application = Application::PythonCloseSocket;
Expand Down Expand Up @@ -185,7 +185,7 @@ mod steal {
/// application closes a socket, we stop stealing existing connections.
#[ignore]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(240))]
async fn close_socket_keep_connection(
#[future] service: KubeService,
Expand Down Expand Up @@ -264,7 +264,7 @@ mod steal {
/// then run test with MIRRORD_TESTS_USE_BINARY=../target/universal-apple-darwin/debug/mirrord
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn filter_with_single_client_and_only_matching_requests(
#[future] service: KubeService,
Expand Down Expand Up @@ -307,7 +307,7 @@ mod steal {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn filter_with_single_client_and_only_matching_requests_new(
config_dir: &std::path::PathBuf,
Expand Down Expand Up @@ -348,7 +348,7 @@ mod steal {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn filter_with_single_client_requests_by_path(
config_dir: &std::path::PathBuf,
Expand Down Expand Up @@ -404,7 +404,7 @@ mod steal {

#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn test_filter_with_single_client_and_only_matching_requests_http2(
#[future] http2_service: KubeService,
Expand Down Expand Up @@ -469,7 +469,7 @@ mod steal {
/// then run test with MIRRORD_TESTS_USE_BINARY=../target/universal-apple-darwin/debug/mirrord
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn filter_with_single_client_and_some_matching_requests(
#[future] service: KubeService,
Expand Down Expand Up @@ -538,7 +538,7 @@ mod steal {
/// app does not see the traffic.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(120))]
async fn complete_passthrough(
#[future] tcp_echo_service: KubeService,
Expand Down Expand Up @@ -566,11 +566,11 @@ mod steal {
.await;

let addr = SocketAddr::new(host.trim().parse().unwrap(), port as u16);
let mut stream = TcpStream::connect(addr).unwrap();
let mut stream = TcpStream::connect(addr).await.unwrap();
stream.write(tcp_data.as_bytes()).unwrap();
let mut reader = BufReader::new(stream);
let mut buf = String::new();
reader.read_line(&mut buf).unwrap();
reader.read_line(&mut buf).await.unwrap();
println!("Got response: {buf}");
// replace "remote: " with empty string, since the response can be split into frames
// and we just need assert the final response
Expand Down Expand Up @@ -603,7 +603,7 @@ mod steal {
/// app does not see the traffic.
#[cfg_attr(not(feature = "job"), ignore)]
#[rstest]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[tokio::test]
#[timeout(Duration::from_secs(60))]
async fn websocket_upgrade(
#[future] websocket_service: KubeService,
Expand Down

0 comments on commit 8baeced

Please sign in to comment.