Skip to content

Commit

Permalink
wip, async public trait bad
Browse files Browse the repository at this point in the history
Signed-off-by: Yuchen Liang <[email protected]>
  • Loading branch information
yliang412 committed Sep 27, 2024
1 parent ec07a1e commit 5a87afd
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 25 deletions.
119 changes: 114 additions & 5 deletions pageserver/src/tenant/disk_btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ where
Ok(result)
}

pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIterator<'a>
pub fn iter<'a>(self, start_key: &'a [u8; L], ctx: &'a RequestContext) -> DiskBtreeIter<'a>
where
R: 'a + Send,
{
DiskBtreeIterator {
DiskBtreeIter {
stream: Box::pin(self.into_stream(start_key, ctx)),
}
}
Expand Down Expand Up @@ -525,19 +525,128 @@ where
}
}

pub struct DiskBtreeIterator<'a> {
/// Async iterator over disk B-tree entries.
///
/// NOTE(review): an `async fn` in a *public* trait leaks the concrete future
/// type and makes it impossible for downstream users to add `Send` bounds at
/// the trait level (the commit message itself flags "async public trait bad").
/// Consider return-position `impl Future + Send` or a `Stream`-based design
/// before stabilizing this API.
pub trait DiskBtreeIterator {
    /// Value yielded on each successful step.
    type Item;

    /// Advance the iterator; `None` means the tree is exhausted.
    async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>>;

    /// Lazily transform each yielded item with `f` (mirrors `Iterator::map`).
    fn map<B, F>(self, f: F) -> MapDiskBtreeIter<Self, F>
    where
        Self: Sized,
        F: FnMut(Self::Item) -> B,
    {
        MapDiskBtreeIter::new(self, f)
    }

    /// Keep only items for which `predicate` returns `true`
    /// (mirrors `Iterator::filter`).
    fn filter<P>(self, predicate: P) -> FilterDiskBtreeIter<Self, P>
    where
        Self: Sized,
        P: FnMut(&Self::Item) -> bool,
    {
        FilterDiskBtreeIter::new(self, predicate)
    }
}

/// Concrete iterator over a disk B-tree: a pinned, boxed stream of
/// `(raw key bytes, value)` pairs as produced by the reader's `into_stream`.
pub struct DiskBtreeIter<'a> {
    #[allow(clippy::type_complexity)]
    // Boxed + pinned so callers get a nameable type instead of an opaque
    // stream; `Send` so the iterator can cross task boundaries.
    stream: std::pin::Pin<
        Box<dyn Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a + Send>,
    >,
}

impl<'a> DiskBtreeIterator<'a> {
pub async fn next(&mut self) -> Option<std::result::Result<(Vec<u8>, u64), DiskBtreeError>> {
impl<'a> DiskBtreeIterator for DiskBtreeIter<'a> {
    type Item = (Vec<u8>, u64);

    /// Pull the next `(raw key, value)` pair off the underlying stream;
    /// `None` once the stream is exhausted.
    async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
        self.stream.next().await
    }
}

/// Adapter returned by [`DiskBtreeIterator::map`]: applies `f` to every item
/// yielded by the inner iterator.
pub struct MapDiskBtreeIter<I, F> {
    iter: I,
    f: F,
}

impl<I, F> MapDiskBtreeIter<I, F> {
pub fn new(iter: I, f: F) -> Self {
MapDiskBtreeIter { iter, f }
}
}

impl<B, I: DiskBtreeIterator, F> DiskBtreeIterator for MapDiskBtreeIter<I, F>
where
    F: FnMut(I::Item) -> B,
{
    type Item = B;

    /// Yield the next inner item with `f` applied; errors and end-of-stream
    /// pass through untouched.
    async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
        match self.iter.next().await? {
            Ok(item) => Some(Ok((self.f)(item))),
            Err(e) => Some(Err(e)),
        }
    }
}

/// Adapter returned by [`DiskBtreeIterator::filter`]: yields only the items
/// of the inner iterator for which `predicate` returns `true`.
pub struct FilterDiskBtreeIter<I, P> {
    iter: I,
    predicate: P,
}

impl<I, P> FilterDiskBtreeIter<I, P> {
pub fn new(iter: I, predicate: P) -> Self {
FilterDiskBtreeIter { iter, predicate }
}
}

impl<I: DiskBtreeIterator, P> DiskBtreeIterator for FilterDiskBtreeIter<I, P>
where
    P: FnMut(&I::Item) -> bool,
{
    type Item = I::Item;

    /// Yield the next item that satisfies the predicate.
    ///
    /// Items rejected by the predicate are *skipped* and iteration continues,
    /// matching `Iterator::filter` semantics. (The previous implementation
    /// returned `None` on the first rejected item, silently terminating the
    /// whole iteration and dropping every entry after the first non-match.)
    /// Errors from the inner iterator are passed through unchanged.
    async fn next(&mut self) -> Option<std::result::Result<Self::Item, DiskBtreeError>> {
        loop {
            // `?` propagates end-of-stream (`None`) out of the loop.
            match self.iter.next().await? {
                Ok(item) => {
                    if (self.predicate)(&item) {
                        return Some(Ok(item));
                    }
                    // Rejected: keep scanning instead of ending the stream.
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}

// pub struct DeltaLayerDiskBtreeIterator<'a, F> {
// inner: DiskBtreeIterator<'a, F, >,
// extract: F,
// }

// impl<'a, F> DeltaLayerDiskBtreeIterator<'a, F>
// where
// F: FnMut(Vec<u8>, u64) -> (Key, Lsn, u64),
// {
// pub async fn next(&mut self) -> Option<std::result::Result<(Key, Lsn, u64), DiskBtreeError>> {
// self.inner
// .next()
// .await
// .map(|res| res.map(|(raw_key, value)| (self.extract)(raw_key, value)))
// }
// }

// pub struct ImageLayerDiskBtreeIterator<'a, F> {
// inner: DiskBtreeIterator<'a>,
// extract: F,
// }

// impl<'a, F> ImageLayerDiskBtreeIterator<'a, F>
// where
// F: FnMut(Vec<u8>, u64) -> (Key, Lsn, u64),
// {
// pub async fn next(&mut self) -> Option<std::result::Result<(Key, Lsn, u64), DiskBtreeError>> {
// self.inner
// .next()
// .await
// .map(|res| res.map(|(raw_key, value)| (self.extract)(raw_key, value)))
// }
// }

///
/// Public builder object, for creating a new tree.
///
Expand Down
69 changes: 58 additions & 11 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
DiskBtreeBuilder, DiskBtreeIter, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
use crate::tenant::timeline::GetVectoredError;
Expand Down Expand Up @@ -1426,14 +1426,17 @@ impl DeltaLayerInner {
offset
}

pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
pub(crate) fn iter<'a>(
&'a self,
ctx: &'a RequestContext,
) -> DeltaLayerIterator<'a, DeltaLayerDiskBtreeIter<'a>> {
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
DeltaLayerIterator {
delta_layer: self,
ctx,
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx).map_delta(),
key_values_batch: std::collections::VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
Expand Down Expand Up @@ -1508,16 +1511,19 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
}
}

pub struct DeltaLayerIterator<'a> {
/// Iterator over a delta layer's `(Key, Lsn, Value)` entries, batching blob
/// reads through a `StreamingVectoredReadPlanner`.
///
/// `I` is the index iterator type; it defaults to [`DeltaLayerDiskBtreeIter`],
/// which decodes raw B-tree entries into `(Key, Lsn, u64)` tuples.
pub struct DeltaLayerIterator<'a, I = DeltaLayerDiskBtreeIter<'a>> {
    delta_layer: &'a DeltaLayerInner,
    ctx: &'a RequestContext,
    // Plans vectored reads so nearby blobs are fetched in batched IOs.
    planner: StreamingVectoredReadPlanner,
    // Yields (Key, Lsn, blob offset) entries from the on-disk index.
    index_iter: I,
    // Decoded values buffered for the caller; refilled one batch at a time.
    key_values_batch: VecDeque<(Key, Lsn, Value)>,
    // True once the index iterator has been fully drained.
    is_end: bool,
}

impl<'a> DeltaLayerIterator<'a> {
impl<'a, I> DeltaLayerIterator<'a, I>
where
I: DiskBtreeIterator<Item = (Key, Lsn, u64)>,
{
pub(crate) fn layer_dbg_info(&self) -> String {
self.delta_layer.layer_dbg_info()
}
Expand All @@ -1529,11 +1535,7 @@ impl<'a> DeltaLayerIterator<'a> {

let plan = loop {
if let Some(res) = self.index_iter.next().await {
let (raw_key, value) = res?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let offset = blob_ref.pos();
let (key, lsn, offset) = res?;
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
break batch_plan;
}
Expand Down Expand Up @@ -1579,6 +1581,51 @@ impl<'a> DeltaLayerIterator<'a> {
.expect("should not be empty"),
))
}

/// Restrict this iterator to index entries for which `predicate` returns
/// `true`, returning a new iterator over the filtered index.
///
/// NOTE(review): declared `async` but the body contains no `.await` — the
/// `async` looks unnecessary. Confirm before removing it, though, since
/// dropping it changes the public signature for existing callers.
pub async fn filter_key<P>(
    self,
    predicate: P,
) -> DeltaLayerIterator<'a, impl DiskBtreeIterator<Item = (Key, Lsn, u64)>>
where
    P: FnMut(&(Key, Lsn, u64)) -> bool,
{
    DeltaLayerIterator {
        delta_layer: self.delta_layer,
        ctx: self.ctx,
        planner: self.planner,
        index_iter: self.index_iter.filter(predicate),
        key_values_batch: self.key_values_batch,
        is_end: self.is_end,
    }
}
}

/// Decoding adapter over [`DiskBtreeIter`] for delta layers: parses each raw
/// index entry into a `(Key, Lsn, blob offset)` tuple.
pub struct DeltaLayerDiskBtreeIter<'a> {
    iter: DiskBtreeIter<'a>,
}

impl<'a> DiskBtreeIter<'a> {
    /// Wrap this raw B-tree iterator in the delta-layer entry decoder.
    pub(crate) fn map_delta(self) -> DeltaLayerDiskBtreeIter<'a> {
        DeltaLayerDiskBtreeIter { iter: self }
    }
}

impl<'a> DiskBtreeIterator for DeltaLayerDiskBtreeIter<'a> {
    type Item = (Key, Lsn, u64);

    /// Decode the next raw index entry into `(Key, Lsn, blob offset)`;
    /// errors and end-of-stream pass through unchanged.
    async fn next(
        &mut self,
    ) -> Option<std::result::Result<Self::Item, crate::tenant::disk_btree::DiskBtreeError>> {
        // `?` propagates end-of-stream; only successful entries get decoded.
        let entry = self.iter.next().await?;
        Some(entry.map(|(raw_key, value)| {
            (
                Key::from_slice(&raw_key[..KEY_SIZE]),
                DeltaKey::extract_lsn_from_buf(&raw_key),
                BlobRef(value).pos(),
            )
        }))
    }
}

#[cfg(test)]
Expand Down
40 changes: 31 additions & 9 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockBuf, FileBlockReader};
use crate::tenant::disk_btree::{
DiskBtreeBuilder, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
DiskBtreeBuilder, DiskBtreeIter, DiskBtreeIterator, DiskBtreeReader, VisitDirection,
};
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
Expand Down Expand Up @@ -652,7 +652,7 @@ impl ImageLayerInner {
ImageLayerIterator {
image_layer: self,
ctx,
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx).map_image(self.lsn),
key_values_batch: VecDeque::new(),
is_end: false,
planner: StreamingVectoredReadPlanner::new(
Expand Down Expand Up @@ -999,11 +999,37 @@ pub struct ImageLayerIterator<'a> {
image_layer: &'a ImageLayerInner,
ctx: &'a RequestContext,
planner: StreamingVectoredReadPlanner,
index_iter: DiskBtreeIterator<'a>,
index_iter: ImageLayerDiskBtreeIter<'a>,
key_values_batch: VecDeque<(Key, Lsn, Value)>,
is_end: bool,
}

/// Decoding adapter over [`DiskBtreeIter`] for image layers: parses each raw
/// index entry into `(Key, lsn, offset)`, stamping every entry with the
/// layer's single LSN.
pub struct ImageLayerDiskBtreeIter<'a> {
    iter: DiskBtreeIter<'a>,
    // The image layer's LSN; shared by every entry the iterator yields.
    lsn: Lsn,
}

impl<'a> DiskBtreeIter<'a> {
    /// Wrap this raw B-tree iterator in the image-layer entry decoder,
    /// stamping each yielded entry with `lsn`.
    pub(crate) fn map_image(self, lsn: Lsn) -> ImageLayerDiskBtreeIter<'a> {
        ImageLayerDiskBtreeIter { iter: self, lsn }
    }
}

impl<'a> DiskBtreeIterator for ImageLayerDiskBtreeIter<'a> {
    type Item = (Key, Lsn, u64);

    /// Decode the next raw index entry into `(Key, layer LSN, offset)`;
    /// errors and end-of-stream pass through unchanged.
    async fn next(
        &mut self,
    ) -> Option<std::result::Result<Self::Item, crate::tenant::disk_btree::DiskBtreeError>> {
        // `?` propagates end-of-stream; only successful entries get decoded.
        let entry = self.iter.next().await?;
        Some(entry.map(|(raw_key, offset)| {
            (Key::from_slice(&raw_key[..KEY_SIZE]), self.lsn, offset)
        }))
    }
}

impl<'a> ImageLayerIterator<'a> {
pub(crate) fn layer_dbg_info(&self) -> String {
self.image_layer.layer_dbg_info()
Expand All @@ -1016,12 +1042,8 @@ impl<'a> ImageLayerIterator<'a> {

let plan = loop {
if let Some(res) = self.index_iter.next().await {
let (raw_key, offset) = res?;
if let Some(batch_plan) = self.planner.handle(
Key::from_slice(&raw_key[..KEY_SIZE]),
self.image_layer.lsn,
offset,
) {
let (key, lsn, offset) = res?;
if let Some(batch_plan) = self.planner.handle(key, lsn, offset) {
break batch_plan;
}
} else {
Expand Down

0 comments on commit 5a87afd

Please sign in to comment.