Skip to content

Commit

Permalink
more e2e stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
t4lz committed Feb 13, 2024
1 parent b1d23d5 commit fbcc582
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 31 deletions.
2 changes: 1 addition & 1 deletion tests/configs/sqs_queue_splitting_a.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"feature": {
"split_queues": {
"first-queue": {
"e2e-test-queue": {
"queue_type": "SQS",
"message_filter": {
"wows": "so wows",
Expand Down
86 changes: 71 additions & 15 deletions tests/src/operator/queue_splitting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
#![cfg(feature = "operator")]
//! Test queue splitting features with an operator.

use std::time::Duration;
use std::collections::{BTreeMap, HashMap};

use aws_sdk_sqs::types::QueueAttributeName::{FifoQueue, MessageRetentionPeriod};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client};
use reqwest::header::HeaderMap;
use mirrord_operator::crd::{
MirrordQueueSplitter, MirrordQueueSplitterSpec, QueueConsumer, QueueNameSource, SplitQueue,
};
use rstest::*;
use tokio::join;

use crate::utils::{
config_dir, kube_client, send_request, service, Application, KubeService, ResourceGuard,
TEST_RESOURCE_LABEL,
config_dir, kube_client, service, service_with_env, Application, KubeService, ResourceGuard,
};

const SQS_CONFIG_MAP_NAME: &str = "mirrord-e2e-test-sqs-splitting";
const SQS_CONFIG_MAP_KEY_NAME: &str = "queue_name";
const QUEUE_SPLITTER_RESOURCE_NAME: &str = "mirrord-e2e-test-queue-splitter";

#[fixture]
fn sqs_queue_name() -> String {
Expand Down Expand Up @@ -49,7 +53,10 @@ pub async fn create_config_map(
let config_map_api: Api<ConfigMap> = Api::namespaced(kube_client.clone(), namespace);
let config_map = ConfigMap {
binary_data: None,
data: None,
data: Some(BTreeMap::from([(
SQS_CONFIG_MAP_KEY_NAME.to_string(),
sqs_queue_name,
)])),
immutable: None,
metadata: Default::default(),
};
Expand All @@ -60,25 +67,67 @@ pub async fn create_config_map(
true,
)
.await
.expect("Could not create policy in E2E test.")
.expect("Could not create config map in E2E test.")
}

/// Create the `MirrordQueueSplitter` K8s resource and a resource guard to delete it when done.
pub async fn create_splitter_resource(
kube_client: Client,
namespace: &str,
deployment_name: &str,
) -> ResourceGuard {
let qs_api: Api<MirrordQueueSplitter> = Api::namespaced(kube_client.clone(), namespace);
let queue_splitter = MirrordQueueSplitter::new(
QUEUE_SPLITTER_RESOURCE_NAME,
MirrordQueueSplitterSpec {
queues: HashMap::from([(
"e2e-test-queue".to_string(),
SplitQueue::Sqs(QueueNameSource::ConfigMap {
name: SQS_CONFIG_MAP_NAME.to_string(),
queue_name_key: SQS_CONFIG_MAP_KEY_NAME.to_string(),
sub_key: None,
}),
)]),
consumer: QueueConsumer::Deployment(deployment_name.to_string()),
},
);
ResourceGuard::create(
qs_api,
QUEUE_SPLITTER_RESOURCE_NAME.to_string(),
&queue_splitter,
true,
)
.await
.expect("Could not create queue splitter in E2E test.")
}

#[fixture]
pub async fn sqs_consumer_service(#[future] kube_client: Client) -> KubeService {
// By using a different namespace for each test run, we're able to use a constant ConfigMap
// name, and don't have to pass that information to the test service.
let namespace = format!("e2e-tests-sqs-splitting-{}", crate::utils::random_string());
let service = service(
service_with_env(
&namespace,
"ClusterIP",
"ghcr.io/metalbear-co/mirrord-node-udp-logger:latest", // TODO
"udp-logger",
true,
"queue-forwarder",
false,
kube_client,
serde_json::json!([
// TODO
]),
)
.await;
.await
}

/// Send 6 messages to the original queue, such that 2 will reach each mirrord run and 2 the
/// deployed app.
async fn write_sqs_messages(queue_url: &str) {
// TODO
}

/// This test creates a new sqs_queue with a random name and credentials from env.
///
/// Define a queue splitter for a deployment. Start two services that both consume from an SQS
/// queue, send some messages to the queue, verify that each of the applications running with
/// mirrord get exactly the messages they are supposed to, and that the deployed application gets
Expand All @@ -91,7 +140,7 @@ pub async fn two_users_one_queue(
#[future] sqs_consumer_service: KubeService,
#[future] sqs_queue: (String, String),
) {
let application = Application::Go21HTTP;
let application = Application::Go21HTTP; // TODO
let kube_client = kube_client.await;
let (sqs_queue_name, sqs_queue_url) = sqs_queue.await;

Expand All @@ -101,6 +150,10 @@ pub async fn two_users_one_queue(
let _config_map_guard =
create_config_map(kube_client.clone(), &deployed.namespace, sqs_queue_name);

// Create the `MirrordQueueSplitter` to be used in the client configurations.
let _splitter_resource_guard =
create_splitter_resource(kube_client.clone(), &deployed.namespace, &deployed.name).await;

let mut config_path = config_dir.clone();
config_path.push("sqs_queue_splitting_a.json");

Expand All @@ -125,11 +178,14 @@ pub async fn two_users_one_queue(
)
.await;

// TODO: send messages to the original queue.
write_sqs_messages(&sqs_queue_url).await;

// TODO: wait here for both clients to exit and assert exit status 0 in each of them.
// the test application consumes messages and verifies exact expected messages.
// The test application consumes messages and verifies exact expected messages.
join!(
client_a.wait_assert_success(),
client_b.wait_assert_success()
);

// TODO: read output queue and verify that exactly the expected messages were
// consumed forwarded.
// consumed and forwarded.
}
92 changes: 77 additions & 15 deletions tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use rand::{distributions::Alphanumeric, Rng};
use reqwest::{RequestBuilder, StatusCode};
use rstest::*;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::json;
use serde_json::{json, Value};
use tempfile::{tempdir, TempDir};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader},
Expand Down Expand Up @@ -647,6 +647,25 @@ impl Drop for KubeService {
}
}

fn default_env() -> Value {
json!(
[
{
"name": "MIRRORD_FAKE_VAR_FIRST",
"value": "mirrord.is.running"
},
{
"name": "MIRRORD_FAKE_VAR_SECOND",
"value": "7777"
},
{
"name": "MIRRORD_FAKE_VAR_THIRD",
"value": "foo=bar"
}
]
)
}

/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted
/// when the returned service is dropped, unless it is dropped during panic.
/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`].
Expand All @@ -659,6 +678,62 @@ pub async fn service(
#[default("http-echo")] service_name: &str,
#[default(true)] randomize_name: bool,
#[future] kube_client: Client,
) -> KubeService {
internal_service(
namespace,
service_type,
image,
service_name,
randomize_name,
kube_client.await,
default_env(),
)
}

/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted
/// when the returned service is dropped, unless it is dropped during panic.
/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`].
/// * `randomize_name` - whether a random suffix should be added to the end of the resource names
/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions.
pub async fn service_with_env(
namespace: &str,
service_type: &str,
image: &str,
service_name: &str,
randomize_name: bool,
kube_client: Client,
env: Value,
) -> KubeService {
internal_service(
namespace,
service_type,
image,
service_name,
randomize_name,
kube_client.await,
env,
)
}

/// Internal function to create a custom [`KubeService`].
/// We keep this private so that whenever we need more customization of test resources, we can
/// change this function and how the public ones use it, and add a new public function that exposes
/// more customization, and we don't need to change all existing usages of public functions/fixtures
/// in tests.
///
/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be
/// deleted when the returned service is dropped, unless it is dropped during panic.
/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`].
/// * `randomize_name` - whether a random suffix should be added to the end of the resource names
/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions.
async fn internal_service(
namespace: &str,
service_type: &str,
image: &str,
service_name: &str,
randomize_name: bool,
kube_client: Client,
env: Value,
) -> KubeService {
let delete_after_fail = std::env::var_os(PRESERVE_FAILED_ENV_NAME).is_none();

Expand Down Expand Up @@ -747,20 +822,7 @@ pub async fn service(
"containerPort": 80
}
],
"env": [
{
"name": "MIRRORD_FAKE_VAR_FIRST",
"value": "mirrord.is.running"
},
{
"name": "MIRRORD_FAKE_VAR_SECOND",
"value": "7777"
},
{
"name": "MIRRORD_FAKE_VAR_THIRD",
"value": "foo=bar"
}
],
"env": env,
}
]
}
Expand Down

0 comments on commit fbcc582

Please sign in to comment.