feat(telemetry): adds reads and writes
- instrumented code to capture read and write measurements
- introduced EventsBucket to collect read/write samples
- sampler now samples every minute for all metrics (including reads/writes)
- other tidy-ups

closes: #25372
praveen-influx committed Oct 1, 2024
1 parent c4514bf commit 0930477
Showing 18 changed files with 627 additions and 108 deletions.
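The per-minute sampler itself is not among the hunks rendered below. As a rough sketch of what "sampler now samples every minute for all metrics" implies, assuming a tokio interval loop and an illustrative sample_all_metrics method (both hypothetical, inferred from the commit message rather than shown in this diff):

use std::{sync::Arc, time::Duration};

use influxdb3_telemetry::store::TelemetryStore;

// Hypothetical sketch only: the real sampler lives in the crate's sampler
// module, which this commit view does not render. sample_all_metrics is an
// illustrative name, not a confirmed API.
async fn run_sampler(store: Arc<TelemetryStore>) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;
        // Roll up CPU/memory stats plus the per-minute read/write
        // EventsBucket, then reset the bucket for the next window.
        store.sample_all_metrics();
    }
}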
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions influxdb3/Cargo.toml
@@ -90,3 +90,4 @@ serde_json.workspace = true
test_helpers.workspace = true
tonic.workspace = true
tower.workspace = true
test-log.workspace = true
12 changes: 9 additions & 3 deletions influxdb3/src/commands/serve.rs
@@ -309,8 +309,6 @@ pub async fn command(config: Config) -> Result<()> {
)
.with_jaeger_debug_name(config.tracing_config.traces_jaeger_debug_name);

let common_state =
CommonServerState::new(Arc::clone(&metrics), trace_exporter, trace_header_parser)?;
let persister = Arc::new(Persister::new(
Arc::clone(&object_store),
config.host_identifier_prefix,
@@ -332,9 +330,16 @@ pub async fn command(config: Config) -> Result<()> {
.map_err(Error::InitializeLastCache)?;
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");

let _telemetry_store =
let telemetry_store =
setup_telemetry_store(&config.object_store_config, catalog.instance_id(), num_cpus).await;

let common_state = CommonServerState::new(
Arc::clone(&metrics),
trace_exporter,
trace_header_parser,
Arc::clone(&telemetry_store),
)?;

let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
@@ -356,6 +361,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::new(config.datafusion_config),
10,
config.query_log_size,
Arc::clone(&telemetry_store),
));

let listener = TcpListener::bind(*config.http_bind_address)
2 changes: 1 addition & 1 deletion influxdb3/tests/server/auth.rs
@@ -114,7 +114,7 @@ async fn auth() {
);
}

#[tokio::test]
#[test_log::test(tokio::test)]
async fn auth_grpc() {
const HASHED_TOKEN: &str = "5315f0c4714537843face80cca8c18e27ce88e31e9be7a5232dc4dc8444f27c0227a9bd64831d3ab58f652bd0262dd8558dd08870ac9e5c650972ce9e4259439";
const TOKEN: &str = "apiv3_mp75KQAhbqv0GeQXk8MPuZ3ztaLEaR5JzS8iifk1FwuroSVyXXyrJK1c4gEr1kHkmbgzDV-j3MvQpaIMVJBAiA";
2 changes: 1 addition & 1 deletion influxdb3/tests/server/flight.rs
@@ -7,7 +7,7 @@ use test_helpers::assert_contains;
use crate::collect_stream;
use crate::TestServer;

#[tokio::test]
#[test_log::test(tokio::test)]
async fn flight() -> Result<(), influxdb3_client::Error> {
let server = TestServer::spawn().await;

1 change: 1 addition & 0 deletions influxdb3_server/Cargo.toml
@@ -36,6 +36,7 @@ influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }

# crates.io Dependencies
anyhow.workspace = true
6 changes: 6 additions & 0 deletions influxdb3_server/src/http.rs
@@ -419,6 +419,12 @@ where
.await?
};

let num_lines = result.line_count;
let payload_size = body.len();
self.common_state
.telemetry_store
.add_write_metrics(num_lines, payload_size);

if result.invalid_lines.is_empty() {
Ok(Response::new(Body::empty()))
} else {
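The store side of these write metrics is not rendered in this commit view. Given the crate's parking_lot dependency and the EventsBucket added below, a plausible sketch of the two instrumentation entry points the server calls (add_write_metrics here, and update_num_queries in query_executor.rs further down); the field layout and method bodies are assumptions, not the actual source:

use parking_lot::Mutex;

use crate::bucket::EventsBucket;

// Assumed shape: TelemetryStore guards its per-minute event rollup with a
// parking_lot mutex; only the two instrumentation entry points are sketched.
pub struct TelemetryStore {
    events_bucket: Mutex<EventsBucket>,
    // ... other telemetry state (CPU/memory stats, instance id, and so on)
}

impl TelemetryStore {
    pub fn add_write_metrics(&self, num_lines: usize, size_bytes: usize) {
        self.events_bucket.lock().add_write_sample(num_lines, size_bytes);
    }

    pub fn update_num_queries(&self) {
        self.events_bucket.lock().update_num_queries();
    }
}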
18 changes: 16 additions & 2 deletions influxdb3_server/src/lib.rs
@@ -28,6 +28,7 @@ use datafusion::execution::SendableRecordBatchStream;
use hyper::server::conn::AddrIncoming;
use hyper::server::conn::Http;
use hyper::service::service_fn;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::persister::Persister;
use iox_query::QueryDatabase;
use iox_query_params::StatementParams;
@@ -79,18 +80,21 @@ pub struct CommonServerState {
metrics: Arc<metric::Registry>,
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
trace_header_parser: TraceHeaderParser,
telemetry_store: Arc<TelemetryStore>,
}

impl CommonServerState {
pub fn new(
metrics: Arc<metric::Registry>,
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
trace_header_parser: TraceHeaderParser,
telemetry_store: Arc<TelemetryStore>,
) -> Result<Self> {
Ok(Self {
metrics,
trace_exporter,
trace_header_parser,
telemetry_store,
})
}

@@ -178,6 +182,7 @@ where
Arc::clone(&server.http.query_executor),
Some(server.authorizer()),
));

let rest_service = hyper::service::make_service_fn(|_| {
let http_server = Arc::clone(&server.http);
let service = service_fn(move |req: hyper::Request<hyper::Body>| {
@@ -228,6 +233,7 @@ mod tests {
use datafusion::parquet::data_type::AsBytes;
use hyper::{body, Body, Client, Request, Response, StatusCode};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
use influxdb3_write::last_cache::LastCacheProvider;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
@@ -742,8 +748,14 @@ mod tests {
async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc<dyn WriteBuffer>) {
let trace_header_parser = trace_http::ctx::TraceHeaderParser::new();
let metrics = Arc::new(metric::Registry::new());
let common_state =
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap();
let dummy_telem_store = TelemetryStore::new_without_background_runners();
let common_state = crate::CommonServerState::new(
Arc::clone(&metrics),
None,
trace_header_parser,
dummy_telem_store,
)
.unwrap();
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let parquet_store =
@@ -764,6 +776,7 @@ mod tests {
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
let dummy_host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("dummy-instance-id");
let telemetry_store = TelemetryStore::new_without_background_runners();

let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
influxdb3_write::write_buffer::WriteBufferImpl::new(
@@ -786,6 +799,7 @@ mod tests {
Arc::new(HashMap::new()),
10,
10,
telemetry_store,
);

// bind to port 0 will assign a random available port:
10 changes: 10 additions & 0 deletions influxdb3_server/src/query_executor.rs
@@ -19,6 +19,7 @@ use datafusion::prelude::Expr;
use datafusion_util::config::DEFAULT_SCHEMA;
use datafusion_util::MemoryStream;
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_write::last_cache::LastCacheFunction;
use influxdb3_write::WriteBuffer;
use iox_query::exec::{Executor, IOxSessionContext, QueryConfig};
@@ -54,9 +55,11 @@ pub struct QueryExecutorImpl {
datafusion_config: Arc<HashMap<String, String>>,
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
query_log: Arc<QueryLog>,
telemetry_store: Arc<TelemetryStore>,
}

impl QueryExecutorImpl {
#[allow(clippy::too_many_arguments)]
pub fn new(
catalog: Arc<Catalog>,
write_buffer: Arc<dyn WriteBuffer>,
@@ -65,6 +68,7 @@ impl QueryExecutorImpl {
datafusion_config: Arc<HashMap<String, String>>,
concurrent_query_limit: usize,
query_log_size: usize,
telemetry_store: Arc<TelemetryStore>,
) -> Self {
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
&metrics,
@@ -83,6 +87,7 @@ impl QueryExecutorImpl {
datafusion_config,
query_execution_semaphore,
query_log,
telemetry_store,
}
}
}
@@ -146,6 +151,8 @@ impl QueryExecutor for QueryExecutorImpl {
let token = token.permit();

debug!("execute stream of query results");
self.telemetry_store.update_num_queries();

match ctx.execute_stream(Arc::clone(&plan)).await {
Ok(query_results) => {
token.success();
@@ -589,6 +596,7 @@ mod tests {
use datafusion::{assert_batches_sorted_eq, error::DataFusionError};
use futures::TryStreamExt;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, parquet_cache::test_cached_obj_store_and_oracle,
@@ -656,6 +664,7 @@
);
let metrics = Arc::new(Registry::new());
let df_config = Arc::new(Default::default());
let dummy_telem_store = TelemetryStore::new_without_background_runners();
let query_executor = QueryExecutorImpl::new(
write_buffer.catalog(),
Arc::clone(&write_buffer),
Expand All @@ -664,6 +673,7 @@ mod tests {
df_config,
10,
10,
dummy_telem_store,
);

(write_buffer, query_executor, time_provider)
2 changes: 2 additions & 0 deletions influxdb3_telemetry/Cargo.toml
@@ -17,6 +17,8 @@ parking_lot.workspace = true
sysinfo.workspace = true
num.workspace = true
thiserror.workspace = true
tonic.workspace = true
tower.workspace = true

[dev-dependencies]
test-log.workspace = true
7 changes: 7 additions & 0 deletions influxdb3_telemetry/proptest-regressions/stats.txt
@@ -0,0 +1,7 @@
# Seeds for failure cases proptest has generated in the past. It is
# automatically read and these particular cases re-run before any
# novel cases are generated.
#
# It is recommended to check this file in to source control so that
# everyone who runs the test benefits from these saved cases.
cc 4ce4c13bc33ca25f320b2bd5cda9aa233e8cc798e809301cccf38eddf22fa7c0 # shrinks to min = 0, max = 0, curr_avg = 4339, num_samples = 0, new_value = 0
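The parameter names in this seed (min, max, curr_avg, num_samples, new_value) point at a property test over the rolling-average update in the stats module, which this view does not render. A hedged sketch of the kind of property that would record such a seed, with a locally defined stand-in for the real helper:

use proptest::prelude::*;

// Stand-in for the unrendered stats helper: integer rolling average
// (curr_avg * n + new_value) / (n + 1), with the first sample replacing any
// stale average. Purely illustrative; the real signature may differ.
fn rolling_avg(curr_avg: u64, num_samples: u64, new_value: u64) -> Option<u64> {
    if num_samples == 0 {
        return Some(new_value);
    }
    curr_avg
        .checked_mul(num_samples)?
        .checked_add(new_value)?
        .checked_div(num_samples + 1)
}

proptest! {
    #[test]
    fn first_sample_replaces_stale_average(
        curr_avg in any::<u64>(),
        new_value in any::<u64>(),
    ) {
        // The saved seed (curr_avg = 4339, num_samples = 0, new_value = 0)
        // exercises exactly this edge: with no prior samples, the average
        // must become the new value rather than blend with a stale one.
        prop_assert_eq!(rolling_avg(curr_avg, 0, new_value), Some(new_value));
    }
}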
113 changes: 113 additions & 0 deletions influxdb3_telemetry/src/bucket.rs
@@ -0,0 +1,113 @@
use observability_deps::tracing::debug;

use crate::stats::Stats;

#[derive(Debug, Default)]
pub(crate) struct EventsBucket {
pub writes: PerMinuteWrites,
pub queries: PerMinuteReads,

pub(crate) num_writes: usize,
pub(crate) num_queries: usize,
}

impl EventsBucket {
pub fn new() -> Self {
Self::default()
}

pub fn add_write_sample(&mut self, num_lines: usize, size_bytes: usize) {
self.writes.add_sample(num_lines, size_bytes);
self.num_writes += 1;
}

pub fn update_num_queries(&mut self) {
self.queries.add_sample();
self.num_queries += 1;
}

pub(crate) fn reset(&mut self) {
*self = EventsBucket::default();
debug!(
write_bucket = ?self,
"Resetting write bucket"
);
}
}

#[derive(Debug, Default)]
pub(crate) struct PerMinuteWrites {
pub lines: Stats<u64>,
pub size_bytes: Stats<u64>,
}

impl PerMinuteWrites {
pub fn add_sample(&mut self, num_lines: usize, size_bytes: usize) -> Option<()> {
self.lines.update(num_lines as u64);
self.size_bytes.update(size_bytes as u64)?;
Some(())
}
}

#[derive(Debug, Default)]
pub(crate) struct PerMinuteReads {
pub num_queries: Stats<u64>,
}

impl PerMinuteReads {
pub fn add_sample(&mut self) -> Option<()> {
self.num_queries.update(1);
Some(())
}
}

#[cfg(test)]
mod tests {
use observability_deps::tracing::info;

use super::*;

#[test_log::test(test)]
fn test_bucket_for_writes() {
let mut bucket = EventsBucket::new();
info!(bucket = ?bucket, "Events bucket empty");
bucket.add_write_sample(1, 1);
info!(bucket = ?bucket, "Events bucket added 1");
bucket.add_write_sample(2, 2);
info!(bucket = ?bucket, "Events bucket added 2");
bucket.add_write_sample(3, 3);
info!(bucket = ?bucket, "Events bucket added 3");
bucket.add_write_sample(4, 4);
info!(bucket = ?bucket, "Events bucket added 4");

assert_eq!(4, bucket.num_writes);
assert_eq!(1, bucket.writes.lines.avg);
assert_eq!(1, bucket.writes.size_bytes.avg);

assert_eq!(1, bucket.writes.lines.min);
assert_eq!(1, bucket.writes.size_bytes.min);

assert_eq!(4, bucket.writes.lines.max);
assert_eq!(4, bucket.writes.size_bytes.max);

bucket.add_write_sample(20, 20);
info!(bucket = ?bucket, "Events bucket added 20");
assert_eq!(4, bucket.writes.lines.avg);
assert_eq!(4, bucket.writes.size_bytes.avg);

bucket.update_num_queries();
bucket.update_num_queries();
bucket.update_num_queries();
assert_eq!(3, bucket.num_queries);

bucket.reset();
assert_eq!(0, bucket.writes.lines.min);
assert_eq!(0, bucket.writes.lines.max);
assert_eq!(0, bucket.writes.lines.avg);
assert_eq!(0, bucket.writes.size_bytes.min);
assert_eq!(0, bucket.writes.size_bytes.max);
assert_eq!(0, bucket.writes.size_bytes.avg);
assert_eq!(0, bucket.num_writes);
assert_eq!(0, bucket.num_queries);
}
}
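The Stats type behind these buckets comes from the crate's stats module, which the diff view omits. A minimal sketch consistent with the integer rolling average the test above asserts (avg stays 1 across samples 1 through 4, then becomes 4 after adding 20), using the crate's num dependency; the exact fields and trait bounds are assumptions:

use num::{CheckedAdd, CheckedDiv, CheckedMul, FromPrimitive};

// Assumed shape of crate::stats::Stats; the real definition is not shown in
// this commit view.
#[derive(Debug, Default)]
pub(crate) struct Stats<T> {
    pub min: T,
    pub max: T,
    pub avg: T,
    num_samples: u64,
}

impl<T> Stats<T>
where
    T: Default + Copy + PartialOrd + CheckedAdd + CheckedMul + CheckedDiv + FromPrimitive,
{
    /// Fold one sample into min/max and the running integer average:
    /// avg' = (avg * n + value) / (n + 1). Returns None on overflow, which
    /// matches the `?` on update in PerMinuteWrites::add_sample above.
    pub fn update(&mut self, value: T) -> Option<()> {
        if self.num_samples == 0 {
            self.min = value;
            self.max = value;
            self.avg = value;
        } else {
            if value < self.min {
                self.min = value;
            }
            if value > self.max {
                self.max = value;
            }
            let n = T::from_u64(self.num_samples)?;
            let n1 = T::from_u64(self.num_samples + 1)?;
            self.avg = self
                .avg
                .checked_mul(&n)?
                .checked_add(&value)?
                .checked_div(&n1)?;
        }
        self.num_samples += 1;
        Some(())
    }
}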
4 changes: 3 additions & 1 deletion influxdb3_telemetry/src/lib.rs
@@ -1,4 +1,6 @@
mod cpu_mem_sampler;
mod bucket;
mod metrics;
mod sampler;
mod sender;
mod stats;
pub mod store;
(Diffs for the remaining changed files are not rendered here.)
