Database: Improve index usage.
xsedla1o committed Aug 22, 2024
1 parent 0d3e116 commit ff0be07
Showing 1 changed file with 25 additions and 17 deletions.
42 changes: 25 additions & 17 deletions dp3/database/database.py
@@ -305,6 +305,18 @@ def _init_database_schema(self, db_name) -> None:
self._db[snapshot_col].create_index("_time_created", background=True)

snapshot_col = self._snapshots_col_name(etype)
+ # To get the empty bucket for an entity without fetching all its buckets
+ self._db[snapshot_col].create_index(
+     [("_id", pymongo.DESCENDING), ("count", pymongo.ASCENDING)],
+     background=True,
+ )
+ # To fetch the oversized entities only
+ self._db[snapshot_col].create_index(
+     [("oversized", pymongo.DESCENDING)],
+     partialFilterExpression={"oversized": {"$eq": True}},
+     background=True,
+ )
+ # For deleting old snapshots
self._db[snapshot_col].create_index("_time_created", background=True)

# Create a TTL index for metadata collection
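The comments in the added block describe the intended query shapes. As a rough sketch of what the two new indexes can answer (the collection name, bucket-id suffix, bucket capacity, and filter shapes below are illustrative assumptions, not the dp3 helpers):

```python
# Hedged sketch of the query shapes the two new indexes are meant to serve.
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
snapshots = client["dp3"]["some_entity#snapshots"]  # hypothetical collection name

# (_id desc, count asc): locate the newest bucket of one entity that still has
# room, without fetching every bucket the entity owns. An anchored regex on _id
# can be answered with an index range scan.
open_bucket = snapshots.find_one(
    {"_id": {"$regex": "^some_eid_#"}, "count": {"$lt": 32}},  # 32 = assumed bucket capacity
    sort=[("_id", pymongo.DESCENDING)],
)

# Partial index on `oversized`: only documents with oversized == True are kept
# in the index, so it stays small; a query has to repeat that predicate to be
# eligible to use it.
oversized_ids = [doc["_id"] for doc in snapshots.find({"oversized": True}, {"_id": 1})]
```

Because the partial index only ever contains the oversized buckets, it stays small no matter how many snapshots accumulate.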
@@ -1126,8 +1138,8 @@ def save_snapshot(self, etype: str, snapshot: dict, ctime: datetime):
os_col = self._oversized_snapshots_col_name(etype)

# Find out if the snapshot is oversized
- normal, oversized, new = self._get_snapshot_state(etype, {eid})
- if new | normal:
+ normal, oversized = self._get_snapshot_state(etype, {eid})
+ if normal:
try:
self._db[snapshot_col].update_one(
self._snapshot_bucket_eid_filter(eid)
@@ -1144,7 +1156,6 @@ def save_snapshot(self, etype: str, snapshot: dict, ctime: datetime):
},
upsert=True,
)
- self._cache_snapshot_state(etype, new, oversized)
except (WriteError, OperationFailure, DocumentTooLarge) as e:
if e.code != BSON_OBJECT_TOO_LARGE:
raise e
@@ -1161,33 +1172,30 @@ def save_snapshot(self, etype: str, snapshot: dict, ctime: datetime):
self._db[os_col].insert_one(snapshot)
return

- def _get_snapshot_state(self, etype: str, eids: set[str]) -> tuple[set, set, set]:
+ def _get_snapshot_state(self, etype: str, eids: set[str]) -> tuple[set, set]:
"""Get current state of snapshot of given `eid`."""
unknown = eids
normal = self._normal_snapshot_eids[etype] & unknown
oversized = self._oversized_snapshot_eids[etype] & unknown
unknown = unknown - normal - oversized

if not unknown:
- return normal, oversized, unknown
+ return normal, oversized

snapshot_col = self._snapshots_col_name(etype)
- new_normal = set()
new_oversized = set()
for doc in self._db[snapshot_col].find(
self._snapshot_bucket_eids_filter(unknown), {"oversized": 1}
self._snapshot_bucket_eids_filter(unknown) | {"oversized": True},
{"oversized": 1},
):
eid = doc["_id"].rsplit("_#", maxsplit=1)[0]
if doc.get("oversized", False):
new_oversized.add(eid)
else:
new_normal.add(eid)
new_oversized.add(eid)

- self._normal_snapshot_eids[etype] |= new_normal
+ unknown = unknown - new_oversized
+ self._normal_snapshot_eids[etype] |= unknown
self._oversized_snapshot_eids[etype] |= new_oversized
- unknown = unknown - new_normal - new_oversized

- return normal | new_normal, oversized | new_oversized, unknown
+ return normal | unknown, oversized | new_oversized

def _cache_snapshot_state(self, etype: str, normal: set, oversized: set):
"""Cache snapshot state for given `etype`."""
@@ -1223,15 +1231,15 @@ def save_snapshots(self, etype: str, snapshots: list[dict], ctime: datetime):
snapshots_by_eid[snapshot["eid"]].append(snapshot)

# Find out if any of the snapshots are oversized
- normal, oversized, new = self._get_snapshot_state(etype, set(snapshots_by_eid.keys()))
+ normal, oversized = self._get_snapshot_state(etype, set(snapshots_by_eid.keys()))

upserts = []
update_originals = []
oversized_inserts = []
oversized_updates = []

# A normal snapshot, shift the last snapshot to history and update last
- for eid in normal | new:
+ for eid in normal:
upserts.append(
UpdateOne(
self._snapshot_bucket_eid_filter(eid)
@@ -1305,7 +1313,7 @@ def save_snapshots(self, etype: str, snapshots: list[dict], ctime: datetime):
raise DatabaseError(f"Insert of snapshots failed: {str(e)[:2048]}") from e

# Cache the new state
- self._cache_snapshot_state(etype, new - new_oversized, new_oversized)
+ self._cache_snapshot_state(etype, set(), new_oversized)

def _get_metadata_id(self, module: str, time: datetime, worker_id: Optional[int] = None) -> str:
"""Generates unique metadata id based on `module`, `time` and the worker index."""
