test: add test to verify parquet cache function
This makes the parquet cache optional at the write buffer level, and adds
a test verifying that, on a cache hit, requests are served from the cache
rather than reaching the object store.
hiltontj committed Sep 30, 2024
1 parent d97a6d9 commit 5c63c0a
Showing 2 changed files with 150 additions and 5 deletions.
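
Both tests below rely on TestObjectStore, a counting decorator around the real object store. Here is a minimal sketch of that spy pattern, using hypothetical names (Store, CountingStore) rather than the types from this commit:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical minimal store interface, standing in for object_store::ObjectStore.
trait Store: Send + Sync {
    fn get(&self, path: &str) -> Vec<u8>;
}

// A counting decorator: delegates every request to the inner store while
// recording how many times each path was requested.
struct CountingStore {
    inner: Arc<dyn Store>,
    counts: Mutex<HashMap<String, usize>>,
}

impl CountingStore {
    fn new(inner: Arc<dyn Store>) -> Self {
        Self { inner, counts: Mutex::new(HashMap::new()) }
    }

    // What the tests assert on: the number of requests seen for a path.
    fn request_count(&self, path: &str) -> usize {
        self.counts.lock().unwrap().get(path).copied().unwrap_or(0)
    }
}

impl Store for CountingStore {
    fn get(&self, path: &str) -> Vec<u8> {
        *self.counts.lock().unwrap().entry(path.to_string()).or_insert(0) += 1;
        self.inner.get(path)
    }
}

The real TestObjectStore in the diff keeps one such counter per ObjectStore method (get, get_opts, get_range, get_ranges, head), which is what lets the tests distinguish cache hits from reads that fall through to the store.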
21 changes: 21 additions & 0 deletions influxdb3_write/src/parquet_cache/mod.rs
@@ -933,7 +933,10 @@ pub(crate) mod tests {
pub(crate) struct TestObjectStore {
inner: Arc<dyn ObjectStore>,
get: RequestCounter,
get_opts: RequestCounter,
get_range: RequestCounter,
get_ranges: RequestCounter,
head: RequestCounter,
notifies: Option<(Arc<Notify>, Arc<Notify>)>,
}

@@ -942,7 +945,10 @@ pub(crate) mod tests {
Self {
inner,
get: Default::default(),
get_opts: Default::default(),
get_range: Default::default(),
get_ranges: Default::default(),
head: Default::default(),
notifies: None,
}
}
@@ -960,9 +966,21 @@ pub(crate) mod tests {
self.get.read().get(path).copied().unwrap_or(0)
}

pub(crate) fn get_opts_request_count(&self, path: &Path) -> usize {
self.get_opts.read().get(path).copied().unwrap_or(0)
}

pub(crate) fn get_range_request_count(&self, path: &Path) -> usize {
self.get_range.read().get(path).copied().unwrap_or(0)
}

pub(crate) fn get_ranges_request_count(&self, path: &Path) -> usize {
self.get_ranges.read().get(path).copied().unwrap_or(0)
}

pub(crate) fn head_request_count(&self, path: &Path) -> usize {
self.head.read().get(path).copied().unwrap_or(0)
}
}
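
(RequestCounter itself is defined elsewhere in this module; the `.read()`/`.write()` calls above that return guards directly, with no `.unwrap()`, suggest a parking_lot alias along these lines. This is an assumption, shown only to make the accessors easier to read:)

use std::collections::HashMap;
use object_store::path::Path;
use parking_lot::RwLock;

// Assumed shape: a lock-guarded map from object path to request count.
pub(crate) type RequestCounter = RwLock<HashMap<Path, usize>>;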

impl std::fmt::Display for TestObjectStore {
@@ -1015,6 +1033,7 @@ pub(crate) mod tests {
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
*self.get_opts.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get_opts(location, options).await
}

@@ -1032,10 +1051,12 @@ pub(crate) mod tests {
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
*self.get_ranges.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get_ranges(location, ranges).await
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
*self.head.write().entry(location.clone()).or_insert(0) += 1;
self.inner.head(location).await
}

134 changes: 129 additions & 5 deletions influxdb3_write/src/write_buffer/mod.rs
@@ -1582,7 +1582,7 @@ mod tests {
// through to the object store for parquet files:
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
let (wbuf, ctx) = setup(
let (wbuf, ctx) = setup_cache_optional(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
WalConfig {
@@ -1591,6 +1591,7 @@
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
true,
)
.await;
let db_name = "my_corp";
@@ -1647,8 +1648,10 @@ mod tests {
// check the number of requests to that path before making a query:
// there should be one get request, made by the cache oracle:
assert_eq!(1, test_store.get_request_count(&path));
// there should be no other requests (get_opts, get_range(s), head), since those are only made during query planning:
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));

let batches = get_table_batches(&wbuf, db_name, tbl_name, &ctx).await;
assert_batches_sorted_eq!(
@@ -1672,7 +1675,114 @@

// counts should not change, since requests for this parquet file hit the cache:
assert_eq!(1, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
}

#[tokio::test]
async fn test_no_parquet_cache() {
// set up a write buffer using a TestObjectStore so we can spy on requests that get
// through to the object store for parquet files:
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
let (wbuf, ctx) = setup_cache_optional(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
false,
)
.await;
let db_name = "my_corp";
let tbl_name = "temp";

// make some writes to generate a snapshot:
do_writes(
db_name,
&wbuf,
&[
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=36\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=29\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=33\n\
"
),
time_seconds: 1,
},
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=37\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=28\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=32\n\
"
),
time_seconds: 2,
},
// This write will trigger the snapshot:
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=35\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=24\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=30\n\
"
),
time_seconds: 3,
},
],
)
.await;

// Wait for the snapshot to be created; once it is, the parquet file has been persisted:
verify_snapshot_count(1, &wbuf.persister).await;

// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_name, tbl_name);
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());

// check the number of requests to that path before making a query:
// there should be no get or get_range requests since nothing has asked for this file yet:
assert_eq!(0, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));

let batches = get_table_batches(&wbuf, db_name, tbl_name, &ctx).await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 35.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 37.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 24.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 28.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 30.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 32.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);

// counts should now change: with no cache, the query's parquet reads (get_ranges/get_range) go directly to the object store:
assert_eq!(0, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(1, test_store.get_ranges_request_count(&path));
assert_eq!(2, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
}

struct TestWrite<LP> {
@@ -1755,10 +1865,24 @@ mod tests {
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext) {
setup_cache_optional(start, object_store, wal_config, true).await
}

async fn setup_cache_optional(
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
use_cache: bool,
) -> (WriteBufferImpl, IOxSessionContext) {
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
let (object_store, parquet_cache) = if use_cache {
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
(object_store, Some(parquet_cache))
} else {
(object_store, None)
};
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = persister.load_or_create_catalog().await.unwrap();
let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap();
@@ -1769,7 +1893,7 @@
Arc::clone(&time_provider),
crate::test_help::make_exec(),
wal_config,
Some(parquet_cache),
parquet_cache,
)
.await
.unwrap();
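
Existing tests stay on `setup`, which now simply delegates with `true`; the new test opts out by passing `false`. A usage sketch, with `make_wal_config()` standing in as a hypothetical helper for the `WalConfig` literals shown in the tests above:

// Cache enabled: identical to the old `setup` behavior.
let (wbuf_cached, ctx_cached) =
    setup_cache_optional(start, Arc::clone(&obj_store), make_wal_config(), true).await;

// Cache disabled: parquet reads go straight to the object store.
let (wbuf_raw, ctx_raw) =
    setup_cache_optional(start, Arc::clone(&obj_store), make_wal_config(), false).await;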