Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue Splitting Feature #2173

Merged
merged 96 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 92 commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
73e6789
Splitter CRD
t4lz Jan 11, 2024
b3fcadd
working env
t4lz May 7, 2024
df32b32
deletecollection for sqs sessions
t4lz May 14, 2024
d07bc08
change SqsMessageFilter to BTreeMap for iterating.
t4lz May 15, 2024
cb91e2d
change queue_names to BTreeMap for iterating
t4lz May 15, 2024
4147d22
change config_map_updates to BTreeMap for iterating
t4lz May 15, 2024
d17d15a
change queue_names to BTreeMap for iterating
t4lz May 15, 2024
186867a
change queues to BTreeMap for iterating
t4lz May 15, 2024
0cc6d96
get output queue names from splitter status
t4lz May 21, 2024
fdc7ced
splitter status
t4lz May 21, 2024
2578ddd
todo
t4lz May 22, 2024
37795f5
set -> vec
t4lz May 22, 2024
b1524f7
tags
t4lz May 25, 2024
ba17e1c
docs
t4lz May 27, 2024
0a52ca5
remove active_sessions from splitter status
t4lz May 31, 2024
d213e4f
MirrordQueueSplitter -> MirrordWorkloadQueueRegistry, splitters. -> q…
t4lz May 31, 2024
fffd267
Merge branch 'main' into sqs
t4lz Jun 4, 2024
44fd364
merge fix
t4lz Jun 4, 2024
80ee8ff
Merge branch 'main' into sqs
t4lz Jun 12, 2024
da74d6d
lock
t4lz Jun 12, 2024
ca997c9
Merge branch 'main' into sqs
t4lz Jun 20, 2024
ab38356
mergefix
t4lz Jun 20, 2024
b58bb85
remove pod spec from session status
t4lz Jun 24, 2024
c2d412f
remove podspec from registry crd status
t4lz Jun 25, 2024
dff91db
save only updates instead of containers
t4lz Jun 25, 2024
ddbab39
Merge branch 'main' into sqs
t4lz Jun 26, 2024
951acc7
docs
t4lz Jun 26, 2024
6ad3a66
changelog
t4lz Jun 26, 2024
4a0a682
remove tests (moved to sqs-tests branch)
t4lz Jun 26, 2024
192a749
docs and improvements
t4lz Jun 26, 2024
86f532a
cargo fmt
t4lz Jun 26, 2024
8e3f08d
mirrord-schema.json
t4lz Jun 26, 2024
01a5b7a
Merge branch 'main' into sqs
t4lz Jun 27, 2024
cfd608e
run medschool
t4lz Jun 27, 2024
8fd8e62
Merge branch 'main' into sqs
t4lz Jul 4, 2024
4c420e9
Merge branch 'main' into sqs
t4lz Jul 4, 2024
cdd691c
fmt
t4lz Jul 4, 2024
c8884c9
oops
t4lz Jul 9, 2024
24efd56
Merge branch 'main' into sqs
t4lz Jul 9, 2024
ed3d13f
merge
t4lz Jul 11, 2024
03caf46
Merge branch 'main' into sqs
t4lz Jul 14, 2024
20ff89d
display Q consumer
t4lz Jul 15, 2024
88f87cb
fix merge
t4lz Jul 15, 2024
84c05e4
fix feature use
t4lz Jul 15, 2024
4ea2282
rename sqs details and make optional
t4lz Jul 18, 2024
4f003e5
rename QueueDetails -> ActiveSqsSplits
t4lz Jul 18, 2024
770e225
move output_q_names to ActiveSqsSplits
t4lz Jul 18, 2024
17afeea
Merge branch 'main' into sqs
t4lz Jul 18, 2024
ce09b3a
fmt
t4lz Jul 18, 2024
8eaeebd
CR: lambda var name
t4lz Jul 26, 2024
b621750
move method call to own .map call
t4lz Jul 26, 2024
e88bfc2
CR: remove match arm for future variants.
t4lz Jul 26, 2024
7551467
CR: document SqsMessageFilter
t4lz Jul 26, 2024
e4ac7d4
CR: remove get_target_type
t4lz Jul 26, 2024
f642417
CR: Operator feature variant names.
t4lz Jul 30, 2024
53a2188
CR: unused type
t4lz Jul 30, 2024
9a56c85
CR: docs: "pod spec" -> "pod template"
t4lz Jul 31, 2024
87a7af1
CR: remove dead configmaps code
t4lz Aug 2, 2024
643bf23
CR: make session status enum
t4lz Aug 5, 2024
cc33089
CR: remove non_exauhstive
t4lz Aug 5, 2024
ec5809e
status enum stuff
t4lz Aug 6, 2024
05a4e2b
CR: sqs session error - code + reason
t4lz Aug 6, 2024
a0118d1
just need to push a commit
t4lz Aug 6, 2024
b9378d5
pub sqs details
t4lz Aug 6, 2024
23d4e06
pub sqs details
t4lz Aug 6, 2024
d2f7cde
impl Display for SqsSessionError
t4lz Aug 6, 2024
abe7f07
schema
t4lz Aug 7, 2024
82d1254
CR: add container to queue consumer
t4lz Aug 7, 2024
3e6bbe9
QueueConsumer registry matching
t4lz Aug 7, 2024
254a47f
QueueConsumer container getter
t4lz Aug 7, 2024
11d388a
Add RegisteringFilters Status variant
t4lz Aug 7, 2024
ce5c5e7
get_split_details
t4lz Aug 7, 2024
0ec5115
CR: registry docs
t4lz Aug 7, 2024
ad9b009
CR: doc text formatting suggestion
t4lz Aug 7, 2024
56606f5
CR: individual tags per queue
t4lz Aug 7, 2024
a86fc3e
missing pub
t4lz Aug 7, 2024
aae011f
mark unstable
t4lz Aug 7, 2024
8dcccfa
CR: CopyTargetCrd, send queues config
t4lz Aug 7, 2024
571b5a2
non non exauhstive
t4lz Aug 7, 2024
a92e633
CR: CopyTargetCrd, send queues config
t4lz Aug 7, 2024
5c36e00
Added unknown variant to QueueFilter
t4lz Aug 8, 2024
fca1a87
CR: info log on implicit copy-target
t4lz Aug 8, 2024
4f2e53d
Change QueueConsumer to have a valid k8s schema
t4lz Aug 9, 2024
e6e281d
CRD fixes: skip unknown variants in schema, change CleanupError statu…
t4lz Aug 9, 2024
78526f5
starting status has to have items
t4lz Aug 9, 2024
208ae7d
docs
t4lz Aug 10, 2024
d682427
change start time to str because I remember k8s actually can't deal w…
t4lz Aug 11, 2024
e2d2219
rule to patch pods
t4lz Aug 12, 2024
0095c85
api group of pods is empty string
t4lz Aug 12, 2024
24cae8a
Can't patch pod container env :(
t4lz Aug 12, 2024
3691c01
Merge branch 'main' into sqs
t4lz Aug 14, 2024
ed5a76a
Merge branch 'main' into sqs
t4lz Aug 15, 2024
7b7f4dd
CR: out of place serde tag
t4lz Aug 16, 2024
08f5b26
Support installing an SQS-enabled operator via `mirrord operator setu…
t4lz Aug 16, 2024
0448780
incomplete flag docs
t4lz Aug 16, 2024
2e43cdc
Merge branch 'main' into sqs
t4lz Aug 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/2066.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Client side support for the upcoming SQS queue splitting support in *mirrord for Teams*.
49 changes: 49 additions & 0 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,18 @@
"type": "null"
}
]
},
"split_queues": {
"title": "feature.split_queues {#feature-split_queues}",
"description": "Define filters to split queues by, and make your local application consume only messages that match those filters. If you don't specify any filter for a queue that is however declared in the `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter will be used, and your local application will not receive any messages from that queue.",
"anyOf": [
{
"$ref": "#/definitions/SplitQueuesConfig"
},
{
"type": "null"
}
]
}
},
"additionalProperties": false
Expand Down Expand Up @@ -1431,6 +1443,33 @@
}
]
},
"QueueFilter": {
"description": "More queue types might be added in the future.",
"oneOf": [
{
"description": "Amazon Simple Queue Service.",
"type": "object",
"required": [
"message_filter",
"queue_type"
],
"properties": {
"message_filter": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"queue_type": {
"type": "string",
"enum": [
"SQS"
]
}
}
}
]
},
"RolloutTarget": {
"description": "<!--${internal}--> Mirror the rollout specified by [`RolloutTarget::rollout`].",
"type": "object",
Expand All @@ -1451,6 +1490,16 @@
},
"additionalProperties": false
},
"SplitQueuesConfig": {
"description": "```json { \"feature\": { \"split_queues\": { \"first-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"wows\": \"so wows\", \"coolz\": \"^very .*\" } }, \"second-queue\": { \"queue_type\": \"SQS\", \"message_filter\": { \"who\": \"*you$\" } }, } } } ```",
"type": [
"object",
"null"
],
"additionalProperties": {
"$ref": "#/definitions/QueueFilter"
}
},
"StatefulSetTarget": {
"type": "object",
"required": [
Expand Down
2 changes: 1 addition & 1 deletion mirrord/cli/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl From<OperatorApiError> for CliError {
feature,
operator_version,
} => Self::FeatureNotSupportedInOperatorError {
feature,
feature: feature.to_string(),
operator_version,
},
OperatorApiError::CreateKubeClient(e) => Self::CreateKubeApiFailed(e),
Expand Down
24 changes: 12 additions & 12 deletions mirrord/cli/src/operator/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use mirrord_operator::{
error::{OperatorApiError, OperatorOperation},
MaybeClientCert, OperatorApi,
},
crd::SessionCrd,
crd::{NewOperatorFeature, SessionCrd},
};
use mirrord_progress::{Progress, ProgressTracker};
use tracing::Level;
Expand Down Expand Up @@ -99,7 +99,7 @@ impl SessionCommandHandler {
if code == 404 && reason.contains("parse") =>
{
OperatorApiError::UnsupportedFeature {
feature: "session management".to_string(),
feature: NewOperatorFeature::SessionManagement,
operator_version: operator_api.operator().spec.operator_version.clone(),
}
}
Expand Down Expand Up @@ -135,16 +135,16 @@ impl SessionCommandHandler {
)));
progress.success(Some("Session operation is completed."));

Ok(())
}
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});
Ok(())
}
})
.transpose()?
// We might've gotten a `SessionCrd` instead of a `Status` (we have a `Left(T)`),
// meaning that the operation has started, but it might not be finished yet.
.unwrap_or_else(|| {
sub_progress.success(Some(&format!("No issues found when executing `{command}`, but the operation status could not be determined at this time.")));
progress.success(Some(&format!("`{command}` is done, but the operation might be pending.")));
});

Ok(())
}
Expand Down
30 changes: 30 additions & 0 deletions mirrord/config/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1196,6 +1196,36 @@ of regexes specified here. If there is a match, mirrord will connect your applic
the target unix socket address on the target pod. Otherwise, it will leave the connection
to happen locally on your machine.

## feature.split_queues {#feature-split_queues}

Define filters to split queues by, and make your local application consume only messages
that match those filters.
If you don't specify any filter for a queue that is however declared in the
`MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
will be used, and your local application will not receive any messages from that queue.

```json
{
"feature": {
"split_queues": {
"first-queue": {
"queue_type": "SQS",
"message_filter": {
"wows": "so wows",
"coolz": "^very .*"
}
},
"second-queue": {
"queue_type": "SQS",
"message_filter": {
"who": "*you$"
}
},
}
}
}
```

## internal_proxy {#root-internal_proxy}

Configuration for the internal proxy mirrord spawns for each local mirrord session
Expand Down
14 changes: 13 additions & 1 deletion mirrord/config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use schemars::JsonSchema;
use serde::Serialize;

use self::{copy_target::CopyTargetConfig, env::EnvConfig, fs::FsConfig, network::NetworkConfig};
use crate::config::source::MirrordConfigSource;
use crate::{config::source::MirrordConfigSource, feature::split_queues::SplitQueuesConfig};

pub mod copy_target;
pub mod env;
pub mod fs;
pub mod network;
pub mod split_queues;

/// Controls mirrord features.
///
Expand Down Expand Up @@ -96,6 +97,16 @@ pub struct FeatureConfig {
/// Should mirrord return the hostname of the target pod when calling `gethostname`
#[config(default = true)]
pub hostname: bool,

/// ## feature.split_queues {#feature-split_queues}
///
/// Define filters to split queues by, and make your local application consume only messages
/// that match those filters.
/// If you don't specify any filter for a queue that is however declared in the
/// `MirrordWorkloadQueueRegistry` of the target you're using, a match-nothing filter
/// will be used, and your local application will not receive any messages from that queue.
#[config(nested, unstable)]
pub split_queues: SplitQueuesConfig,
}

impl CollectAnalytics for &FeatureConfig {
Expand All @@ -105,5 +116,6 @@ impl CollectAnalytics for &FeatureConfig {
analytics.add("network", &self.network);
analytics.add("copy_target", &self.copy_target);
analytics.add("hostname", self.hostname);
analytics.add("split_queues", &self.split_queues);
}
}
113 changes: 113 additions & 0 deletions mirrord/config/src/feature/split_queues.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::{
collections::{BTreeMap, HashMap},
ops::Not,
};

use mirrord_analytics::{Analytics, CollectAnalytics};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::config::{ConfigContext, FromMirrordConfig, MirrordConfig};

pub type QueueId = String;

/// ```json
/// {
/// "feature": {
/// "split_queues": {
/// "first-queue": {
/// "queue_type": "SQS",
/// "message_filter": {
/// "wows": "so wows",
/// "coolz": "^very .*"
/// }
/// },
Razz4780 marked this conversation as resolved.
Show resolved Hide resolved
/// "second-queue": {
/// "queue_type": "SQS",
/// "message_filter": {
/// "who": "*you$"
/// }
/// },
/// }
/// }
/// }
/// ```
#[derive(Clone, Debug, Eq, PartialEq, JsonSchema, Serialize, Deserialize, Default)]
pub struct SplitQueuesConfig(pub Option<BTreeMap<QueueId, QueueFilter>>);

impl SplitQueuesConfig {
pub fn is_set(&self) -> bool {
self.0.is_some()
}

/// Out of the whole queue splitting config, get only the sqs queues.
pub fn get_sqs_filter(&self) -> Option<HashMap<String, SqsMessageFilter>> {
self.0
.as_ref()
.map(BTreeMap::iter)
.map(|filters| {
filters
// When there are more variants of QueueFilter, change this to a `filter_map`.
.filter_map(|(queue_id, queue_filter)| match queue_filter {
QueueFilter::Sqs(filter_mapping) => {
Some((queue_id.clone(), filter_mapping.clone()))
}
_ => None,
})
.collect()
})
.and_then(|filters_map: HashMap<String, SqsMessageFilter>| {
filters_map.is_empty().not().then_some(filters_map)
})
}
}

impl MirrordConfig for SplitQueuesConfig {
type Generated = Self;

fn generate_config(
self,
_context: &mut ConfigContext,
) -> crate::config::Result<Self::Generated> {
Ok(self)
}
}

impl FromMirrordConfig for SplitQueuesConfig {
type Generator = Self;
}

pub type MessageAttributeName = String;
pub type AttributeValuePattern = String;

/// A filter is a mapping between message attribute names and regexes they should match.
/// The local application will only receive messages that match **all** of the given patterns.
/// This means, only messages that have **all** the `MessageAttributeName`s in the filter,
/// with values of those attributes matching the respective `AttributeValuePattern`.
pub type SqsMessageFilter = BTreeMap<MessageAttributeName, AttributeValuePattern>;

/// More queue types might be added in the future.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, JsonSchema)]
#[serde(tag = "queue_type", content = "message_filter")]
pub enum QueueFilter {
/// Amazon Simple Queue Service.
#[serde(rename = "SQS")]
Sqs(SqsMessageFilter),
Razz4780 marked this conversation as resolved.
Show resolved Hide resolved
/// When a newer client sends a new filter kind to an older operator, that does not yet know
/// about that filter type, this is what that filter will be deserialized to.
#[schemars(skip)]
#[serde(other, skip_serializing)]
Unknown,
}

impl CollectAnalytics for &SplitQueuesConfig {
fn collect_analytics(&self, analytics: &mut Analytics) {
analytics.add(
"queue_count",
self.0
.as_ref()
.map(|mapping| mapping.len())
.unwrap_or_default(),
)
}
}
1 change: 1 addition & 0 deletions mirrord/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ mod tests {
})),
copy_target: None,
hostname: None,
split_queues: None,
}),
connect_tcp: None,
container: None,
Expand Down
4 changes: 2 additions & 2 deletions mirrord/kube/src/api/kubernetes/rollout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use k8s_openapi::{
apimachinery::pkg::apis::meta::v1::ObjectMeta, ListableResource, Metadata,
NamespaceResourceScope, Resource,
};
use serde::Deserialize;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Rollout {
metadata: ObjectMeta,
pub spec: serde_json::Value,
Expand Down
2 changes: 1 addition & 1 deletion mirrord/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ mirrord-progress = { path = "../progress", optional = true }
mirrord-protocol = { path = "../protocol", optional = true }

base64 = { workspace = true, optional = true }
bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true }
bincode = { version = "2.0.0-rc.2", features = ["serde"], optional = true }
chrono = { version = "0.4", features = ["clock", "serde"] }
http = { version = "1", optional = true }
http-body-util = { workspace = true, optional = true }
Expand Down
Loading
Loading