Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable parquet cache config through CLI #25415

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 80 additions & 13 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Entrypoint for InfluxDB 3.0 Edge Server

use anyhow::{bail, Context};
use clap_blocks::{
memory_size::MemorySize,
object_store::{make_object_store, ObjectStoreConfig, ObjectStoreType},
Expand All @@ -22,11 +23,11 @@ use influxdb3_write::{
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
use object_store::DynObjectStore;
use object_store::ObjectStore;
use observability_deps::tracing::*;
use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, path::Path};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use thiserror::Error;
use tokio::net::TcpListener;
Expand Down Expand Up @@ -216,6 +217,69 @@ pub struct Config {
/// for any hosts that share the same object store configuration, i.e., the same bucket.
#[clap(long = "host-id", env = "INFLUXDB3_HOST_IDENTIFIER_PREFIX", action)]
pub host_identifier_prefix: String,

/// The size of the in-memory Parquet cache in megabytes (MB).
#[clap(
long = "parquet-mem-cache-size-mb",
env = "INFLUXDB3_PARQUET_MEM_CACHE_SIZE_MB",
default_value = "1000",
action
)]
pub parquet_mem_cache_size: usize,

/// The percentage of entries to prune during a prune operation on the in-memory Parquet cache.
///
/// This must be a number strictly between 0 and 1 (exclusive).
#[clap(
long = "parquet-mem-cache-prune-percentage",
env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_PERCENTAGE",
default_value = "0.1",
action
)]
pub parquet_mem_cache_prune_percentage: PrunePercent,

/// The interval on which to check if the in-memory Parquet cache needs to be pruned.
///
/// Enter as a human-readable time, e.g., "1s", "100ms", "1m", etc.
#[clap(
long = "parquet-mem-cache-prune-interval",
env = "INFLUXDB3_PARQUET_MEM_CACHE_PRUNE_INTERVAL",
default_value = "1s",
action
)]
pub parquet_mem_cache_prune_interval: humantime::Duration,

/// Disable the in-memory Parquet cache. By default, the cache is enabled.
#[clap(
long = "disable-parquet-mem-cache",
env = "INFLUXDB3_DISABLE_PARQUET_MEM_CACHE",
default_value_t = false,
action
)]
pub disable_parquet_mem_cache: bool,
}

/// Fraction of cache entries to prune, constrained (by its [`FromStr`]
/// impl) to the open interval (0, 1).
#[derive(Debug, Clone, Copy)]
pub struct PrunePercent(f64);

impl From<PrunePercent> for f64 {
    /// Unwrap the newtype into its raw `f64` fraction.
    fn from(PrunePercent(fraction): PrunePercent) -> Self {
        fraction
    }
}

impl FromStr for PrunePercent {
    type Err = anyhow::Error;

    /// Parse a prune percentage from its CLI/env string form.
    ///
    /// The input must parse as an `f64` strictly between 0 and 1
    /// (both endpoints excluded). Anything else — including `NaN`,
    /// which fails every ordered comparison — is rejected.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let p = s
            .parse::<f64>()
            .context("failed to parse prune percent as f64")?;
        // Positive-form condition so NaN is rejected: with the old
        // `p <= 0.0 || p >= 1.0` check, NaN failed both comparisons
        // and slipped through validation.
        if !(p > 0.0 && p < 1.0) {
            bail!("prune percent must be between 0 and 1");
        }
        Ok(Self(p))
    }
}

/// If `p` does not exist, try to create it as a directory.
Expand Down Expand Up @@ -258,19 +322,22 @@ pub async fn command(config: Config) -> Result<()> {
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();

let object_store: Arc<DynObjectStore> =
let object_store: Arc<dyn ObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
let time_provider = Arc::new(SystemProvider::new());

// TODO(trevor): make the cache capacity and prune percent configurable/optional:
let cache_capacity = 1024 * 1024 * 1024;
let prune_percent = 0.1;
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
cache_capacity,
prune_percent,
);
let (object_store, parquet_cache) = if !config.disable_parquet_mem_cache {
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
config.parquet_mem_cache_size * 1_000,
config.parquet_mem_cache_prune_percentage.into(),
config.parquet_mem_cache_prune_interval.into(),
);
(object_store, Some(parquet_cache))
} else {
(object_store, None)
};

let trace_exporter = config.tracing_config.build()?;

Expand Down Expand Up @@ -349,7 +416,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
Some(parquet_cache),
parquet_cache,
)
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,
Expand Down
26 changes: 18 additions & 8 deletions influxdb3_write/src/parquet_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,10 @@ impl MemCacheOracle {
/// This spawns two background tasks:
/// * one to handle registered [`CacheRequest`]s
/// * one to prune deleted and un-needed cache entries on an interval
// TODO(trevor): this should be more configurable, e.g., channel size, prune interval
fn new(mem_cached_store: Arc<MemCachedObjectStore>) -> Self {
fn new(mem_cached_store: Arc<MemCachedObjectStore>, prune_interval: Duration) -> Self {
let (cache_request_tx, cache_request_rx) = channel(CACHE_REQUEST_BUFFER_SIZE);
background_cache_request_handler(Arc::clone(&mem_cached_store), cache_request_rx);
background_cache_pruner(mem_cached_store);
background_cache_pruner(mem_cached_store, prune_interval);
Self { cache_request_tx }
}
}
Expand All @@ -104,14 +103,15 @@ pub fn create_cached_obj_store_and_oracle(
time_provider: Arc<dyn TimeProvider>,
cache_capacity: usize,
prune_percent: f64,
prune_interval: Duration,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
let store = Arc::new(MemCachedObjectStore::new(
object_store,
cache_capacity,
time_provider,
prune_percent,
));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store)));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store), prune_interval));
(store, oracle)
}

Expand All @@ -120,7 +120,13 @@ pub fn test_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
create_cached_obj_store_and_oracle(object_store, time_provider, 1024 * 1024 * 1024, 0.1)
create_cached_obj_store_and_oracle(
object_store,
time_provider,
1024 * 1024 * 1024,
0.1,
Duration::from_millis(10),
)
}

/// A value in the cache, containing the actual bytes as well as object store metadata
Expand Down Expand Up @@ -655,10 +661,12 @@ fn background_cache_request_handler(
}

/// A background task for pruning un-needed entries in the cache
// TODO(trevor): the interval could be configurable
fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task::JoinHandle<()> {
fn background_cache_pruner(
mem_store: Arc<MemCachedObjectStore>,
interval_duration: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(10));
let mut interval = tokio::time::interval(interval_duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
Expand Down Expand Up @@ -755,11 +763,13 @@ pub(crate) mod tests {
// these are magic numbers that will make it so the third entry exceeds the cache capacity:
let cache_capacity_bytes = 60;
let cache_prune_percent = 0.4;
let cache_prune_interval = Duration::from_millis(10);
let (cached_store, oracle) = create_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider) as _,
cache_capacity_bytes,
cache_prune_percent,
cache_prune_interval,
);
// PUT an entry into the store:
let path_1 = Path::from("0.parquet");
Expand Down