Skip to content

Commit

Permalink
Move session cmd stuff to module. // docs
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Feb 19, 2024
1 parent 8cb7a54 commit 21d3ad3
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 88 deletions.
30 changes: 22 additions & 8 deletions mirrord/cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,16 +223,30 @@ pub(super) enum OperatorCommand {
#[arg(short = 'f')]
config_file: Option<String>,
},
Session {
#[arg(long, conflicts_with = "kill_all", conflicts_with = "retain_active")]
kill: Option<u32>,

#[arg(long, conflicts_with = "kill", conflicts_with = "retain_active")]
kill_all: bool,
/// Operator session management commands.
///
/// Allows the user to forcefully kill living sessions.
#[command(subcommand)]
Session(SessionCommand),
}

#[arg(long, conflicts_with = "kill_all", conflicts_with = "kill")]
retain_active: bool,
/// `mirrord operator session` family of commands.
///
/// Allows the user to forcefully kill operator sessions, use with care!
#[derive(Debug, Subcommand, Clone, Copy)]
pub(crate) enum SessionCommand {
/// Kills the session specified by `id`.
Kill {
/// Id of the session.
#[arg(short, long)]
id: u32,
},
/// Kills all operator sessions.
KillAll,

/// Kills _inactive_ sessions, might be useful if an undead session is still being stored in
/// the session storage.
RetainActive,
}

#[derive(ValueEnum, Clone, Debug)]
Expand Down
96 changes: 16 additions & 80 deletions mirrord/cli/src/operator.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,33 @@
use std::{fs::File, path::PathBuf, time::Duration};

use kube::{api::DeleteParams, Api};
use kube::Api;
use mirrord_config::{
config::{ConfigContext, MirrordConfig},
LayerFileConfig,
};
use mirrord_kube::api::kubernetes::create_kube_api;
use mirrord_operator::{
client::{session_api, OperatorApiError, OperatorOperation},
client::{OperatorApiError, OperatorOperation},
crd::{LicenseInfoOwned, MirrordOperatorCrd, MirrordOperatorSpec, OPERATOR_STATUS_NAME},
setup::{LicenseType, Operator, OperatorNamespace, OperatorSetup, SetupOptions},
};
use mirrord_progress::{Progress, ProgressTracker};
use prettytable::{row, Table};
use serde::Deserialize;
use tokio::fs;
use tracing::{error, warn};
use tracing::warn;

use self::session::{
operator_session_kill_all, operator_session_kill_one, operator_session_retain_active,
};
use crate::{
config::{OperatorArgs, OperatorCommand},
error::CliError,
Result,
Result, SessionCommand,
};

mod session;

#[derive(Deserialize)]
struct OperatorVersionResponse {
operator: String,
Expand Down Expand Up @@ -270,77 +275,6 @@ Operator License
Ok(())
}

/// Handles the `mirrord operator session` family of commands:
///
/// - `--kill-one {session_id}`: kills the operator session specified by `session_id`;
/// - `--kill-all`: kills every operator session, this is basically a `.clear()`;
/// - `--retain-active`: performs a clean-up for operator sessions that are still stored;
#[tracing::instrument(level = "trace", ret)]
async fn operator_session(
kill_one: Option<u32>,
kill_all: bool,
retain_active: bool,
) -> Result<()> {
let mut progress = ProgressTracker::from_env("Operator session action");

let session_api = session_api(None)
.await
.inspect_err(|fail| error!("Failed to even get sessio_api {fail:?}!"))?;

let mut operation_progress = progress.subtask("preparing to execute session operation...");

let result = if let Some(session_id) = kill_one {
operation_progress.print("killing session with id {session_id}");

session_api
.delete(&format!("kill_one/{session_id}"), &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)
} else if kill_all {
operation_progress.print("killing all sessions");

session_api
.delete("kill_all", &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)
} else if retain_active {
operation_progress.print("retaining only active sessions");

session_api
.delete("retain_active", &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)
} else {
panic!("You must select one of the session options, there is no default!");
}
.inspect_err(|_| {
operation_progress.failure(Some("Failed to execute session operation!"));
})?;

if let Some(status) = result.right() {
operation_progress.success(Some(&format!(
"session operation completed with {status:?}"
)));
}

operation_progress.success(Some("session operation finished"));
progress.success(Some("Done with session stuff!"));

Ok(())
}

/// Handle commands related to the operator `mirrord operator ...`
pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
match args.command {
Expand All @@ -352,10 +286,12 @@ pub(crate) async fn operator_command(args: OperatorArgs) -> Result<()> {
license_path,
} => operator_setup(accept_tos, file, namespace, license_key, license_path).await,
OperatorCommand::Status { config_file } => operator_status(config_file).await,
OperatorCommand::Session {
kill,
kill_all,
retain_active,
} => operator_session(kill, kill_all, retain_active).await,
OperatorCommand::Session(SessionCommand::Kill { id }) => {
operator_session_kill_one(id).await
}
OperatorCommand::Session(SessionCommand::KillAll) => operator_session_kill_all().await,
OperatorCommand::Session(SessionCommand::RetainActive) => {
operator_session_retain_active().await
}
}
}
102 changes: 102 additions & 0 deletions mirrord/cli/src/operator/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use kube::{api::DeleteParams, core::Status, Api};
use mirrord_operator::client::{session_api, OperatorApiError, OperatorOperation};
use mirrord_progress::{Progress, ProgressTracker};
use tracing::error;

use crate::{error::CliError, Result};

/// Prepares progress and kube api for use in the operator session commands.
#[tracing::instrument(level = "trace", ret)]
async fn operator_session_prepare() -> Result<(
ProgressTracker,
Api<mirrord_operator::crd::SessionCrd>,
ProgressTracker,
)> {
let progress = ProgressTracker::from_env("Operator session action");

let session_api = session_api(None)
.await
.inspect_err(|fail| error!("Failed to even get sessio_api {fail:?}!"))?;

let sub_progress = progress.subtask("preparing to execute session operation...");

Ok((progress, session_api, sub_progress))
}

/// Handles the cleanup part of progress after an operator session command.
#[tracing::instrument(level = "trace", ret)]
fn operator_session_finished(
result: Option<Status>,
mut sub_progress: ProgressTracker,
mut progress: ProgressTracker,
) -> Result<()> {
if let Some(status) = result {
sub_progress.success(Some(&format!(
"session operation completed with {status:?}"
)));
}

sub_progress.success(Some("session operation finished"));
progress.success(Some("Done with session stuff!"));

Ok(())
}

/// `mirrord operator session kill_all`: kills every operator session, this is basically a
/// `.clear()`;
#[tracing::instrument(level = "trace", ret)]
pub(super) async fn operator_session_kill_all() -> Result<()> {
let (progress, api, sub_progress) = operator_session_prepare().await?;

sub_progress.print("killing all sessions");

let result = api
.delete("kill_all", &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)?;

operator_session_finished(result.right(), sub_progress, progress)
}

/// `mirrord operator session kill {id}`: kills the operator session specified by `id`.
#[tracing::instrument(level = "trace", ret)]
pub(super) async fn operator_session_kill_one(id: u32) -> Result<()> {
let (progress, api, sub_progress) = operator_session_prepare().await?;

sub_progress.print("killing session with id {session_id}");

let result = api
.delete(&format!("kill_one/{id}"), &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)?;

operator_session_finished(result.right(), sub_progress, progress)
}

/// `mirrord operator session kill {id}`: performs a clean-up for operator sessions that are still
/// stored;
#[tracing::instrument(level = "trace", ret)]
pub(super) async fn operator_session_retain_active() -> Result<()> {
let (progress, api, sub_progress) = operator_session_prepare().await?;

sub_progress.print("retaining only active sessions");

let result = api
.delete("retain_active", &DeleteParams::default())
.await
.map_err(|error| OperatorApiError::KubeError {
error,
operation: OperatorOperation::GettingStatus,
})
.map_err(CliError::from)?;

operator_session_finished(result.right(), sub_progress, progress)
}

0 comments on commit 21d3ad3

Please sign in to comment.