Skip to content

Commit

Permalink
Out of pods operator fixes (#1800)
Browse files Browse the repository at this point in the history
* Add Operator Specific OOP handling

* Small

* This?

* This

* This?

* Update

* Tiny

* Update

* Docs
  • Loading branch information
DmitryDodzin authored Aug 20, 2023
1 parent cdc87c9 commit 4a99411
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/+oop-operator.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Small changes relevant to operator for #1782.
1 change: 1 addition & 0 deletions mirrord/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ impl ClientConnectionHandler {
self.respond(DaemonMessage::SwitchProtocolVersionResponse(version))
.await?;
}
ClientMessage::ReadyForLogs => {}
}

Ok(true)
Expand Down
8 changes: 4 additions & 4 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,14 +248,14 @@ impl TcpConnectionStealer {

if HTTP_FRAMED_VERSION.matches(version) {
Ok(daemon_tx
.send(DaemonTcp::HttpRequest(
request.into_serializable_fallback().await?,
.send(DaemonTcp::HttpRequestFramed(
request.into_serializable().await?,
))
.await?)
} else {
Ok(daemon_tx
.send(DaemonTcp::HttpRequestFramed(
request.into_serializable().await?,
.send(DaemonTcp::HttpRequest(
request.into_serializable_fallback().await?,
))
.await?)
}
Expand Down
1 change: 1 addition & 0 deletions mirrord/kube/src/api/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ static AGENT_READY_REGEX: LazyLock<Regex> = LazyLock::new(|| {
* Wait until the agent prints the "agent ready" message.
* Return agent version extracted from the message (if found).
*/
#[tracing::instrument(level = "trace", skip(pod_api), ret)]
async fn wait_for_agent_startup(
pod_api: &Api<Pod>,
pod_name: &str,
Expand Down
2 changes: 2 additions & 0 deletions mirrord/kube/src/api/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl RuntimeData {
})
}

#[tracing::instrument(level = "trace", skip(client), ret)]
pub async fn check_node(&self, client: &kube::Client) -> NodeCheck {
let node_api: Api<Node> = Api::all(client.clone());
let pod_api: Api<Pod> = Api::all(client.clone());
Expand Down Expand Up @@ -153,6 +154,7 @@ impl RuntimeData {
}
}

#[derive(Debug)]
pub enum NodeCheck {
Success,
Failed(String, usize),
Expand Down
12 changes: 10 additions & 2 deletions mirrord/layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ use mirrord_layer_macro::{hook_fn, hook_guard_fn};
use mirrord_protocol::{
dns::{DnsLookup, GetAddrInfoRequest},
tcp::{HttpResponseFallback, LayerTcpSteal},
ClientMessage, DaemonMessage,
ClientMessage, DaemonMessage, CLIENT_READY_FOR_LOGS,
};
use outgoing::{tcp::TcpOutgoingHandler, udp::UdpOutgoingHandler};
use regex::RegexSet;
Expand Down Expand Up @@ -780,7 +780,15 @@ impl Layer {
DaemonMessage::PauseTarget(_) => {
unreachable!("We set pausing target only on initialization, shouldn't happen")
}
DaemonMessage::SwitchProtocolVersionResponse(_) => Ok(()),
DaemonMessage::SwitchProtocolVersionResponse(protocol_version) => {
if CLIENT_READY_FOR_LOGS.matches(&protocol_version) {
if let Err(err) = self.tx.send(ClientMessage::ReadyForLogs).await {
warn!("Unable to ready-up for logs: {err}");
}
}

Ok(())
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion mirrord/protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mirrord-protocol"
version = "1.3.0"
version = "1.3.1"
authors.workspace = true
description.workspace = true
documentation.workspace = true
Expand Down
7 changes: 7 additions & 0 deletions mirrord/protocol/src/codec.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::{
collections::{HashMap, HashSet},
io,
sync::LazyLock,
};

use actix_codec::{Decoder, Encoder};
use bincode::{error::DecodeError, Decode, Encode};
use bytes::{Buf, BufMut, BytesMut};
use mirrord_macros::protocol_break;
use semver::VersionReq;

use crate::{
dns::{GetAddrInfoRequest, GetAddrInfoResponse},
Expand Down Expand Up @@ -80,6 +82,10 @@ pub enum FileRequest {
GetDEnts64(GetDEnts64Request),
}

/// Minimal mirrord-protocol version that allows `ClientMessage::ReadyForLogs` message.
pub static CLIENT_READY_FOR_LOGS: LazyLock<VersionReq> =
LazyLock::new(|| ">=1.3.1".parse().expect("Bad Identifier"));

/// `-layer` --> `-agent` messages.
#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)]
pub enum ClientMessage {
Expand All @@ -95,6 +101,7 @@ pub enum ClientMessage {
/// Whether to pause or unpause the target container.
PauseTargetRequest(bool),
SwitchProtocolVersion(#[bincode(with_serde)] semver::Version),
ReadyForLogs,
}

/// Type alias for `Result`s that should be returned from mirrord-agent to mirrord-layer.
Expand Down
4 changes: 3 additions & 1 deletion mirrord/protocol/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,10 @@ impl HttpRequestFallback {
}
}

/// Minimal mirrord-protocol version that allows `DaemonTcp::HttpRequestFramed` instead of
/// `DaemonTcp::HttpRequest`.
pub static HTTP_FRAMED_VERSION: LazyLock<VersionReq> =
LazyLock::new(|| "<1.3.0".parse().expect("Bad Identifier"));
LazyLock::new(|| ">=1.3.0".parse().expect("Bad Identifier"));

/// Protocol break - on version 2, please add source port, dest/src IP to the message
/// so we can avoid losing this information.
Expand Down

0 comments on commit 4a99411

Please sign in to comment.