Skip to content

Commit

Permalink
Added internal_proxy timeout configurations to allow users specify …
Browse files Browse the repository at this point in the history
…timeouts (#1770)

* Added `internal_proxy` timeout configurations to allow users specify timeouts in edge cases.

* right

* ..
  • Loading branch information
aviramha authored Aug 7, 2023
1 parent 56da982 commit 9d04879
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 37 deletions.
1 change: 1 addition & 0 deletions changelog.d/1761.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `internal_proxy` timeout configurations to allow users specify timeouts in edge cases.
38 changes: 38 additions & 0 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@
}
]
},
"internal_proxy": {
"title": "internal_proxy {#root-internal_proxy}",
"anyOf": [
{
"$ref": "#/definitions/InternalProxyFileConfig"
},
{
"type": "null"
}
]
},
"kube_context": {
"title": "kube_context {#root-kube_context}",
"description": "Kube context to use from the kubeconfig file. Will use current context if not specified.\n\n```json { \"kube_context\": \"mycluster\" } ```",
Expand Down Expand Up @@ -739,6 +750,33 @@
}
]
},
"InternalProxyFileConfig": {
"description": "Configuration for the internal proxy mirrord spawns for each local mirrord session that local layers use to connect to the remote agent\n\nThis is seldom used, but if you get `ConnectionRefused` errors, you might want to increase the timeouts a bit.\n\n```json { \"internal_proxy\": { \"start_idle_timeout\": 30, \"idle_timeout\": 5, } } ```",
"type": "object",
"properties": {
"idle_timeout": {
"title": "internal_proxy.idle_timeout {#agent-idle_timeout}",
"description": "How much time to wait while we don't have any active connections before exiting.\n\nCommon cases would be running a chain of processes that skip using the layer and don't connect to the proxy.\n\n```json { \"internal_proxy\": { \"idle_timeout\": 30 } } ```",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
},
"start_idle_timeout": {
"title": "internal_proxy.start_idle_timeout {#agent-start_idle_timeout}",
"description": "How much time to wait for the first connection to the proxy in seconds.\n\nCommon cases would be running with dlv or any other debugger, which sets a breakpoint on process execution, delaying the layer startup and connection to proxy.\n\n```json { \"internal_proxy\": { \"start_idle_timeout\": 60 } } ```",
"type": [
"integer",
"null"
],
"format": "uint64",
"minimum": 0.0
}
},
"additionalProperties": false
},
"LinuxCapability": {
"type": "string",
"enum": [
Expand Down
14 changes: 1 addition & 13 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(super) enum Commands {

/// Internal proxy - used to aggregate connections from multiple layers
#[command(hide = true, name = "intproxy")]
InternalProxy(Box<InternalProxyArgs>),
InternalProxy,
}

#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
Expand Down Expand Up @@ -266,18 +266,6 @@ pub(super) struct ExtensionExecArgs {
pub executable: Option<String>,
}

#[derive(Args, Debug)]
pub(super) struct InternalProxyArgs {
/// Launch timeout until we get first connection.
/// If layer doesn't connect in this time, we timeout and exit.
#[arg(short = 't', default_value_t = 30)]
pub timeout: u64,

/// Specify config file to use
#[arg(short = 'f')]
pub config_file: Option<String>,
}

#[derive(Args, Debug)]
pub(super) struct WaitlistArgs {
/// Email to register
Expand Down
5 changes: 0 additions & 5 deletions mirrord/cli/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl MirrordExecution {
// We only need the executable on macos, for SIP handling.
#[cfg(target_os = "macos")] executable: Option<&str>,
progress: &P,
timeout: Option<u64>,
) -> Result<Self>
where
P: Progress + Send + Sync,
Expand Down Expand Up @@ -119,10 +118,6 @@ impl MirrordExecution {
.stderr(std::process::Stdio::null())
.stdin(std::process::Stdio::null());

if let Some(timeout) = timeout {
proxy_command.arg("-t").arg(timeout.to_string());
}

match &connect_info {
AgentConnectInfo::DirectKubernetes(name, port) => {
proxy_command.env("MIRRORD_CONNECT_AGENT", name);
Expand Down
4 changes: 2 additions & 2 deletions mirrord/cli/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ pub(crate) async fn extension_exec(args: ExtensionExecArgs, progress: &TaskProgr
// or run tasks before actually launching.
#[cfg(target_os = "macos")]
let mut execution_info =
MirrordExecution::start(&config, args.executable.as_deref(), &progress, Some(60)).await?;
MirrordExecution::start(&config, args.executable.as_deref(), &progress).await?;
#[cfg(not(target_os = "macos"))]
let mut execution_info = MirrordExecution::start(&config, &progress, Some(60)).await?;
let mut execution_info = MirrordExecution::start(&config, &progress).await?;

// We don't execute so set envs aren't passed, so we need to add config file and target to env.
execution_info.environment.extend(env);
Expand Down
23 changes: 11 additions & 12 deletions mirrord/cli/src/internal_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,7 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{error, info, log::trace};

use crate::{
config::InternalProxyArgs,
error::{InternalProxyError, Result},
};
use crate::error::{InternalProxyError, Result};

unsafe fn redirect_fd_to_dev_null(fd: libc::c_int) {
let devnull_fd = libc::open(b"/dev/null\0" as *const [u8; 10] as _, libc::O_RDWR);
Expand Down Expand Up @@ -190,7 +187,7 @@ fn create_listen_socket() -> Result<TcpListener, InternalProxyError> {

/// Main entry point for the internal proxy.
/// It listens for inbound layer connect and forwards to agent.
pub(crate) async fn proxy(args: InternalProxyArgs) -> Result<()> {
pub(crate) async fn proxy() -> Result<()> {
let started = std::time::Instant::now();
// Let it assign port for us then print it for the user.
let listener = create_listen_socket()?;
Expand Down Expand Up @@ -218,11 +215,13 @@ pub(crate) async fn proxy(args: InternalProxyArgs) -> Result<()> {

print_port(&listener)?;

// wait for first connection `FIRST_CONNECTION_TIMEOUT` seconds, or timeout.
let (stream, _) = timeout(Duration::from_secs(args.timeout), listener.accept())
.await
.map_err(|_| InternalProxyError::FirstConnectionTimeout)?
.map_err(InternalProxyError::AcceptError)?;
let (stream, _) = timeout(
Duration::from_secs(config.internal_proxy.start_idle_timeout),
listener.accept(),
)
.await
.map_err(|_| InternalProxyError::FirstConnectionTimeout)?
.map_err(InternalProxyError::AcceptError)?;

let mut active_connections = JoinSet::new();

Expand Down Expand Up @@ -251,12 +250,12 @@ pub(crate) async fn proxy(args: InternalProxyArgs) -> Result<()> {
trace!("intproxy main connection canceled.");
break;
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
_ = tokio::time::sleep(Duration::from_secs(config.internal_proxy.idle_timeout)) => {
if active_connections.is_empty() {
trace!("intproxy timeout, no active connections. Exiting.");
break;
}
trace!("intproxy 5 sec tick, active_connections: {active_connections:?}.");
trace!("intproxy {} sec tick, active_connections: {active_connections:?}.", config.internal_proxy.idle_timeout);
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions mirrord/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,9 @@ async fn exec(args: &ExecArgs, progress: &TaskProgress) -> Result<()> {
let config = LayerConfig::from_env()?;

#[cfg(target_os = "macos")]
let execution_info =
MirrordExecution::start(&config, Some(&args.binary), progress, None).await?;
let execution_info = MirrordExecution::start(&config, Some(&args.binary), progress).await?;
#[cfg(not(target_os = "macos"))]
let execution_info = MirrordExecution::start(&config, progress, None).await?;
let execution_info = MirrordExecution::start(&config, progress).await?;

#[cfg(target_os = "macos")]
let (_did_sip_patch, binary) = match execution_info.patched_path {
Expand Down Expand Up @@ -399,7 +398,7 @@ async fn main() -> miette::Result<()> {
Commands::ExtensionExec(args) => {
extension_exec(*args, &MAIN_PROGRESS_TASK.subtask("ext")).await?
}
Commands::InternalProxy(args) => internal_proxy::proxy(*args).await?,
Commands::InternalProxy => internal_proxy::proxy().await?,
Commands::Waitlist(args) => register_to_waitlist(args.email).await?,
}

Expand Down
57 changes: 57 additions & 0 deletions mirrord/config/src/internal_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use mirrord_config_derive::MirrordConfig;
use schemars::JsonSchema;

use crate::config::source::MirrordConfigSource;

/// Configuration for the internal proxy mirrord spawns for each local mirrord session
/// that local layers use to connect to the remote agent
///
/// This is seldom used, but if you get `ConnectionRefused` errors, you might
/// want to increase the timeouts a bit.
///
/// ```json
/// {
/// "internal_proxy": {
/// "start_idle_timeout": 30,
/// "idle_timeout": 5,
/// }
/// }
/// ```
#[derive(MirrordConfig, Clone, Debug)]
#[config(map_to = "InternalProxyFileConfig", derive = "JsonSchema")]
#[cfg_attr(test, config(derive = "PartialEq"))]
pub struct InternalProxyConfig {
/// ### internal_proxy.start_idle_timeout {#agent-start_idle_timeout}
///
/// How much time to wait for the first connection to the proxy in seconds.
///
/// Common cases would be running with dlv or any other debugger, which sets a breakpoint
/// on process execution, delaying the layer startup and connection to proxy.
///
/// ```json
/// {
/// "internal_proxy": {
/// "start_idle_timeout": 60
/// }
/// }
/// ```
#[config(default = 60)]
pub start_idle_timeout: u64,

/// ### internal_proxy.idle_timeout {#agent-idle_timeout}
///
/// How much time to wait while we don't have any active connections before exiting.
///
/// Common cases would be running a chain of processes that skip using the layer
/// and don't connect to the proxy.
///
/// ```json
/// {
/// "internal_proxy": {
/// "idle_timeout": 30
/// }
/// }
/// ```
#[config(default = 5)]
pub idle_timeout: u64,
}
8 changes: 7 additions & 1 deletion mirrord/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
pub mod agent;
pub mod config;
pub mod feature;
pub mod internal_proxy;
pub mod target;
pub mod util;

Expand All @@ -24,7 +25,7 @@ use tracing::warn;

use crate::{
agent::AgentConfig, config::source::MirrordConfigSource, feature::FeatureConfig,
target::TargetConfig, util::VecOrSingle,
internal_proxy::InternalProxyConfig, target::TargetConfig, util::VecOrSingle,
};

const PAUSE_WITHOUT_STEAL_WARNING: &str =
Expand Down Expand Up @@ -303,6 +304,10 @@ pub struct LayerConfig {
/// ```
#[config(env = "MIRRORD_KUBE_CONTEXT")]
pub kube_context: Option<String>,

/// # internal_proxy {#root-internal_proxy}
#[config(nested)]
pub internal_proxy: InternalProxyConfig,
}

impl LayerConfig {
Expand Down Expand Up @@ -666,6 +671,7 @@ mod tests {
operator: None,
sip_binaries: None,
kube_context: None,
internal_proxy: None,
};

assert_eq!(config, expect);
Expand Down

0 comments on commit 9d04879

Please sign in to comment.