diff --git a/Cargo.lock b/Cargo.lock index 37d50b68046..4ac9c078dea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,6 +535,321 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-config" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7af266887e24cd5f6d2ea7433cacd25dcd4773b7f70e488701968a7cdf51df57" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-sdk-sso", + "aws-sdk-ssooidc", + "aws-sdk-sts", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "hex", + "http 0.2.11", + "hyper 0.14.28", + "ring 0.17.7", + "time", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-credential-types" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d56f287a9e65e4914bfedb5b22c056b65e4c232fca512d5509a9df36386759f" +dependencies = [ + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "zeroize", +] + +[[package]] +name = "aws-runtime" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d6a29eca8ea8982028a4df81883e7001e250a21d323b86418884b5345950a4b" +dependencies = [ + "aws-credential-types", + "aws-sigv4", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand", + "http 0.2.11", + "http-body 0.4.6", + "percent-encoding", + "pin-project-lite", + "tracing", + "uuid", +] + +[[package]] +name = "aws-sdk-sqs" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a16637754ae72aba8d157ad559a75685f93e60e37e008d5197d809b74b37b2e6" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sso" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d7f527c7b28af1a641f7d89f9e6a4863e8ec00f39d2b731b056fc5ec5ce829" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-ssooidc" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d0be3224cd574ee8ab5fd7c32087876f25c134c27ac603fcb38669ed8d346b0" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sdk-sts" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b3167c60d82a13bbaef569da06041644ff41e85c6377e5dad53fa2526ccfe9d" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + 
"aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.11", + "once_cell", + "regex-lite", + "tracing", +] + +[[package]] +name = "aws-sigv4" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b1cbe0eee57a213039088dbdeca7be9352f24e0d72332d961e8a1cb388f82d" +dependencies = [ + "aws-credential-types", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "form_urlencoded", + "hex", + "hmac", + "http 0.2.11", + "http 1.0.0", + "once_cell", + "percent-encoding", + "sha2", + "time", + "tracing", +] + +[[package]] +name = "aws-smithy-async" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "426a5bc369ca7c8d3686439e46edc727f397a47ab3696b13f3ae8c81b3b36132" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "aws-smithy-http" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85d6a0619f7b67183067fa3b558f94f90753da2df8c04aeb7336d673f804b0b8" +dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http-body 0.4.6", + "once_cell", + "percent-encoding", + "pin-project-lite", + "pin-utils", + "tracing", +] + +[[package]] +name = "aws-smithy-json" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1c1b5186b6f5c579bf0de1bcca9dd3d946d6d51361ea1d18131f6a0b64e13ae" +dependencies = [ + "aws-smithy-types", +] + +[[package]] +name = "aws-smithy-query" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c0a2ce65882e788d2cf83ff28b9b16918de0460c47bf66c5da4f6c17b4c9694" +dependencies = [ + "aws-smithy-types", + "urlencoding", +] + +[[package]] +name = "aws-smithy-runtime" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4cb6b3afa5fc9825a75675975dcc3e21764b5476bc91dbc63df4ea3d30a576e" +dependencies = [ + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-runtime-api", + "aws-smithy-types", + "bytes", + "fastrand", + "h2", + "http 0.2.11", + "http-body 0.4.6", + "hyper 0.14.28", + "hyper-rustls", + "once_cell", + "pin-project-lite", + "pin-utils", + "rustls 0.21.10", + "tokio", + "tracing", +] + +[[package]] +name = "aws-smithy-runtime-api" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23165433e80c04e8c09cee66d171292ae7234bae05fa9d5636e33095eae416b2" +dependencies = [ + "aws-smithy-async", + "aws-smithy-types", + "bytes", + "http 0.2.11", + "pin-project-lite", + "tokio", + "tracing", + "zeroize", +] + +[[package]] +name = "aws-smithy-types" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c94a5bec34850b92c9a054dad57b95c1d47f25125f55973e19f6ad788f0381ff" +dependencies = [ + "base64-simd", + "bytes", + "bytes-utils", + "futures-core", + "http 0.2.11", + "http-body 0.4.6", + "itoa", + "num-integer", + "pin-project-lite", + "pin-utils", + "ryu", + "serde", + "time", + "tokio", + "tokio-util", +] + +[[package]] +name = "aws-smithy-xml" +version = "0.60.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16f94c9673412b7a72e3c3efec8de89081c320bf59ea12eed34c417a62ad600" +dependencies = [ + "xmlparser", +] + +[[package]] +name = "aws-types" +version = "1.1.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ff7e122ee50ca962e9de91f5850cc37e2184b1219611eef6d44aa85929b54f6" +dependencies = [ + "aws-credential-types", + "aws-smithy-async", + "aws-smithy-runtime-api", + "aws-smithy-types", + "http 0.2.11", + "rustc_version", + "tracing", +] + [[package]] name = "axum" version = "0.6.20" @@ -640,6 +955,16 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "base64ct" version = "1.6.0" @@ -884,6 +1209,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bytes-utils" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" +dependencies = [ + "bytes", + "either", +] + [[package]] name = "bytesize" version = "1.3.0" @@ -3516,7 +3851,7 @@ dependencies = [ "mirrord-protocol", "rand", "regex", - "rstest 0.17.0", + "rstest 0.18.2", "serde", "serde_json", "shellexpand", @@ -3565,7 +3900,7 @@ dependencies = [ "os_info", "rand", "regex", - "rstest 0.17.0", + "rstest 0.18.2", "serde", "serde_json", "socket2 0.5.5", @@ -3991,6 +4326,12 @@ dependencies = [ name = "outgoing" version = "3.93.1" +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + [[package]] name = "overload" version = "0.1.1" @@ -4791,6 +5132,12 @@ dependencies = [ "regex-syntax 0.8.2", ] +[[package]] +name = "regex-lite" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" + [[package]] name = "regex-syntax" version = "0.6.29" @@ -5882,6 +6229,8 @@ checksum = "8e7a7de15468c6e65dd7db81cf3822c1ec94c71b2a3c1a976ea8e4696c91115c" name = "tests" version = "0.1.0" dependencies = [ + "aws-config", + "aws-sdk-sqs", "bytes", "chrono", "const_format", @@ -6697,6 +7046,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "walkdir" version = "2.4.0" @@ -7174,6 +7529,12 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fcb9cbac069e033553e8bb871be2fbdffcab578eb25bd0f7c508cedc6dcd75a" +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "xz" version = "0.1.0" diff --git a/mirrord/cli/src/error.rs b/mirrord/cli/src/error.rs index 16235af039a..005133abbb7 100644 --- a/mirrord/cli/src/error.rs +++ b/mirrord/cli/src/error.rs @@ -330,7 +330,7 @@ impl From for CliError { feature, operator_version, } => 
Self::FeatureNotSupportedInOperatorError { - feature, + feature: feature.to_string(), operator_version, }, OperatorApiError::CreateApiError(e) => Self::KubernetesApiFailed(e), diff --git a/mirrord/cli/src/operator/session.rs b/mirrord/cli/src/operator/session.rs index 02cb12227f2..90c46313251 100644 --- a/mirrord/cli/src/operator/session.rs +++ b/mirrord/cli/src/operator/session.rs @@ -3,6 +3,7 @@ use mirrord_operator::{ client::{session_api, OperatorApiError, OperatorOperation}, crd::{MirrordOperatorCrd, SessionCrd, OPERATOR_STATUS_NAME}, }; +use mirrord_operator::crd::NewOperatorFeature; use mirrord_progress::{Progress, ProgressTracker}; use super::get_status_api; @@ -90,58 +91,58 @@ impl SessionCommandHandler { .await .map(|either| either.right()), } - .map_err(|kube_fail| match kube_fail { - // The random `reason` we get when the operator returns from a "missing route". - kube::Error::Api(ErrorResponse { code, reason, .. }) + .map_err(|kube_fail| match kube_fail { + // The random `reason` we get when the operator returns from a "missing route". + kube::Error::Api(ErrorResponse { code, reason, .. }) if code == 404 && reason.contains("parse") => - { - OperatorApiError::UnsupportedFeature { - feature: "session management".to_string(), - operator_version, + { + OperatorApiError::UnsupportedFeature { + feature: NewOperatorFeature::SessionManagement, + operator_version, + } + } + // Something actually went wrong. + other => OperatorApiError::KubeError { + error: other, + operation: OperatorOperation::SessionManagement, + }, + }) + // Finish the progress report here if we have an error response. + .inspect_err(|fail| { + sub_progress.failure(Some(&fail.to_string())); + progress.failure(Some("Session management operation failed!")); + })? + // The kube api interaction was successful, but we might still fail the operation + // itself, so let's check the `Status` and report. + .map(|status| { + if status.is_failure() { + sub_progress.failure(Some(&format!( + "`{command}` failed due to `{}` with code `{}`!", + status.message, status.code + ))); + progress.failure(Some("Session operation failed!")); + + Err(OperatorApiError::StatusFailure { + operation: command.to_string(), + status: Box::new(status), + }) + } else { + sub_progress.success(Some(&format!( + "`{command}` finished successfully with `{}` with code `{}`.", + status.message, status.code + ))); + progress.success(Some("Session operation is completed.")); + + Ok(()) } - } - // Something actually went wrong. - other => OperatorApiError::KubeError { - error: other, - operation: OperatorOperation::SessionManagement, - }, - }) - // Finish the progress report here if we have an error response. - .inspect_err(|fail| { - sub_progress.failure(Some(&fail.to_string())); - progress.failure(Some("Session management operation failed!")); - })? - // The kube api interaction was successful, but we might still fail the operation - // itself, so let's check the `Status` and report. - .map(|status| { - if status.is_failure() { - sub_progress.failure(Some(&format!( - "`{command}` failed due to `{}` with code `{}`!", - status.message, status.code - ))); - progress.failure(Some("Session operation failed!")); - - Err(OperatorApiError::StatusFailure { - operation: command.to_string(), - status: Box::new(status), - }) - } else { - sub_progress.success(Some(&format!( - "`{command}` finished successfully with `{}` with code `{}`.", - status.message, status.code - ))); - progress.success(Some("Session operation is completed.")); - - Ok(()) - } - }) - .transpose()? 
- // We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`), - // meaning that the operation has started, but it might not be finished yet. - .unwrap_or_else(|| { - sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time."))); - progress.success(Some(&format!("`{command}` is done, but the operation might be pending."))); - }); + }) + .transpose()? + // We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`), + // meaning that the operation has started, but it might not be finished yet. + .unwrap_or_else(|| { + sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time."))); + progress.success(Some(&format!("`{command}` is done, but the operation might be pending."))); + }); Ok(()) } diff --git a/mirrord/config/src/feature.rs b/mirrord/config/src/feature.rs index 31ce4318260..6c04dee9c2c 100644 --- a/mirrord/config/src/feature.rs +++ b/mirrord/config/src/feature.rs @@ -3,11 +3,13 @@ use mirrord_config_derive::MirrordConfig; use schemars::JsonSchema; use self::{copy_target::CopyTargetConfig, env::EnvConfig, fs::FsConfig, network::NetworkConfig}; +use crate::feature::split_queues::SplitQueuesConfig; pub mod copy_target; pub mod env; pub mod fs; pub mod network; +pub mod split_queues; /// Controls mirrord features. /// @@ -87,6 +89,13 @@ pub struct FeatureConfig { /// (`targetless` mode). #[config(nested)] pub copy_target: CopyTargetConfig, + + /// ## feature.split_queues {#feature-split_queues} + /// + /// Define filters to split queues by, and make your local application consume only messages + /// that match those filters. + #[config(nested)] + pub split_queues: SplitQueuesConfig, } impl CollectAnalytics for &FeatureConfig { @@ -95,5 +104,6 @@ impl CollectAnalytics for &FeatureConfig { analytics.add("fs", &self.fs); analytics.add("network", &self.network); analytics.add("copy_target", &self.copy_target); + analytics.add("split_queues", &self.split_queues); } } diff --git a/mirrord/config/src/feature/split_queues.rs b/mirrord/config/src/feature/split_queues.rs new file mode 100644 index 00000000000..aa1cd60fb5b --- /dev/null +++ b/mirrord/config/src/feature/split_queues.rs @@ -0,0 +1,94 @@ +use std::collections::HashMap; + +use mirrord_analytics::{Analytics, CollectAnalytics}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig}; + +pub type QueueId = String; + +/// ```json +/// { +/// "feature": { +/// "split_queues": { +/// "first-queue": { +/// "queue_type": "SQS", +/// "message_filter": { +/// "wows": "so wows", +/// "coolz": "^very .*" +/// } +/// }, +/// "second-queue": { +/// "queue_type": "SomeFutureQueueType", +/// "message_filter": { +/// "wows": "so wows", +/// "coolz": "^very .*" +/// } +/// }, +/// } +/// } +/// } +/// ``` +#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Deserialize, Default)] +pub struct SplitQueuesConfig(Option>); + +impl SplitQueuesConfig { + pub fn is_set(&self) -> bool { + self.0.is_some() + } + + /// Out of the whole queue splitting config, get only the sqs queues. 
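+    ///
+    /// Returns a map from queue id to that queue's SQS message filter, or `None` when queue
+    /// splitting is not configured at all.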
+ pub fn get_sqs_filter(&self) -> Option>> { + self.0.as_ref().map(|queue_id2queue_filter| { + queue_id2queue_filter + .iter() + .filter_map(|(queue_id, queue_filter)| match queue_filter { + QueueFilter::Sqs(filter_mapping) => { + Some((queue_id.clone(), filter_mapping.clone())) + } + }) + .collect() + }) + } +} + +impl MirrordConfig for SplitQueuesConfig { + type Generated = Self; + + fn generate_config( + self, + _context: &mut ConfigContext, + ) -> crate::config::Result { + Ok(self) + } +} + +impl FromMirrordConfig for SplitQueuesConfig { + type Generator = Self; +} + +pub type MessageAttributeName = String; +pub type AttributeValuePattern = String; + +pub type SqsMessageFilter = HashMap; + +/// More queue types might be added in the future. +#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)] +#[serde(tag = "queue_type", content = "message_filter")] +pub enum QueueFilter { + #[serde(rename = "SQS")] + Sqs(SqsMessageFilter), +} + +impl CollectAnalytics for &SplitQueuesConfig { + fn collect_analytics(&self, analytics: &mut Analytics) { + analytics.add( + "queue_count", + self.0 + .as_ref() + .map(|mapping| mapping.len()) + .unwrap_or_default(), + ) + } +} diff --git a/mirrord/config/src/lib.rs b/mirrord/config/src/lib.rs index 5d312ef4083..112a7445753 100644 --- a/mirrord/config/src/lib.rs +++ b/mirrord/config/src/lib.rs @@ -721,6 +721,7 @@ mod tests { })), })), copy_target: None, + split_queues: None, }), connect_tcp: None, operator: None, diff --git a/mirrord/config/src/target.rs b/mirrord/config/src/target.rs index 86d983be0b3..f9100ede631 100644 --- a/mirrord/config/src/target.rs +++ b/mirrord/config/src/target.rs @@ -227,6 +227,16 @@ impl Target { } } } + + /// Get the target type - "pod", "deployment", "rollout" or "targetless" + pub fn get_target_type(&self) -> &str { + match self { + Target::Targetless => "targetless", + Target::Pod(pod) => pod.target_type(), + Target::Deployment(dep) => dep.target_type(), + Target::Rollout(roll) => roll.target_type(), + } + } } trait TargetDisplay { diff --git a/mirrord/kube/src/api/kubernetes/rollout.rs b/mirrord/kube/src/api/kubernetes/rollout.rs index 5e29894756e..34f610fa0d9 100644 --- a/mirrord/kube/src/api/kubernetes/rollout.rs +++ b/mirrord/kube/src/api/kubernetes/rollout.rs @@ -4,9 +4,9 @@ use k8s_openapi::{ apimachinery::pkg::apis::meta::v1::ObjectMeta, ListableResource, Metadata, NamespaceResourceScope, Resource, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Rollout { metadata: ObjectMeta, pub spec: serde_json::Value, diff --git a/mirrord/operator/Cargo.toml b/mirrord/operator/Cargo.toml index 45bd5fb295d..04ca1771680 100644 --- a/mirrord/operator/Cargo.toml +++ b/mirrord/operator/Cargo.toml @@ -14,29 +14,29 @@ publish.workspace = true edition.workspace = true [features] -default = [] +default = ["crd"] license-fetch = ["dep:reqwest"] client = [ - "crd", - "dep:base64", - "dep:bincode", - "dep:http", - "dep:futures", - "dep:mirrord-analytics", - "dep:mirrord-auth", - "dep:mirrord-kube", - "dep:mirrord-progress", - "dep:mirrord-protocol", - "dep:rand", - "dep:tokio-tungstenite", - "dep:tracing", + "crd", + "dep:base64", + "dep:bincode", + "dep:http", + "dep:futures", + "dep:mirrord-analytics", + "dep:mirrord-auth", + "dep:mirrord-kube", + "dep:mirrord-progress", + "dep:mirrord-protocol", + "dep:rand", + "dep:tokio-tungstenite", + "dep:tracing", ] crd = [ - "dep:k8s-openapi", - "dep:kube", - 
"dep:mirrord-config", - "dep:tokio", - "dep:serde_json" + "dep:k8s-openapi", + "dep:kube", + "dep:mirrord-config", + "dep:tokio", + "dep:serde_json" ] setup = ["crd", "dep:serde_yaml"] @@ -52,7 +52,7 @@ mirrord-protocol = { path = "../protocol", optional = true } async-trait = "0.1" actix-codec = { workspace = true, optional = true } base64 = { version = "0.21", optional = true } -bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true } +bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true } bytes = { workspace = true, optional = true } chrono = { version = "0.4", features = ["clock", "serde"] } http = { version = "0.2", optional = true } diff --git a/mirrord/operator/src/client.rs b/mirrord/operator/src/client.rs index fbcbc1b5885..f6e8087892b 100644 --- a/mirrord/operator/src/client.rs +++ b/mirrord/operator/src/client.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, fmt::{self, Display}, io, }; @@ -34,7 +35,7 @@ use tokio_tungstenite::tungstenite::{Error as TungsteniteError, Message}; use tracing::{debug, error, info, warn}; use crate::crd::{ - CopyTargetCrd, CopyTargetSpec, MirrordOperatorCrd, OperatorFeatures, SessionCrd, TargetCrd, + CopyTargetCrd, CopyTargetSpec, MirrordOperatorCrd, NewOperatorFeature, SessionCrd, TargetCrd, OPERATOR_STATUS_NAME, }; @@ -90,13 +91,13 @@ pub enum OperatorApiError { #[error("mirrord operator {operator_version} does not support feature {feature}")] UnsupportedFeature { - feature: String, + feature: NewOperatorFeature, operator_version: String, }, #[error( - "Tried executing {operation}, but operator returned with `{}` and code `{}``!", - status.reason, status.code + "Tried executing {operation}, but operator returned with `{}` and code `{}``!", + status.reason, status.code )] StatusFailure { operation: String, @@ -111,18 +112,16 @@ pub struct OperatorSessionMetadata { client_certificate: Option, session_id: u64, fingerprint: Option, - operator_features: Vec, + operator_features: Vec, protocol_version: Option, - copy_pod_enabled: Option, } impl OperatorSessionMetadata { fn new( client_certificate: Option, fingerprint: Option, - operator_features: Vec, + operator_features: Vec, protocol_version: Option, - copy_pod_enabled: Option, ) -> Self { Self { client_certificate, @@ -130,7 +129,6 @@ impl OperatorSessionMetadata { fingerprint, operator_features, protocol_version, - copy_pod_enabled, } } @@ -156,7 +154,8 @@ impl OperatorSessionMetadata { } fn proxy_feature_enabled(&self) -> bool { - self.operator_features.contains(&OperatorFeatures::ProxyApi) + self.operator_features + .contains(&NewOperatorFeature::ProxyApi) } } @@ -206,12 +205,10 @@ impl OperatorApi { /// Checks used config against operator specification. 
fn check_config(config: &LayerConfig, operator: &MirrordOperatorCrd) -> Result<()> { - if config.feature.copy_target.enabled && !operator.spec.copy_target_enabled.unwrap_or(false) - { - return Err(OperatorApiError::UnsupportedFeature { - feature: "copy target".into(), - operator_version: operator.spec.operator_version.clone(), - }); + if config.feature.copy_target.enabled { + operator + .spec + .require_feature(NewOperatorFeature::CopyTarget)?; } Ok(()) @@ -246,8 +243,8 @@ impl OperatorApi { progress: &P, analytics: &mut R, ) -> Result - where - P: Progress + Send + Sync, + where + P: Progress + Send + Sync, { let operator_api = OperatorApi::new(config).await?; @@ -273,7 +270,7 @@ impl OperatorApi { }) .unwrap_or_else(|| "today".to_string()); - let expiring_message = format!("Operator license will expire {expiring_soon}!",); + let expiring_message = format!("Operator license will expire {expiring_soon}!", ); progress.warning(&expiring_message); warn!(expiring_message); @@ -292,15 +289,15 @@ impl OperatorApi { .await .ok() .flatten(); + let features = operator.spec.supported_features(); let metadata = OperatorSessionMetadata::new( client_certificate, operator.spec.license.fingerprint, - operator.spec.features.unwrap_or_default(), + features, operator .spec .protocol_version .and_then(|str_version| str_version.parse().ok()), - operator.spec.copy_target_enabled, ); metadata.set_operator_properties(analytics); @@ -313,10 +310,10 @@ impl OperatorApi { if operator_version > mirrord_version { // we make two sub tasks since it looks best this way version_progress.warning( - &format!( - "Your mirrord plugin/CLI version {} does not match the operator version {}. This can lead to unforeseen issues.", - mirrord_version, - operator_version)); + &format!( + "Your mirrord plugin/CLI version {} does not match the operator version {}. 
This can lead to unforeseen issues.", + mirrord_version, + operator_version)); version_progress.success(None); version_progress = progress.subtask("comparing versions"); version_progress.warning( @@ -325,13 +322,17 @@ impl OperatorApi { } version_progress.success(None); - let target_to_connect = if config.feature.copy_target.enabled { + let target_to_connect = if config.feature.copy_target.enabled + // use copy_target for splitting queues + || config.feature.split_queues.is_set() + { let mut copy_progress = progress.subtask("copying target"); let copied = operator_api .copy_target( &metadata, config.target.path.clone().unwrap_or(Target::Targetless), config.feature.copy_target.scale_down, + config.feature.split_queues.get_sqs_filter(), ) .await?; copy_progress.success(None); @@ -375,8 +376,8 @@ impl OperatorApi { config.kubeconfig.clone(), config.kube_context.clone(), ) - .await - .map_err(OperatorApiError::CreateApiError)?; + .await + .map_err(OperatorApiError::CreateApiError)?; let target_namespace = if target_config.path.is_some() { target_config.namespace.clone() @@ -492,9 +493,9 @@ impl OperatorApi { .target_api .get_subresource("port-locks", &target.name()) .await - else { - return Ok(()); - }; + else { + return Ok(()); + }; let no_port_locks = lock_target .spec @@ -585,6 +586,7 @@ impl OperatorApi { session_metadata: &OperatorSessionMetadata, target: Target, scale_down: bool, + sqs_filter: Option>>, ) -> Result { let name = TargetCrd::target_name(&target); @@ -594,6 +596,7 @@ impl OperatorApi { target, idle_ttl: Some(Self::COPIED_POD_IDLE_TTL), scale_down, + sqs_filter, }, ); @@ -629,12 +632,12 @@ pub struct ConnectionWrapper { } impl ConnectionWrapper -where - for<'stream> T: StreamExt> - + SinkExt - + Send - + Unpin - + 'stream, + where + for<'stream> T: StreamExt> + + SinkExt + + Send + + Unpin + + 'stream, { fn wrap( connection: T, diff --git a/mirrord/operator/src/crd.rs b/mirrord/operator/src/crd.rs index d9e33a7e74f..8c9bf18f1cc 100644 --- a/mirrord/operator/src/crd.rs +++ b/mirrord/operator/src/crd.rs @@ -1,8 +1,20 @@ +use std::{ + collections::HashMap, + fmt::{Display, Formatter}, +}; + use chrono::NaiveDate; +use k8s_openapi::api::core::v1::{PodSpec, PodTemplateSpec}; use kube::CustomResource; -use mirrord_config::target::{Target, TargetConfig}; +pub use mirrord_config::feature::split_queues::QueueId; +use mirrord_config::{ + feature::split_queues::SqsMessageFilter, + target::{Target, TargetConfig}, +}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +#[cfg(feature = "client")] +use crate::client::OperatorApiError; use self::label_selector::LabelSelector; @@ -12,11 +24,11 @@ pub const TARGETLESS_TARGET_NAME: &str = "targetless"; #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] #[kube( - group = "operator.metalbear.co", - version = "v1", - kind = "Target", - root = "TargetCrd", - namespaced +group = "operator.metalbear.co", +version = "v1", +kind = "Target", +root = "TargetCrd", +namespaced )] pub struct TargetSpec { /// None when targetless. 
@@ -80,19 +92,109 @@ pub static OPERATOR_STATUS_NAME: &str = "operator"; #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] #[kube( - group = "operator.metalbear.co", - version = "v1", - kind = "MirrordOperator", - root = "MirrordOperatorCrd", - status = "MirrordOperatorStatus" +group = "operator.metalbear.co", +version = "v1", +kind = "MirrordOperator", +root = "MirrordOperatorCrd", +status = "MirrordOperatorStatus" )] pub struct MirrordOperatorSpec { pub operator_version: String, pub default_namespace: String, - pub features: Option>, + /// Should be removed when we can stop supporting compatibility with versions from before the + /// `supported_features` field was added. + /// "Breaking" that compatibility by removing this field and then running with one old (from + /// before the `supported_features` field) side (client or operator) would make the client + /// think `ProxyApi` is not supported even if it is. + #[deprecated(note = "use supported_features instead")] + features: Option>, + /// Replaces both `features` and `copy_target_enabled`. Operator versions that use a version + /// of this code that has both this and the old fields are expected to populate this field with + /// the full set of features they support, and the old fields with their limited info they + /// support, for old clients. + /// + /// Access this info only via `supported_features()`. + /// Optional for backwards compatibility (new clients can talk to old operators that don't send + /// this field). + supported_features: Option>, pub license: LicenseInfoOwned, pub protocol_version: Option, - pub copy_target_enabled: Option, + /// Should be removed when we can stop supporting compatibility with versions from before the + /// `supported_features` field was added. + /// "Breaking" that compatibility by removing this field and then running with one old (from + /// before the `supported_features` field) side (client or operator) would make the client + /// think copy target is not enabled even if it is. + /// Optional for backwards compatibility (new clients can talk to old operators that don't send + /// this field). + #[deprecated(note = "use supported_features instead")] + copy_target_enabled: Option, +} + +impl MirrordOperatorSpec { + pub fn new( + operator_version: String, + default_namespace: String, + supported_features: Vec, + license: LicenseInfoOwned, + protocol_version: Option, + ) -> Self { + let features = supported_features + .contains(&NewOperatorFeature::ProxyApi) + .then(|| vec![OperatorFeatures::ProxyApi]); + let copy_target_enabled = + Some(supported_features.contains(&NewOperatorFeature::CopyTarget)); + #[allow(deprecated)] // deprecated objects must still be included in construction. + Self { + operator_version, + default_namespace, + supported_features: Some(supported_features), + license, + protocol_version, + features, + copy_target_enabled, + } + } + + /// Get a vector with the features the operator supports. + /// Handles objects sent from old and new operators. + // When the deprecated fields are removed, this can be changed to just return + // `self.supported_features.unwrap_or_default()`. + pub fn supported_features(&self) -> Vec { + self.supported_features + .clone() + // if supported_features was sent, just use that. If not we are dealing with an older + // operator, so we build a vector of new features from the old fields. 
+ .or_else(|| { + // object was sent by an old operator that still uses fields that are now deprecated + #[allow(deprecated)] + self.features.as_ref().map(|features| { + features + .iter() + .map(From::from) + .chain( + self.copy_target_enabled.and_then(|enabled| { + enabled.then_some(NewOperatorFeature::CopyTarget) + }), + ) + .collect() + }) + }) + // Convert `None` to empty vector since we don't expect this to often be + // `None` (although it's ok if it is) and that way the return type is simpler. + .unwrap_or_default() + } + + #[cfg(feature = "client")] + pub fn require_feature(&self, feature: NewOperatorFeature) -> Result<(), OperatorApiError> { + if self.supported_features().contains(&feature) { + Ok(()) + } else { + Err(OperatorApiError::UnsupportedFeature { + feature, + operator_version: self.operator_version.clone(), + }) + } + } } #[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] @@ -133,10 +235,10 @@ pub struct Session { /// the operator. #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] #[kube( - group = "operator.metalbear.co", - version = "v1", - kind = "Session", - root = "SessionCrd" +group = "operator.metalbear.co", +version = "v1", +kind = "Session", +root = "SessionCrd" )] pub struct SessionSpec; @@ -151,20 +253,59 @@ pub struct LicenseInfoOwned { pub subscription_id: Option, } +/// Since this enum does not have a variant marked with `#[serde(other)]`, and is present like that +/// in released clients, existing clients would fail to parse any new variant. This means the +/// operator can never send anything but the one existing variant, otherwise the client will error +/// out. #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] pub enum OperatorFeatures { ProxyApi, + // DON'T ADD VARIANTS - old clients won't be able to deserialize them. + // Add new features in NewOperatorFeature +} + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +pub enum NewOperatorFeature { + ProxyApi, + CopyTarget, + Sqs, + SessionManagement, + /// This variant is what a client sees when the operator includes a feature the client is not + /// yet aware of, because it was introduced in a version newer than the client's. + #[serde(other)] + FeatureFromTheFuture, +} + +impl Display for NewOperatorFeature { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let name = match self { + NewOperatorFeature::ProxyApi => "proxy API", + NewOperatorFeature::CopyTarget => "copy target", + NewOperatorFeature::Sqs => "SQS queue splitting", + NewOperatorFeature::FeatureFromTheFuture => "unknown feature", + NewOperatorFeature::SessionManagement => "session management", + }; + f.write_str(name) + } +} + +impl From<&OperatorFeatures> for NewOperatorFeature { + fn from(old_feature: &OperatorFeatures) -> Self { + match old_feature { + OperatorFeatures::ProxyApi => NewOperatorFeature::ProxyApi, + } + } } /// This [`Resource`](kube::Resource) represents a copy pod created from an existing [`Target`] /// (operator's copy pod feature). #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] #[kube( - group = "operator.metalbear.co", - version = "v1", - kind = "CopyTarget", - root = "CopyTargetCrd", - namespaced +group = "operator.metalbear.co", +version = "v1", +kind = "CopyTarget", +root = "CopyTargetCrd", +namespaced )] pub struct CopyTargetSpec { /// Original target. Only [`Target::Pod`] and [`Target::Deployment`] are accepted. 
@@ -175,6 +316,8 @@ pub struct CopyTargetSpec { /// Should the operator scale down target deployment to 0 while this pod is alive. /// Ignored if [`Target`] is not [`Target::Deployment`]. pub scale_down: bool, + /// queue id -> (attribute name -> regex) + pub sqs_filter: Option>, } /// Features and operations that can be blocked by a `MirrordPolicy`. @@ -191,11 +334,11 @@ pub enum BlockedFeature { /// Custom resource for policies that limit what mirrord features users can use. #[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] #[kube( - // The operator group is handled by the operator, we want policies to be handled by k8s. - group = "policies.mirrord.metalbear.co", - version = "v1alpha", - kind = "MirrordPolicy", - namespaced +// The operator group is handled by the operator, we want policies to be handled by k8s. +group = "policies.mirrord.metalbear.co", +version = "v1alpha", +kind = "MirrordPolicy", +namespaced )] #[serde(rename_all = "camelCase")] // target_path -> targetPath in yaml. pub struct MirrordPolicySpec { @@ -213,3 +356,189 @@ pub struct MirrordPolicySpec { /// List of features and operations blocked by this policy. pub block: Vec, } + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // queue_name_key -> queueNameKey in yaml. +pub struct ConfigMapQueueNameSource { + /// The name of the config map that holds the name of the queue we want to split. + pub name: String, + + /// The name of the key in the config map that holds the name of the queue we want to + /// split. + pub queue_name_key: String, +} + +/// Set where the application reads the name of the queue from, so that mirrord can find that queue, +/// split it, and temporarily change the name there to the name of the branch queue when splitting. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // ConfigMap -> configMap in yaml. +pub enum QueueNameSource { + ConfigMap(ConfigMapQueueNameSource), + EnvVar(String), +} + +pub type OutputQueueName = String; + +/// The details of a queue that should be split. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(tag = "queueType")] +// So that controllers that only handle 1 type of queue don't have to adapt when we add more queue types. +#[non_exhaustive] +pub enum SplitQueue { + /// Amazon SQS + /// + /// Where the application gets the queue name from. Will be used to read messages from that + /// queue and distribute them to the output queues. When running with mirrord and splitting + /// this queue, applications will get a modified name from that source. + #[serde(rename = "SQS")] + Sqs(QueueNameSource), +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // Deployment -> deployment in yaml. 
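+/// The workload that consumes from the split queues: either a Deployment or an Argo Rollout,
+/// identified by name.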
+pub enum QueueConsumer { + Deployment(String), + Rollout(String), +} + +impl QueueConsumer { + pub fn get_type_and_name(&self) -> (&str, &str) { + match self { + QueueConsumer::Deployment(dep) => ("deployment", dep), + QueueConsumer::Rollout(roll) => ("rollout", roll) + } + } +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +pub struct Filtering { + pub original_spec: PodTemplateSpec, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +pub struct QueueNameUpdate { + pub original_name: String, + pub output_name: String, +} + + +/// Details retrieved from K8s resources once the splitter is active, used on filter session +/// creation to create the altered config maps and pod specs that make the application use the +/// output queues instead of the original. +// This status struct is not optimal in that it contains redundant information. This makes the +// controller's code a bit simpler. +// The information on config map updates and on env vars is present in the resource itself, but it +// is organized differently. +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +pub struct QueueDetails { + /// For each queue_id, the actual queue name as retrieved from the target's pod spec or config map. + pub queue_names: HashMap, + + /// Names of env vars that contain the queue name directly in the pod spec, without config + /// map refs, mapped to their queue id. + pub direct_env_vars: HashMap, + + /// For each config map name, a mapping from queue id to key name in the map that holds the name + /// of that queue. + pub config_map_updates: HashMap>, + // ^ ^ ^ + // | | ---- name of key that points to Q name + // | ---- queue id + // ---- ConfigMap name + + // TODO: Don't save whole PodSpec, because its schema is so long you can't create the CRD with + // `kubectl apply` due to a length limit. + /// The original PodSpec of the queue consumer. + pub original_spec: PodSpec, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename_all = "camelCase")] // active_filters -> activeFilters +pub struct QueueSplitterStatus { + pub active_filters: Option, +} + +impl QueueSplitterStatus { + pub fn is_active(&self) -> bool { + self.active_filters.is_some() + } + + pub fn is_inactive(&self) -> bool { + !self.is_active() + } +} + +/// Defines a Custom Resource that holds a central configuration for splitting a queue. mirrord +/// users specify a splitter by name in their configuration. mirrord then starts splitting according +/// to the spec and the user's filter. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( +group = "splitters.mirrord.metalbear.co", +version = "v1alpha", +kind = "MirrordQueueSplitter", +shortname = "qs", +status = "QueueSplitterStatus", +namespaced +)] +pub struct MirrordQueueSplitterSpec { + /// A map of the queues that should be split. + /// The key is used by users to associate filters to the right queues. + pub queues: HashMap, + + /// The resource (deployment or Argo rollout) that reads from the queues. + pub consumer: QueueConsumer, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename = "SQSSessionDetails", rename_all = "camelCase")] +pub struct SqsSessionDetails { + // TODO: Don't save whole PodSpec, because its schema is so long you can't create the CRD with + // `kubectl apply` due to a length limit. 
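+    /// Pod spec of the queue consumer used for this SQS session.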
+ pub spec: PodSpec, + pub queue_names: HashMap, +} + +#[derive(Clone, Debug, Default, Deserialize, Serialize, JsonSchema)] +#[serde(rename = "SQSSessionStatus", rename_all = "camelCase")] +pub struct SqsSessionStatus { + pub details: Option, +} + +impl SqsSessionStatus { + pub fn is_ready(&self) -> bool { + self.details.is_some() + } +} + + +/// The [`kube::runtime::wait::Condition`] trait is auto-implemented for this function. +/// To be used in [`kube::runtime::wait::await_condition`]. +pub fn is_session_ready(session: Option<&MirrordSqsSession>) -> bool { + session + .and_then(|session| session.status.as_ref()) + .map(|status| status.is_ready()) + .unwrap_or_default() +} + +// TODO: docs +// TODO: Clients actually never use this resource in any way directly, so maybe we could define it +// in the operator on startup? The operator would probably need permissions to create CRDs and to +// give itself permissions for those CRDs? Is that possible? Right now it's the user that installs +// the operator that defines this CRD and also give the operator permissions to do stuff with it. +#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)] +#[kube( +group = "splitters.mirrord.metalbear.co", +version = "v1alpha", +kind = "MirrordSQSSession", +root = "MirrordSqsSession", // for Rust naming conventions (Sqs, not SQS) +status = "SqsSessionStatus", +namespaced +)] +#[serde(rename_all = "camelCase")] // queue_filters -> queueFilters +pub struct MirrordSqsSessionSpec { + pub queue_filters: HashMap, + pub queue_consumer: QueueConsumer, + // The Kubernetes API can't deal with 64 bit numbers (with most significant bit set) + // so we save that field as a string even though its source is a u64 + pub session_id: String, +} diff --git a/mirrord/operator/src/setup.rs b/mirrord/operator/src/setup.rs index 7afbcfb7b71..5c98c43ee56 100644 --- a/mirrord/operator/src/setup.rs +++ b/mirrord/operator/src/setup.rs @@ -23,7 +23,7 @@ use k8s_openapi::{ use kube::{CustomResourceExt, Resource}; use thiserror::Error; -use crate::crd::{MirrordPolicy, TargetCrd}; +use crate::crd::{MirrordPolicy, MirrordQueueSplitter, MirrordSqsSession, TargetCrd}; static OPERATOR_NAME: &str = "mirrord-operator"; static OPERATOR_PORT: i32 = 3000; @@ -177,6 +177,12 @@ impl OperatorSetup for Operator { writer.write_all(b"---\n")?; MirrordPolicy::crd().to_writer(&mut writer)?; + writer.write_all(b"---\n")?; + MirrordQueueSplitter::crd().to_writer(&mut writer)?; + + writer.write_all(b"---\n")?; + MirrordSqsSession::crd().to_writer(&mut writer)?; + Ok(()) } } @@ -412,6 +418,28 @@ impl OperatorRole { verbs: vec!["get".to_owned(), "list".to_owned(), "watch".to_owned()], ..Default::default() }, + // For SQS controller to temporarily change deployments to use changed queues. + PolicyRule { + api_groups: Some(vec![ + "apps".to_owned(), + ]), + resources: Some(vec![ + "deployments".to_owned(), + ]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, + // For SQS controller to temporarily change Argo Rollouts to use changed queues. + PolicyRule { + api_groups: Some(vec![ + "argoproj.io".to_owned(), + ]), + resources: Some(vec![ + "rollouts".to_owned(), + ]), + verbs: vec!["patch".to_owned()], + ..Default::default() + }, PolicyRule { api_groups: Some(vec!["apps".to_owned(), "argoproj.io".to_owned()]), resources: Some(vec![ @@ -450,13 +478,56 @@ impl OperatorRole { verbs: vec!["impersonate".to_owned()], ..Default::default() }, - // Allow the operator to list mirrord policies. 
+ // Allow the operator to list+get mirrord policies. PolicyRule { api_groups: Some(vec!["policies.mirrord.metalbear.co".to_owned()]), resources: Some(vec![MirrordPolicy::plural(&()).to_string()]), verbs: vec!["list".to_owned(), "get".to_owned()], ..Default::default() }, + // Allow the operator to list mirrord queue splitters. + PolicyRule { + api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]), + resources: Some(vec![MirrordQueueSplitter::plural(&()).to_string()]), + verbs: vec![ + "list".to_owned(), + ], + ..Default::default() + }, + // Allow the SQS controller to update queue splitter status. + PolicyRule { + api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]), + resources: Some(vec!["mirrordqueuesplitters/status".to_string()]), + verbs: vec![ + // For setting the status in the SQS controller. + "update".to_owned(), + ], + ..Default::default() + }, + // Allow the operator to control mirrord queue filters. + PolicyRule { + api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]), + resources: Some(vec![MirrordSqsSession::plural(&()).to_string()]), + verbs: vec![ + "create".to_owned(), + "watch".to_owned(), + "list".to_owned(), + "get".to_owned(), + "delete".to_owned(), + "patch".to_owned(), + ], + ..Default::default() + }, + // Allow the SQS controller to update queue splitter status. + PolicyRule { + api_groups: Some(vec!["splitters.mirrord.metalbear.co".to_owned()]), + resources: Some(vec!["mirrordsqssessions/status".to_string()]), + verbs: vec![ + // For setting the status in the SQS controller. + "update".to_owned(), + ], + ..Default::default() + }, ]), ..Default::default() }; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b8b63b9b653..06751641ab8 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" doctest = false [dependencies] +aws-config = { version = "1.1.2", optional = true } +aws-sdk-sqs = { version = "1.10.0", features = ["behavior-version-latest"], optional = true } k8s-openapi.workspace = true kube.workspace = true reqwest.workspace = true @@ -36,7 +38,7 @@ const_format = "0.2" default = ["ephemeral", "job", "cli", "targetless"] ephemeral = [] job = [] -operator = [] +operator = ["dep:aws-config", "dep:aws-sdk-sqs"] docker = [] cli = [] targetless = [] diff --git a/tests/configs/sqs_queue_splitting_a.json b/tests/configs/sqs_queue_splitting_a.json new file mode 100644 index 00000000000..40658ddaf3e --- /dev/null +++ b/tests/configs/sqs_queue_splitting_a.json @@ -0,0 +1,13 @@ +{ + "feature": { + "split_queues": { + "e2e-test-queue": { + "queue_type": "SQS", + "message_filter": { + "wows": "so wows", + "coolz": "^very .*" + } + } + } + } +} \ No newline at end of file diff --git a/tests/src/operator.rs b/tests/src/operator.rs index bcccea48431..fbae776ec9f 100644 --- a/tests/src/operator.rs +++ b/tests/src/operator.rs @@ -2,4 +2,5 @@ mod concurrent_steal; mod policies; +mod queue_splitting; mod setup; diff --git a/tests/src/operator/queue_splitting.rs b/tests/src/operator/queue_splitting.rs new file mode 100644 index 00000000000..8baf8182f69 --- /dev/null +++ b/tests/src/operator/queue_splitting.rs @@ -0,0 +1,191 @@ +#![cfg(test)] +#![cfg(feature = "operator")] +//! Test queue splitting features with an operator. 
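+//!
+//! These tests assume AWS credentials and a region can be resolved from the environment
+//! (queues are created via `aws_config::load_from_env`), and that the cluster's mirrord
+//! operator supports SQS queue splitting.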
+ +use std::collections::{BTreeMap, HashMap}; + +use aws_sdk_sqs::types::QueueAttributeName::{FifoQueue, MessageRetentionPeriod}; +use k8s_openapi::api::core::v1::ConfigMap; +use kube::{Api, Client}; +use mirrord_operator::crd::{ + MirrordQueueSplitter, MirrordQueueSplitterSpec, QueueConsumer, QueueNameSource, SplitQueue, +}; +use rstest::*; +use tokio::join; + +use crate::utils::{ + config_dir, kube_client, service, service_with_env, Application, KubeService, ResourceGuard, +}; + +const SQS_CONFIG_MAP_NAME: &str = "mirrord-e2e-test-sqs-splitting"; +const SQS_CONFIG_MAP_KEY_NAME: &str = "queue_name"; +const QUEUE_SPLITTER_RESOURCE_NAME: &str = "mirrord-e2e-test-queue-splitter"; + +#[fixture] +fn sqs_queue_name() -> String { + format!( + "MirrordE2ESplitterTests-{}.fifo", + crate::utils::random_string() + ) +} + +/// Create a new SQS fifo queue with a randomized name, return queue name and url. +#[fixture] +async fn sqs_queue(sqs_queue_name: String) -> (String, String) { + let shared_config = aws_config::load_from_env().await; + let client = aws_sdk_sqs::Client::new(&shared_config); + let queue = client + .create_queue() + .queue_name(sqs_queue_name.clone()) + .attributes(MessageRetentionPeriod, "3600") // delete messages after an hour. + .attributes(FifoQueue, "true") // Fifo for predictable test scenarios + .send() + .await + .unwrap(); + (sqs_queue_name, queue.queue_url.unwrap()) +} + +pub async fn create_config_map( + kube_client: Client, + namespace: &str, + sqs_queue_name: String, +) -> ResourceGuard { + let config_map_api: Api = Api::namespaced(kube_client.clone(), namespace); + let config_map = ConfigMap { + binary_data: None, + data: Some(BTreeMap::from([( + SQS_CONFIG_MAP_KEY_NAME.to_string(), + sqs_queue_name, + )])), + immutable: None, + metadata: Default::default(), + }; + ResourceGuard::create( + config_map_api, + SQS_CONFIG_MAP_NAME.to_string(), + &config_map, + true, + ) + .await + .expect("Could not create config map in E2E test.") +} + +/// Create the `MirrordQueueSplitter` K8s resource and a resource guard to delete it when done. +pub async fn create_splitter_resource( + kube_client: Client, + namespace: &str, + deployment_name: &str, +) -> ResourceGuard { + let qs_api: Api = Api::namespaced(kube_client.clone(), namespace); + let queue_splitter = MirrordQueueSplitter::new( + QUEUE_SPLITTER_RESOURCE_NAME, + MirrordQueueSplitterSpec { + queues: HashMap::from([( + "e2e-test-queue".to_string(), + SplitQueue::Sqs(QueueNameSource::ConfigMap { + name: SQS_CONFIG_MAP_NAME.to_string(), + queue_name_key: SQS_CONFIG_MAP_KEY_NAME.to_string(), + sub_key: None, + }), + )]), + consumer: QueueConsumer::Deployment(deployment_name.to_string()), + }, + ); + ResourceGuard::create( + qs_api, + QUEUE_SPLITTER_RESOURCE_NAME.to_string(), + &queue_splitter, + true, + ) + .await + .expect("Could not create queue splitter in E2E test.") +} + +#[fixture] +pub async fn sqs_consumer_service(#[future] kube_client: Client) -> KubeService { + // By using a different namespace for each test run, we're able to use a constant ConfigMap + // name, and don't have to pass that information to the test service. 
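+    // The consumer image and env below are placeholders (see the TODOs) until a dedicated
+    // SQS consumer test application is available.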
+ let namespace = format!("e2e-tests-sqs-splitting-{}", crate::utils::random_string()); + service_with_env( + &namespace, + "ClusterIP", + "ghcr.io/metalbear-co/mirrord-node-udp-logger:latest", // TODO + "queue-forwarder", + false, + kube_client, + serde_json::json!([ + // TODO + ]), + ) + .await +} + +/// Send 6 messages to the original queue, such that 2 will reach each mirrord run and 2 the +/// deployed app. +async fn write_sqs_messages(queue_url: &str) { + // TODO +} + +/// This test creates a new sqs_queue with a random name and credentials from env. +/// +/// Define a queue splitter for a deployment. Start two services that both consume from an SQS +/// queue, send some messages to the queue, verify that each of the applications running with +/// mirrord get exactly the messages they are supposed to, and that the deployed application gets +/// the rest. +#[rstest] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +pub async fn two_users_one_queue( + config_dir: &std::path::PathBuf, + #[future] kube_client: Client, + #[future] sqs_consumer_service: KubeService, + #[future] sqs_queue: (String, String), +) { + let application = Application::Go21HTTP; // TODO + let kube_client = kube_client.await; + let (sqs_queue_name, sqs_queue_url) = sqs_queue.await; + + let deployed = sqs_consumer_service.await; + + // Create the config map that the remote deployment uses as the source of the queue's name. + let _config_map_guard = + create_config_map(kube_client.clone(), &deployed.namespace, sqs_queue_name); + + // Create the `MirrordQueueSplitter` to be used in the client configurations. + let _splitter_resource_guard = + create_splitter_resource(kube_client.clone(), &deployed.namespace, &deployed.name).await; + + let mut config_path = config_dir.clone(); + config_path.push("sqs_queue_splitting_a.json"); + + let mut client_a = application + .run( + &deployed.target, // TODO: target the deployment maybe? + Some(&deployed.namespace), + None, + Some(vec![("MIRRORD_CONFIG_FILE", config_path.to_str().unwrap())]), + ) + .await; + + let mut config_path = config_dir.clone(); + config_path.push("sqs_queue_splitting_b.json"); + + let mut client_b = application + .run( + &deployed.target, // TODO: target the deployment maybe? + Some(&deployed.namespace), + None, + Some(vec![("MIRRORD_CONFIG_FILE", config_path.to_str().unwrap())]), + ) + .await; + + write_sqs_messages(&sqs_queue_url).await; + + // The test application consumes messages and verifies exact expected messages. + join!( + client_a.wait_assert_success(), + client_b.wait_assert_success() + ); + + // TODO: read output queue and verify that exactly the expected messages were + // consumed and forwarded. +} diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 0ef37e2e801..dc91aeabc74 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -30,7 +30,7 @@ use rand::{distributions::Alphanumeric, Rng}; use reqwest::{RequestBuilder, StatusCode}; use rstest::*; use serde::{de::DeserializeOwned, Serialize}; -use serde_json::json; +use serde_json::{json, Value}; use tempfile::{tempdir, TempDir}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader}, @@ -656,6 +656,25 @@ impl Drop for KubeService { } } +fn default_env() -> Value { + json!( + [ + { + "name": "MIRRORD_FAKE_VAR_FIRST", + "value": "mirrord.is.running" + }, + { + "name": "MIRRORD_FAKE_VAR_SECOND", + "value": "7777" + }, + { + "name": "MIRRORD_FAKE_VAR_THIRD", + "value": "foo=bar" + } + ] + ) +} + /// Create a new [`KubeService`] and related Kubernetes resources. 
The resources will be deleted /// when the returned service is dropped, unless it is dropped during panic. /// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. @@ -668,6 +687,62 @@ pub async fn service( #[default("http-echo")] service_name: &str, #[default(true)] randomize_name: bool, #[future] kube_client: Client, +) -> KubeService { + internal_service( + namespace, + service_type, + image, + service_name, + randomize_name, + kube_client.await, + default_env(), + ) +} + +/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be deleted +/// when the returned service is dropped, unless it is dropped during panic. +/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. +/// * `randomize_name` - whether a random suffix should be added to the end of the resource names +/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions. +pub async fn service_with_env( + namespace: &str, + service_type: &str, + image: &str, + service_name: &str, + randomize_name: bool, + kube_client: Client, + env: Value, +) -> KubeService { + internal_service( + namespace, + service_type, + image, + service_name, + randomize_name, + kube_client.await, + env, + ) +} + +/// Internal function to create a custom [`KubeService`]. +/// We keep this private so that whenever we need more customization of test resources, we can +/// change this function and how the public ones use it, and add a new public function that exposes +/// more customization, and we don't need to change all existing usages of public functions/fixtures +/// in tests. +/// +/// Create a new [`KubeService`] and related Kubernetes resources. The resources will be +/// deleted when the returned service is dropped, unless it is dropped during panic. +/// This behavior can be changed, see [`FORCE_CLEANUP_ENV_NAME`]. +/// * `randomize_name` - whether a random suffix should be added to the end of the resource names +/// * `env` - `Value`, should be `Value::Array` of kubernetes container env var definitions. +async fn internal_service( + namespace: &str, + service_type: &str, + image: &str, + service_name: &str, + randomize_name: bool, + kube_client: Client, + env: Value, ) -> KubeService { let delete_after_fail = std::env::var_os(PRESERVE_FAILED_ENV_NAME).is_none(); @@ -758,20 +833,7 @@ pub async fn service( "containerPort": 80 } ], - "env": [ - { - "name": "MIRRORD_FAKE_VAR_FIRST", - "value": "mirrord.is.running" - }, - { - "name": "MIRRORD_FAKE_VAR_SECOND", - "value": "7777" - }, - { - "name": "MIRRORD_FAKE_VAR_THIRD", - "value": "foo=bar" - } - ], + "env": env, } ] }