Skip to content

Commit

Permalink
feat: enable parquet cache config through CLI (#25415)
Browse files Browse the repository at this point in the history
  • Loading branch information
hiltontj authored Oct 1, 2024
1 parent a05c3fe commit 83aca43
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 21 deletions.
93 changes: 80 additions & 13 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Entrypoint for InfluxDB 3.0 Edge Server

use anyhow::{bail, Context};
use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
Expand All @@ -22,11 +23,11 @@ use influxdb3_write::{
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use object_store::DynObjectStore;
use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, path::Path};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use thiserror::Error;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -216,6 +217,69 @@ pub struct Config {
/// for any hosts that share the same object store configuration, i.e., the same bucket.
#[clap(long = "host-id", env = "INFLUXDB3_HOST_IDENTIFIER_PREFIX", action)]
pub host_identifier_prefix: String,

/// The size of the in-memory Parquet cache in megabytes (MB).
#[clap(
long = "parquet-mem-cache-size-mb",
env = "INFLUXDB3_PARQUET_MEM_CACHE_SIZE_MB",
default_value = "1000",
action
)]
pub parquet_mem_cache_size: usize,

/// The percentage of entries to prune during a prune operation on the in-memory Parquet cache.
///
/// This must be a number between 0 and 1.
#[clap(
long = "parquet-mem-cache-prune-percentage",
env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_PERCENTAGE",
default_value = "0.1",
action
)]
pub parquet_mem_cache_prune_percentage: PrunePercent,

/// The interval on which to check if the in-memory Parquet cache needs to be pruned.
///
/// Enter as a human-readable time, e.g., "1s", "100ms", "1m", etc.
#[clap(
long = "parquet-mem-cache-prune-interval",
env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_INTERVAL",
default_value = "1s",
action
)]
pub parquet_mem_cache_prune_interval: humantime::Duration,

/// Disable the in-memory Parquet cache. By default, the cache is enabled.
#[clap(
long = "disable-parquet-mem-cache",
env = "INFLUXDB3_DISABLE_PARQUET_MEM_CACHE",
default_value_t = false,
action
)]
pub disable_parquet_mem_cache: bool,
}

/// A validated fraction of in-memory Parquet cache entries to evict per
/// prune pass.
///
/// The inner value is guaranteed to be strictly between 0 and 1: the only
/// way to construct this type is via its [`FromStr`] impl, which rejects
/// values outside that open interval.
#[derive(Debug, Clone, Copy)]
pub struct PrunePercent(f64);

impl From<PrunePercent> for f64 {
fn from(value: PrunePercent) -> Self {
value.0
}
}

impl FromStr for PrunePercent {
    type Err = anyhow::Error;

    /// Parse a prune percentage from a CLI/env string.
    ///
    /// # Errors
    ///
    /// Returns an error if `s` is not a valid `f64`, or if the parsed value
    /// does not lie strictly between 0 and 1.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let p = s
            .parse::<f64>()
            .context("failed to parse prune percent as f64")?;
        // Use the positive form of the range check: NaN compares false to
        // everything, so `p <= 0.0 || p >= 1.0` would silently ACCEPT NaN.
        // `!(p > 0.0 && p < 1.0)` rejects NaN along with out-of-range values.
        if !(p > 0.0 && p < 1.0) {
            bail!("prune percent must be between 0 and 1");
        }
        Ok(Self(p))
    }
}

/// If `p` does not exist, try to create it as a directory.
Expand Down Expand Up @@ -258,19 +322,22 @@ pub async fn command(config: Config) -> Result<()> {
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();

let object_store: Arc<DynObjectStore> =
let object_store: Arc<dyn ObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
let time_provider = Arc::new(SystemProvider::new());

// TODO(trevor): make the cache capacity and prune percent configurable/optional:
let cache_capacity = 1024 * 1024 * 1024;
let prune_percent = 0.1;
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
cache_capacity,
prune_percent,
);
let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
config.parquet_mem_cache_size * 1_000,
config.parquet_mem_cache_prune_percentage.into(),
config.parquet_mem_cache_prune_interval.into(),
);
(object_store, Some(parquet_cache))
} else {
(object_store, None)
};

let trace_exporter = config.tracing_config.build()?;

Expand Down Expand Up @@ -349,7 +416,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
Some(parquet_cache),
parquet_cache,
)
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,
Expand Down
26 changes: 18 additions & 8 deletions influxdb3_write/src/parquet_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,10 @@ impl MemCacheOracle {
/// This spawns two background tasks:
/// * one to handle registered [`CacheRequest`]s
/// * one to prune deleted and un-needed cache entries on an interval
// TODO(trevor): this should be more configurable, e.g., channel size, prune interval
fn new(mem_cached_store: Arc<MemCachedObjectStore>) -> Self {
fn new(mem_cached_store: Arc<MemCachedObjectStore>, prune_interval: Duration) -> Self {
    // Bounded channel through which callers register cache requests.
    let (tx, rx) = channel(CACHE_REQUEST_BUFFER_SIZE);
    // Background task 1: drain and service incoming cache requests.
    background_cache_request_handler(Arc::clone(&mem_cached_store), rx);
    // Background task 2: prune stale/excess entries every `prune_interval`.
    background_cache_pruner(mem_cached_store, prune_interval);
    Self {
        cache_request_tx: tx,
    }
}
}
Expand All @@ -104,14 +103,15 @@ pub fn create_cached_obj_store_and_oracle(
time_provider: Arc<dyn TimeProvider>,
cache_capacity: usize,
prune_percent: f64,
prune_interval: Duration,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
let store = Arc::new(MemCachedObjectStore::new(
object_store,
cache_capacity,
time_provider,
prune_percent,
));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store)));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store), prune_interval));
(store, oracle)
}

Expand All @@ -120,7 +120,13 @@ pub fn test_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
create_cached_obj_store_and_oracle(object_store, time_provider, 1024 * 1024 * 1024, 0.1)
create_cached_obj_store_and_oracle(
object_store,
time_provider,
1024 * 1024 * 1024,
0.1,
Duration::from_millis(10),
)
}

/// A value in the cache, containing the actual bytes as well as object store metadata
Expand Down Expand Up @@ -655,10 +661,12 @@ fn background_cache_request_handler(
}

/// A background task for pruning un-needed entries in the cache
// TODO(trevor): the interval could be configurable
fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task::JoinHandle<()> {
fn background_cache_pruner(
mem_store: Arc<MemCachedObjectStore>,
interval_duration: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(10));
let mut interval = tokio::time::interval(interval_duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
Expand Down Expand Up @@ -755,11 +763,13 @@ pub(crate) mod tests {
// these are magic numbers that will make it so the third entry exceeds the cache capacity:
let cache_capacity_bytes = 60;
let cache_prune_percent = 0.4;
let cache_prune_interval = Duration::from_millis(10);
let (cached_store, oracle) = create_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider) as _,
cache_capacity_bytes,
cache_prune_percent,
cache_prune_interval,
);
// PUT an entry into the store:
let path_1 = Path::from("0.parquet");
Expand Down

0 comments on commit 83aca43

Please sign in to comment.