From fbcc5828b4094da7681c9f32c21ca7de752e0c00 Mon Sep 17 00:00:00 2001 From: t4lz Date: Mon, 5 Feb 2024 10:55:54 +0100 Subject: [PATCH] more e2e stuff --- tests/configs/sqs_queue_splitting_a.json | 2 +- tests/src/operator/queue_splitting.rs | 86 ++++++++++++++++++---- tests/src/utils.rs | 92 ++++++++++++++++++++---- 3 files changed, 149 insertions(+), 31 deletions(-) diff --git a/tests/configs/sqs_queue_splitting_a.json b/tests/configs/sqs_queue_splitting_a.json index 298662c262e..40658ddaf3e 100644 --- a/tests/configs/sqs_queue_splitting_a.json +++ b/tests/configs/sqs_queue_splitting_a.json @@ -1,7 +1,7 @@ { "feature": { "split_queues": { - "first-queue": { + "e2e-test-queue": { "queue_type": "SQS", "message_filter": { "wows": "so wows", diff --git a/tests/src/operator/queue_splitting.rs b/tests/src/operator/queue_splitting.rs index 3e90576034a..8baf8182f69 100644 --- a/tests/src/operator/queue_splitting.rs +++ b/tests/src/operator/queue_splitting.rs @@ -2,20 +2,24 @@ #![cfg(feature = "operator")] //! Test queue splitting features with an operator. -use std::time::Duration; +use std::collections::{BTreeMap, HashMap}; use aws_sdk_sqs::types::QueueAttributeName::{FifoQueue, MessageRetentionPeriod}; use k8s_openapi::api::core::v1::ConfigMap; use kube::{Api, Client}; -use reqwest::header::HeaderMap; +use mirrord_operator::crd::{ + MirrordQueueSplitter, MirrordQueueSplitterSpec, QueueConsumer, QueueNameSource, SplitQueue, +}; use rstest::*; +use tokio::join; use crate::utils::{ - config_dir, kube_client, send_request, service, Application, KubeService, ResourceGuard, - TEST_RESOURCE_LABEL, + config_dir, kube_client, service, service_with_env, Application, KubeService, ResourceGuard, }; const SQS_CONFIG_MAP_NAME: &str = "mirrord-e2e-test-sqs-splitting"; +const SQS_CONFIG_MAP_KEY_NAME: &str = "queue_name"; +const QUEUE_SPLITTER_RESOURCE_NAME: &str = "mirrord-e2e-test-queue-splitter"; #[fixture] fn sqs_queue_name() -> String { @@ -49,7 +53,10 @@ pub async fn create_config_map( let config_map_api: Api = Api::namespaced(kube_client.clone(), namespace); let config_map = ConfigMap { binary_data: None, - data: None, + data: Some(BTreeMap::from([( + SQS_CONFIG_MAP_KEY_NAME.to_string(), + sqs_queue_name, + )])), immutable: None, metadata: Default::default(), }; @@ -60,7 +67,38 @@ pub async fn create_config_map( true, ) .await - .expect("Could not create policy in E2E test.") + .expect("Could not create config map in E2E test.") +} + +/// Create the `MirrordQueueSplitter` K8s resource and a resource guard to delete it when done. +pub async fn create_splitter_resource( + kube_client: Client, + namespace: &str, + deployment_name: &str, +) -> ResourceGuard { + let qs_api: Api = Api::namespaced(kube_client.clone(), namespace); + let queue_splitter = MirrordQueueSplitter::new( + QUEUE_SPLITTER_RESOURCE_NAME, + MirrordQueueSplitterSpec { + queues: HashMap::from([( + "e2e-test-queue".to_string(), + SplitQueue::Sqs(QueueNameSource::ConfigMap { + name: SQS_CONFIG_MAP_NAME.to_string(), + queue_name_key: SQS_CONFIG_MAP_KEY_NAME.to_string(), + sub_key: None, + }), + )]), + consumer: QueueConsumer::Deployment(deployment_name.to_string()), + }, + ); + ResourceGuard::create( + qs_api, + QUEUE_SPLITTER_RESOURCE_NAME.to_string(), + &queue_splitter, + true, + ) + .await + .expect("Could not create queue splitter in E2E test.") } #[fixture] @@ -68,17 +106,28 @@ pub async fn sqs_consumer_service(#[future] kube_client: Client) -> KubeService // By using a different namespace for each test run, we're able to use a constant ConfigMap // name, and don't have to pass that information to the test service. let namespace = format!("e2e-tests-sqs-splitting-{}", crate::utils::random_string()); - let service = service( + service_with_env( &namespace, "ClusterIP", "ghcr.io/metalbear-co/mirrord-node-udp-logger:latest", // TODO - "udp-logger", - true, + "queue-forwarder", + false, kube_client, + serde_json::json!([ + // TODO + ]), ) - .await; + .await } +/// Send 6 messages to the original queue, such that 2 will reach each mirrord run and 2 the +/// deployed app. +async fn write_sqs_messages(queue_url: &str) { + // TODO +} + +/// This test creates a new sqs_queue with a random name and credentials from env. +/// /// Define a queue splitter for a deployment. Start two services that both consume from an SQS /// queue, send some messages to the queue, verify that each of the applications running with /// mirrord get exactly the messages they are supposed to, and that the deployed application gets @@ -91,7 +140,7 @@ pub async fn two_users_one_queue( #[future] sqs_consumer_service: KubeService, #[future] sqs_queue: (String, String), ) { - let application = Application::Go21HTTP; + let application = Application::Go21HTTP; // TODO let kube_client = kube_client.await; let (sqs_queue_name, sqs_queue_url) = sqs_queue.await; @@ -101,6 +150,10 @@ pub async fn two_users_one_queue( let _config_map_guard = create_config_map(kube_client.clone(), &deployed.namespace, sqs_queue_name); + // Create the `MirrordQueueSplitter` to be used in the client configurations. + let _splitter_resource_guard = + create_splitter_resource(kube_client.clone(), &deployed.namespace, &deployed.name).await; + let mut config_path = config_dir.clone(); config_path.push("sqs_queue_splitting_a.json"); @@ -125,11 +178,14 @@ pub async fn two_users_one_queue( ) .await; - // TODO: send messages to the original queue. + write_sqs_messages(&sqs_queue_url).await; - // TODO: wait here for both clients to exit and assert exit status 0 in each of them. - // the test application consumes messages and verifies exact expected messages. + // The test application consumes messages and verifies exact expected messages. + join!( + client_a.wait_assert_success(), + client_b.wait_assert_success() + ); // TODO: read output queue and verify that exactly the expected messages were - // consumed forwarded. + // consumed and forwarded. } diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 35cbc1c4f59..c2db408e1a1 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -30,7 +30,7 @@ use rand::{distributions::Alphanumeric, Rng}; use reqwest::{RequestBuilder, StatusCode}; use rstest::*; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::json; +use serde_json::{json, Value}; use tempfile::{tempdir, TempDir}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader}, @@ -647,6 +647,25 @@ impl Drop for KubeService { } } +fn default_env() -> Value { + json!( + [ + { + "name": "MIRRORD_FAKE_VAR_FIRST", + "value": "mirrord.is.running" + }, + { + "name": "MIRRORD_FAKE_VAR_SECOND", + "value": "7777" + }, + { + "name": "MIRRORD_FAKE_VAR_THIRD", + "value": "foo=bar" + } + ] + ) +} + /// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted /// when the returned service is dropped, unless it is dropped during panic. /// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. @@ -659,6 +678,62 @@ pub async fn service( #[default("http-echo")] service_name: &str, #[default(true)] randomize_name: bool, #[future] kube_client: Client, +) -> KubeService { + internal_service( + namespace, + service_type, + image, + service_name, + randomize_name, + kube_client.await, + default_env(), + ) +} + +/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted +/// when the returned service is dropped, unless it is dropped during panic. +/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. +/// * `randomize_name` - whether a random suffix should be added to the end of the resource names +/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions. +pub async fn service_with_env( + namespace: &str, + service_type: &str, + image: &str, + service_name: &str, + randomize_name: bool, + kube_client: Client, + env: Value, +) -> KubeService { + internal_service( + namespace, + service_type, + image, + service_name, + randomize_name, + kube_client.await, + env, + ) +} + +/// Internal function to create a custom [`KubeService`]. +/// We keep this private so that whenever we need more customization of test resources, we can +/// change this function and how the public ones use it, and add a new public function that exposes +/// more customization, and we don't need to change all existing usages of public functions/fixtures +/// in tests. +/// +/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be +/// deleted when the returned service is dropped, unless it is dropped during panic. +/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. +/// * `randomize_name` - whether a random suffix should be added to the end of the resource names +/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions. +async fn internal_service( + namespace: &str, + service_type: &str, + image: &str, + service_name: &str, + randomize_name: bool, + kube_client: Client, + env: Value, ) -> KubeService { let delete_after_fail = std::env::var_os(PRESERVE_FAILED_ENV_NAME).is_none(); @@ -747,20 +822,7 @@ pub async fn service( "containerPort": 80 } ], - "env": [ - { - "name": "MIRRORD_FAKE_VAR_FIRST", - "value": "mirrord.is.running" - }, - { - "name": "MIRRORD_FAKE_VAR_SECOND", - "value": "7777" - }, - { - "name": "MIRRORD_FAKE_VAR_THIRD", - "value": "foo=bar" - } - ], + "env": env, } ] }