Skip to content

Commit

Permalink
feat(telemetry): static values, cpu and mem metrics gathering
Browse files Browse the repository at this point in the history
- basic setup to initialise the static values for telemetry store added.
- cpu and memory used by influxdb3 is sampled at 1min interval
- some minor tidyups

Closes: #25370, #25371
  • Loading branch information
praveen-influx committed Sep 24, 2024
1 parent c1a5e1b commit 969f845
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 139 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
num = { version = "0.4.3" }

# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1eaa4ed5ea147bc24db98d9686e457c124dfd5b7"}
Expand Down
35 changes: 32 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig},
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
Expand All @@ -14,6 +14,7 @@ use influxdb3_server::{
auth::AllOrNothingAuthorizer, builder::ServerBuilder, query_executor::QueryExecutorImpl, serve,
CommonServerState,
};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
Expand Down Expand Up @@ -255,9 +256,10 @@ pub async fn command(config: Config) -> Result<()> {

// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();
let obj_store_config = &config.object_store_config;

let object_store: Arc<DynObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
make_object_store(obj_store_config).map_err(Error::ObjectStoreParsing)?;

let trace_exporter = config.tracing_config.build()?;

Expand Down Expand Up @@ -324,8 +326,11 @@ pub async fn command(config: Config) -> Result<()> {

let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner())
.map_err(Error::InitializeLastCache)?;

info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

let _telemetry_store =
setup_telemetry_store(&config.object_store_config, catalog.instance_id(), num_cpus).await;

let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Expand Down Expand Up @@ -372,6 +377,30 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
num_cpus: usize,
) -> Arc<TelemetryStore> {
let os = std::env::consts::OS;
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
let influxdb_pkg_name = env!("CARGO_PKG_NAME");
let influx_version = format!("{}-{}", influxdb_pkg_name, influxdb_pkg_version);
let obj_store_type = object_store_config
.object_store
.unwrap_or(ObjectStoreType::Memory);
let storage_type = obj_store_type.as_str();

TelemetryStore::new(
instance_id,
Arc::from(os),
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
)
.await
}

fn parse_datafusion_config(
s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
Expand Down
3 changes: 3 additions & 0 deletions influxdb3_telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ futures.workspace = true
futures-util.workspace = true
reqwest.workspace = true
parking_lot.workspace = true
sysinfo.workspace = true
num.workspace = true
thiserror.workspace = true

[dev-dependencies]
test-log.workspace = true
Expand Down
63 changes: 63 additions & 0 deletions influxdb3_telemetry/src/cpu_mem_sampler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
use std::{sync::Arc, time::Duration};

use observability_deps::tracing::debug;
use sysinfo::{ProcessRefreshKind, System};

use crate::Result;
use crate::{store::TelemetryStore, TelemetryError};

struct CpuAndMemorySampler {
system: System,
}

impl CpuAndMemorySampler {
pub fn new(system: System) -> Self {
Self { system }
}

pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> {
let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?;
self.system.refresh_pids_specifics(
&[pid],
ProcessRefreshKind::new()
.with_cpu()
.with_memory()
.with_disk_usage(),
);

let process = self
.system
.process(pid)
.unwrap_or_else(|| panic!("cannot get process with pid: {}", pid));

let memory_used = process.memory();
let cpu_used = process.cpu_usage();

debug!(
mem_used = ?memory_used,
cpu_used = ?cpu_used,
"trying to sample data for cpu/memory");

Ok((cpu_used, memory_used))
}
}

pub(crate) async fn sample_cpu_and_memory(
store: Arc<TelemetryStore>,
duration_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut sampler = CpuAndMemorySampler::new(System::new());

// sample every minute
let mut interval = tokio::time::interval(Duration::from_secs(duration_secs));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() {
store.add_cpu_and_memory(cpu_used, memory_used);
}
}
})
}
150 changes: 14 additions & 136 deletions influxdb3_telemetry/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,142 +1,20 @@
use observability_deps::tracing::error;
use serde::Serialize;
use std::{sync::Arc, time::Duration};
mod cpu_mem_sampler;
mod sender;
mod stats;
pub mod store;

/// This store is responsible for holding all the stats which
/// will be sent in the background to the server.
pub struct TelemetryStore {
inner: parking_lot::Mutex<TelemetryStoreInner>,
}

impl TelemetryStore {
pub async fn new(
instance_id: String,
os: String,
influx_version: String,
storage_type: String,
cores: u32,
) -> Arc<Self> {
let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
let store = Arc::new(TelemetryStore {
inner: parking_lot::Mutex::new(inner),
});
send_telemetry_in_background(store.clone()).await;
store
}

pub fn add_cpu_utilization(&self, value: u32) {
let mut inner_store = self.inner.lock();
inner_store.cpu_utilization_percent = Some(value);
}

pub fn snapshot(&self) -> ExternalTelemetry {
let inner_store = self.inner.lock();
inner_store.snapshot()
}
}

struct TelemetryStoreInner {
instance_id: String,
os: String,
influx_version: String,
storage_type: String,
cores: u32,
// just for explanation
cpu_utilization_percent: Option<u32>,
}
use thiserror::Error;

impl TelemetryStoreInner {
pub fn new(
instance_id: String,
os: String,
influx_version: String,
storage_type: String,
cores: u32,
) -> Self {
TelemetryStoreInner {
os,
instance_id,
influx_version,
storage_type,
cores,
cpu_utilization_percent: None,
}
}
#[derive(Debug, Error)]
pub enum TelemetryError {
#[error("cannot serialize to JSON: {0}")]
CannotSerializeJson(#[from] serde_json::Error),

pub fn snapshot(&self) -> ExternalTelemetry {
ExternalTelemetry {
os: self.os.clone(),
version: self.influx_version.clone(),
instance_id: self.instance_id.clone(),
storage_type: self.storage_type.clone(),
cores: self.cores,
cpu_utilization_percent: self.cpu_utilization_percent,
}
}
}
#[error("failed to get pid: {0}")]
CannotGetPid(&'static str),

#[derive(Serialize)]
pub struct ExternalTelemetry {
pub os: String,
pub version: String,
pub storage_type: String,
pub instance_id: String,
pub cores: u32,
pub cpu_utilization_percent: Option<u32>,
#[error("cannot send telemetry: {0}")]
CannotSendToTelemetryServer(#[from] reqwest::Error),
}

async fn send_telemetry_in_background(store: Arc<TelemetryStore>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let client = reqwest::Client::new();
// TODO: Pass in the duration rather than hardcode it to 1hr
let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
let telemetry = store.snapshot();
let maybe_json = serde_json::to_vec(&telemetry);
match maybe_json {
Ok(json) => {
// TODO: wire it up to actual telemetry sender
let _res = client
.post("https://telemetry.influxdata.endpoint.com")
.body(json)
.send()
.await;
}
Err(e) => {
error!(error = ?e, "Cannot send telemetry");
}
}
}
})
}

#[cfg(test)]
mod tests {
use super::*;

#[test_log::test(tokio::test)]
async fn test_telemetry_handle_creation() {
// create store
let store: Arc<TelemetryStore> = TelemetryStore::new(
"some-instance-id".to_owned(),
"Linux".to_owned(),
"OSS-v3.0".to_owned(),
"Memory".to_owned(),
10,
)
.await;
// check snapshot
let snapshot = store.snapshot();
assert_eq!("some-instance-id", snapshot.instance_id);

// add cpu utilization
store.add_cpu_utilization(89);

// check snapshot again
let snapshot = store.snapshot();
assert_eq!(Some(89), snapshot.cpu_utilization_percent);
}
}
pub type Result<T, E = TelemetryError> = std::result::Result<T, E>;
56 changes: 56 additions & 0 deletions influxdb3_telemetry/src/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::{sync::Arc, time::Duration};

use observability_deps::tracing::{debug, error};

use crate::store::{TelemetryPayload, TelemetryStore};
use crate::{Result, TelemetryError};

pub(crate) struct TelemetrySender {
client: reqwest::Client,
req_path: String,
}

impl TelemetrySender {
pub fn new(client: reqwest::Client, req_path: String) -> Self {
Self { client, req_path }
}

pub async fn try_sending(&self, telemetry: &TelemetryPayload) -> Result<()> {
debug!(telemetry = ?telemetry, "trying to send data to telemetry server");
let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?;
self.client
.post(self.req_path.as_str())
.body(json)
.send()
.await
.map_err(TelemetryError::CannotSendToTelemetryServer)?;
debug!("Successfully sent telemetry data to server");
Ok(())
}
}

pub(crate) async fn send_telemetry_in_background(
store: Arc<TelemetryStore>,
duration_secs: u64,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let telem_sender = TelemetrySender::new(
reqwest::Client::new(),
"https://telemetry.influxdata.foo.com".to_owned(),
);
let mut interval = tokio::time::interval(Duration::from_secs(duration_secs));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
let telemetry = store.snapshot();
debug!(telemetry = ?telemetry, "trying to send data to telemetry server");
if let Err(e) = telem_sender.try_sending(&telemetry).await {
error!(error = ?e, "Cannot send telemetry");
}
// TODO: if we tried sending and failed, we currently still reset the
// metrics, is this ok? Should there be a retry mechanism?
store.reset_metrics();
}
})
}
Loading

0 comments on commit 969f845

Please sign in to comment.