feat(telemetry): static values, cpu and mem metrics gathering #25380

Merged (1 commit) on Sep 24, 2024
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -84,6 +84,7 @@ pretty_assertions = "1.4.0"
prost = "0.12.6"
prost-build = "0.12.6"
prost-types = "0.12.6"
proptest = { version = "1", default-features = false, features = ["std"] }
rand = "0.8.5"
reqwest = { version = "0.11.24", default-features = false, features = ["rustls-tls", "stream", "json"] }
secrecy = "0.8.0"
@@ -110,6 +111,7 @@ unicode-segmentation = "1.11.0"
url = "2.5.0"
urlencoding = "1.1"
uuid = { version = "1", features = ["v4"] }
num = { version = "0.4.3" }

# Core.git crates we depend on
arrow_util = { git = "https://github.com/influxdata/influxdb3_core", rev = "1eaa4ed5ea147bc24db98d9686e457c124dfd5b7"}
33 changes: 31 additions & 2 deletions influxdb3/src/commands/serve.rs
@@ -2,7 +2,7 @@

use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig},
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
socket_addr::SocketAddr,
tokio::TokioDatafusionConfig,
};
@@ -14,6 +14,7 @@ use influxdb3_server::{
auth::AllOrNothingAuthorizer, builder::ServerBuilder, query_executor::QueryExecutorImpl, serve,
CommonServerState,
};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, parquet_cache::create_cached_obj_store_and_oracle,
@@ -329,8 +330,11 @@ pub async fn command(config: Config) -> Result<()> {

let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner())
.map_err(Error::InitializeLastCache)?;

info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

let _telemetry_store =
setup_telemetry_store(&config.object_store_config, catalog.instance_id(), num_cpus).await;

let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
@@ -378,6 +382,31 @@ pub async fn command(config: Config) -> Result<()> {
Ok(())
}

async fn setup_telemetry_store(
object_store_config: &ObjectStoreConfig,
instance_id: Arc<str>,
num_cpus: usize,
) -> Arc<TelemetryStore> {
let os = std::env::consts::OS;
let influxdb_pkg_version = env!("CARGO_PKG_VERSION");
praveen-influx marked this conversation as resolved.

Contributor:
@praveen-influx - sorry, I didn't catch this during review, but I think it would be worth re-using some of these environment variables from the influxdb3_process workspace crate.

Contributor Author:
I can use it for the version as built here. I'll probably need another lookup for the package name CARGO_PKG_NAME - is it worth adding that lookup to the influxdb3_process crate as well?

Contributor:
> is it worth adding that lookup to the influxdb3_process crate as well?

I think it would be appropriate. It sounds like we may also need to make some adjustments in influxdb3_process for differentiating Pro vs. OSS, but that is TBD pending discussion with the perf team by the looks of it.

let influxdb_pkg_name = env!("CARGO_PKG_NAME");
// Should produce e.g. "influxdb3-0.1.0"
let influx_version = format!("{}-{}", influxdb_pkg_name, influxdb_pkg_version);
praveen-influx marked this conversation as resolved.
let obj_store_type = object_store_config
.object_store
.unwrap_or(ObjectStoreType::Memory);
let storage_type = obj_store_type.as_str();

TelemetryStore::new(
instance_id,
Arc::from(os),
Arc::from(influx_version),
Arc::from(storage_type),
num_cpus,
)
.await
}

fn parse_datafusion_config(
s: &str,
) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync + 'static>> {
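For reference, a minimal standalone sketch (not part of this diff) of the version-string construction done in setup_telemetry_store. Note that env!("CARGO_PKG_NAME") and env!("CARGO_PKG_VERSION") are resolved at compile time in the crate where the macro is written, so inside the influxdb3 binary this yields something like "influxdb3-0.1.0".

// Standalone sketch only; mirrors the version string built above.
fn influx_version() -> String {
    let pkg_name = env!("CARGO_PKG_NAME");
    let pkg_version = env!("CARGO_PKG_VERSION");
    format!("{pkg_name}-{pkg_version}")
}

fn main() {
    // Prints e.g. "influxdb3-0.1.0" when compiled as the influxdb3 binary.
    println!("{}", influx_version());
}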
4 changes: 4 additions & 0 deletions influxdb3_telemetry/Cargo.toml
@@ -14,7 +14,11 @@ futures.workspace = true
futures-util.workspace = true
reqwest.workspace = true
parking_lot.workspace = true
sysinfo.workspace = true
num.workspace = true
thiserror.workspace = true

[dev-dependencies]
test-log.workspace = true
proptest.workspace = true

63 changes: 63 additions & 0 deletions influxdb3_telemetry/src/cpu_mem_sampler.rs
@@ -0,0 +1,63 @@
use std::{sync::Arc, time::Duration};

use observability_deps::tracing::debug;
use sysinfo::{ProcessRefreshKind, System};

use crate::Result;
use crate::{store::TelemetryStore, TelemetryError};

struct CpuAndMemorySampler {
system: System,
}

impl CpuAndMemorySampler {
pub fn new(system: System) -> Self {
Self { system }
}

pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> {
let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?;
self.system.refresh_pids_specifics(
&[pid],
ProcessRefreshKind::new()
.with_cpu()
.with_memory()
.with_disk_usage(),
);

let process = self
.system
.process(pid)
.unwrap_or_else(|| panic!("cannot get process with pid: {}", pid));

let memory_used = process.memory();
let cpu_used = process.cpu_usage();

debug!(
mem_used = ?memory_used,
cpu_used = ?cpu_used,
"trying to sample data for cpu/memory");

Ok((cpu_used, memory_used))
}
}

pub(crate) async fn sample_cpu_and_memory(
store: Arc<TelemetryStore>,
duration_secs: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut sampler = CpuAndMemorySampler::new(System::new());

// sample at the configured interval (e.g. every minute)
let mut interval = tokio::time::interval(duration_secs);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() {
store.add_cpu_and_memory(cpu_used, memory_used);
}
}
})
}
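For context, here is a self-contained sketch (illustration only, not PR code) of the same sysinfo pattern the sampler uses. A process's cpu_usage() is computed relative to the previous refresh, so the first reading tends to be zero; the background task above gets meaningful values because it refreshes on every tick.

use std::time::Duration;

use sysinfo::{ProcessRefreshKind, System};

fn main() {
    let pid = sysinfo::get_current_pid().expect("cannot get current pid");
    let mut system = System::new();

    // Refresh twice with a short pause so the CPU delta is non-zero.
    for _ in 0..2 {
        system.refresh_pids_specifics(
            &[pid],
            ProcessRefreshKind::new().with_cpu().with_memory(),
        );
        std::thread::sleep(Duration::from_millis(200));
    }

    if let Some(process) = system.process(pid) {
        // cpu_usage() is a percentage (it can exceed 100 on multi-core machines);
        // memory() is in bytes in recent sysinfo releases.
        println!("cpu used: {:.2}%", process.cpu_usage());
        println!("memory used: {} bytes", process.memory());
    }
}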
150 changes: 14 additions & 136 deletions influxdb3_telemetry/src/lib.rs
@@ -1,142 +1,20 @@
-use observability_deps::tracing::error;
-use serde::Serialize;
-use std::{sync::Arc, time::Duration};
+mod cpu_mem_sampler;
+mod sender;
+mod stats;
+pub mod store;
 
-/// This store is responsible for holding all the stats which
-/// will be sent in the background to the server.
-pub struct TelemetryStore {
-    inner: parking_lot::Mutex<TelemetryStoreInner>,
-}
-
-impl TelemetryStore {
-    pub async fn new(
-        instance_id: String,
-        os: String,
-        influx_version: String,
-        storage_type: String,
-        cores: u32,
-    ) -> Arc<Self> {
-        let inner = TelemetryStoreInner::new(instance_id, os, influx_version, storage_type, cores);
-        let store = Arc::new(TelemetryStore {
-            inner: parking_lot::Mutex::new(inner),
-        });
-        send_telemetry_in_background(store.clone()).await;
-        store
-    }
-
-    pub fn add_cpu_utilization(&self, value: u32) {
-        let mut inner_store = self.inner.lock();
-        inner_store.cpu_utilization_percent = Some(value);
-    }
-
-    pub fn snapshot(&self) -> ExternalTelemetry {
-        let inner_store = self.inner.lock();
-        inner_store.snapshot()
-    }
-}
-
-struct TelemetryStoreInner {
-    instance_id: String,
-    os: String,
-    influx_version: String,
-    storage_type: String,
-    cores: u32,
-    // just for explanation
-    cpu_utilization_percent: Option<u32>,
-}
+use thiserror::Error;
 
-impl TelemetryStoreInner {
-    pub fn new(
-        instance_id: String,
-        os: String,
-        influx_version: String,
-        storage_type: String,
-        cores: u32,
-    ) -> Self {
-        TelemetryStoreInner {
-            os,
-            instance_id,
-            influx_version,
-            storage_type,
-            cores,
-            cpu_utilization_percent: None,
-        }
-    }
+#[derive(Debug, Error)]
+pub enum TelemetryError {
+    #[error("cannot serialize to JSON: {0}")]
+    CannotSerializeJson(#[from] serde_json::Error),
 
-    pub fn snapshot(&self) -> ExternalTelemetry {
-        ExternalTelemetry {
-            os: self.os.clone(),
-            version: self.influx_version.clone(),
-            instance_id: self.instance_id.clone(),
-            storage_type: self.storage_type.clone(),
-            cores: self.cores,
-            cpu_utilization_percent: self.cpu_utilization_percent,
-        }
-    }
-}
+    #[error("failed to get pid: {0}")]
+    CannotGetPid(&'static str),
 
-#[derive(Serialize)]
-pub struct ExternalTelemetry {
-    pub os: String,
-    pub version: String,
-    pub storage_type: String,
-    pub instance_id: String,
-    pub cores: u32,
-    pub cpu_utilization_percent: Option<u32>,
+    #[error("cannot send telemetry: {0}")]
+    CannotSendToTelemetryServer(#[from] reqwest::Error),
 }
 
-async fn send_telemetry_in_background(store: Arc<TelemetryStore>) -> tokio::task::JoinHandle<()> {
-    tokio::spawn(async move {
-        let client = reqwest::Client::new();
-        // TODO: Pass in the duration rather than hardcode it to 1hr
-        let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
-        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
-
-        loop {
-            interval.tick().await;
-            let telemetry = store.snapshot();
-            let maybe_json = serde_json::to_vec(&telemetry);
-            match maybe_json {
-                Ok(json) => {
-                    // TODO: wire it up to actual telemetry sender
-                    let _res = client
-                        .post("https://telemetry.influxdata.endpoint.com")
-                        .body(json)
-                        .send()
-                        .await;
-                }
-                Err(e) => {
-                    error!(error = ?e, "Cannot send telemetry");
-                }
-            }
-        }
-    })
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test_log::test(tokio::test)]
-    async fn test_telemetry_handle_creation() {
-        // create store
-        let store: Arc<TelemetryStore> = TelemetryStore::new(
-            "some-instance-id".to_owned(),
-            "Linux".to_owned(),
-            "OSS-v3.0".to_owned(),
-            "Memory".to_owned(),
-            10,
-        )
-        .await;
-        // check snapshot
-        let snapshot = store.snapshot();
-        assert_eq!("some-instance-id", snapshot.instance_id);
-
-        // add cpu utilization
-        store.add_cpu_utilization(89);
-
-        // check snapshot again
-        let snapshot = store.snapshot();
-        assert_eq!(Some(89), snapshot.cpu_utilization_percent);
-    }
-}
+pub type Result<T, E = TelemetryError> = std::result::Result<T, E>;
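To show how the new error enum and Result alias compose, a small sketch follows; serialize_payload and current_pid are hypothetical helpers for illustration, while the real call sites live in sender.rs and cpu_mem_sampler.rs.

// Illustration only, assuming it lives inside this crate (TelemetryError and
// Result in scope); these helpers are not part of the PR.
use serde::Serialize;

fn serialize_payload<T: Serialize>(payload: &T) -> Result<Vec<u8>> {
    // serde_json::Error converts into TelemetryError::CannotSerializeJson via #[from]
    Ok(serde_json::to_vec(payload)?)
}

fn current_pid() -> Result<sysinfo::Pid> {
    // sysinfo::get_current_pid returns Err(&'static str), mapped explicitly
    // onto the CannotGetPid variant
    sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)
}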
55 changes: 55 additions & 0 deletions influxdb3_telemetry/src/sender.rs
@@ -0,0 +1,55 @@
use std::{sync::Arc, time::Duration};

use observability_deps::tracing::{debug, error};

use crate::store::{TelemetryPayload, TelemetryStore};
use crate::{Result, TelemetryError};

pub(crate) struct TelemetrySender {
client: reqwest::Client,
req_path: String,
}

impl TelemetrySender {
pub fn new(client: reqwest::Client, req_path: String) -> Self {
Self { client, req_path }
}

pub async fn try_sending(&self, telemetry: &TelemetryPayload) -> Result<()> {
debug!(telemetry = ?telemetry, "trying to send data to telemetry server");
let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?;
self.client
.post(self.req_path.as_str())
.body(json)
.send()
.await
.map_err(TelemetryError::CannotSendToTelemetryServer)?;
debug!("Successfully sent telemetry data to server");
Ok(())
}
}

pub(crate) async fn send_telemetry_in_background(
store: Arc<TelemetryStore>,
duration_secs: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let telem_sender = TelemetrySender::new(
reqwest::Client::new(),
"https://telemetry.influxdata.foo.com".to_owned(),
Member:
Is this getting updated in a follow-on PR?

Contributor Author:
Yes - this will be updated in one of the follow-on PRs.

);
let mut interval = tokio::time::interval(duration_secs);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
let telemetry = store.snapshot();
if let Err(e) = telem_sender.try_sending(&telemetry).await {
error!(error = ?e, "Cannot send telemetry");
}
// if we tried sending and failed, we currently still reset the
// metrics; it is ok to miss a few samples
store.reset_metrics();
}
})
}
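The send loop relies on tokio's interval with MissedTickBehavior::Skip; below is a minimal standalone illustration of that behavior (not PR code; the sleep stands in for a slow send).

use std::time::Duration;

#[tokio::main]
async fn main() {
    // With Skip, ticks missed while the loop body runs long are dropped
    // instead of being fired in a burst afterwards (Burst is the default).
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

    for i in 0..3 {
        interval.tick().await;
        println!("tick {i}");
        // Simulate work that overruns the interval.
        tokio::time::sleep(Duration::from_millis(1500)).await;
    }
}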