Skip to content

Commit

Permalink
DB: return newest *complete* set of snapshots from `get_latest_snapsh…
Browse files Browse the repository at this point in the history
…ots`

When snapshot set creation takes an extensive amount of time
(tens of minutes) due to entity linkage, individual snapshots are
added gradually. This means that `get_latest_snapshots`
returns only a subset of all entities.

This commit fixes this situation by using an older — but complete —
set of snapshots.

The workaround uses the `#metadata` Mongo collection and checks the number of
completed worker jobs, so a new parameter is added to the `EntityDatabase`
constructor.
  • Loading branch information
DavidB137 committed Oct 17, 2023
1 parent 08d53da commit 1073cee
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 8 deletions.
4 changes: 3 additions & 1 deletion dp3/api/internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ def validate(cls, v, values):
# Load configuration, entity and attribute specification and connect to DP3 message broker.
CONFIG = read_config_dir(conf_env.CONF_DIR, recursive=True)
MODEL_SPEC = ModelSpec(CONFIG.get("db_entities"))
DB = EntityDatabase(CONFIG.get("database"), MODEL_SPEC)
DB = EntityDatabase(
CONFIG.get("database"), MODEL_SPEC, CONFIG.get("processing_core.worker_processes")
)
TASK_WRITER = TaskQueueWriter(
conf_env.APP_NAME,
CONFIG.get("processing_core.worker_processes"),
Expand Down
22 changes: 16 additions & 6 deletions dp3/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ class EntityDatabase:
"""

def __init__(
self,
db_conf: HierarchicalDict,
model_spec: ModelSpec,
self, db_conf: HierarchicalDict, model_spec: ModelSpec, num_processes: int
) -> None:
self.log = logging.getLogger("EntityDatabase")

Expand All @@ -109,6 +107,7 @@ def __init__(
time.sleep(delay)

self._db_schema_config = model_spec
self._num_processes = num_processes

# Init and switch to correct database
self._db = self._db[config.db_name]
Expand Down Expand Up @@ -362,11 +361,22 @@ def get_latest_snapshots(
self._assert_etype_exists(etype)

snapshot_col = self._snapshots_col_name(etype)
latest_snapshot = self._db[snapshot_col].find_one({}, sort=[("_id", -1)])
if latest_snapshot is None:

# Find newest fully completed snapshot set
latest_fully_completed_snapshot_metadata = self._db["#metadata"].find_one(
{
"#module": "SnapShooter",
"workers_finished": self._num_processes,
"linked_finished": True,
},
sort=[("#time_created", -1)],
)

if latest_fully_completed_snapshot_metadata is None:
return self._db[snapshot_col].find(), self._db[snapshot_col].count_documents({})

latest_snapshot_date = latest_snapshot["_time_created"]
# Extract date and query using it
latest_snapshot_date = latest_fully_completed_snapshot_metadata["#time_created"]
query = {"_time_created": latest_snapshot_date}
if eid_filter != "":
query["eid"] = {"$regex": eid_filter}
Expand Down
2 changes: 1 addition & 1 deletion dp3/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def main(app_name: str, config_dir: str, process_index: int, verbose: bool) -> N
# Create instances of core components
log.info(f"***** {app_name} worker {process_index} of {num_processes} start *****")

db = EntityDatabase(config.get("database"), model_spec)
db = EntityDatabase(config.get("database"), model_spec, num_processes)

global_scheduler = scheduler.Scheduler()
task_executor = TaskExecutor(db, platform_config)
Expand Down

0 comments on commit 1073cee

Please sign in to comment.