SnapShooter: Run finished metadata.
xsedla1o committed Oct 16, 2023
1 parent 3891cf6 commit 08d53da
Showing 2 changed files with 25 additions and 4 deletions.
dp3/common/task.py: 2 additions, 0 deletions
@@ -126,11 +126,13 @@ class Snapshot(Task):
     Attributes:
         entities: List of (entity_type, entity_id)
         time: timestamp for snapshot creation
+        final: If True, this is the last linked snapshot for the given time
     """

     entities: list[tuple[str, str]]
     time: datetime
     type: SnapshotMessageType
+    final: bool = False

     def routing_key(self):
         return "-".join(f"{etype}:{eid}" for etype, eid in self.entities)
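
The new final field defaults to False, so existing producers of Snapshot tasks keep working unchanged; only the last linked-snapshot task of a run needs to set it. A minimal sketch of constructing such a task (the entity values and the import path are illustrative, not taken from this commit):

    from datetime import datetime

    from dp3.common.task import Snapshot, SnapshotMessageType  # assumed import path

    task = Snapshot(
        entities=[("ip", "192.168.0.1"), ("asn", "64496")],  # illustrative entities
        time=datetime.now(),
        type=SnapshotMessageType.task,
        final=True,  # marks the last linked-snapshot task of this run
    )
    print(task.routing_key())  # -> "ip:192.168.0.1-asn:64496"
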
dp3/snapshots/snapshooter.py: 23 additions, 4 deletions
@@ -224,6 +224,16 @@ def add_to_link_cache(self, eid: str, dp: DataPointBase):
     def make_snapshots(self):
         """Creates snapshots for all entities currently active in database."""
         time = datetime.now()
+        self.db.save_metadata(
+            time,
+            {
+                "task_creation_start": time,
+                "entities": 0,
+                "components": 0,
+                "workers_finished": 0,
+                "linked_finished": False,
+            },
+        )

         # distribute list of possibly linked entities to all workers
         cached = self.get_cached_link_entity_ids()
@@ -234,22 +244,28 @@ def make_snapshots(self):

         # Load links only for a reduced set of entities
         self.log.debug("Loading linked entities.")
-        self.db.save_metadata(time, {"task_creation_start": time, "entities": 0, "components": 0})

         times = {}
         counts = {"entities": 0, "components": 0}
         try:
             linked_entities = self.get_linked_entities(time, cached)
             times["components_loaded"] = datetime.now()

-            for linked_entities_component in linked_entities:
+            for i, linked_entities_component in enumerate(linked_entities):
                 counts["entities"] += len(linked_entities_component)
                 counts["components"] += 1

                 self.snapshot_queue_writer.put_task(
                     task=Snapshot(
-                        entities=linked_entities_component, time=time, type=SnapshotMessageType.task
+                        entities=linked_entities_component,
+                        time=time,
+                        type=SnapshotMessageType.task,
+                        final=(i + 1 == len(linked_entities)),
                     )
                 )

+            if len(linked_entities) == 0:
+                self.db.update_metadata(time, metadata={"linked_finished": True})
         except pymongo.errors.CursorNotFound as err:
             self.log.exception(err)
         finally:
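
make_snapshots() now writes the run's metadata document up front with zeroed counters and linked_finished set to False, and flips linked_finished either immediately (when there are no linked components) or via the task marked final. A reader of that document can therefore tell when a whole run has completed. A hedged sketch of such a check, assuming the number of snapshot workers is known to the caller (that number is not part of this commit):

    def snapshot_run_finished(metadata: dict, num_workers: int) -> bool:
        """Hypothetical completion check built on the fields this commit writes:
        all linked snapshots are stored and every hash-partitioned worker reported in."""
        return (
            metadata.get("linked_finished", False)
            and metadata.get("workers_finished", 0) >= num_workers
        )
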
@@ -347,7 +363,7 @@ def make_snapshots_by_hash(self, task: Snapshot):
         self.db.update_metadata(
             task.time,
             metadata={},
-            increase={"entities": entity_cnt, "components": entity_cnt},
+            increase={"entities": entity_cnt, "components": entity_cnt, "workers_finished": 1},
         )
         self.log.debug("Worker snapshot creation done.")

@@ -423,6 +439,9 @@ def make_snapshot(self, task: Snapshot):
         for rtype_rid, record in entity_values.items():
             self.db.save_snapshot(rtype_rid[0], record, task.time)

+        if task.final:
+            self.db.update_metadata(task.time, metadata={"linked_finished": True})
+
     def run_timeseries_processing(self, entity_type, master_record):
         """
         - all registered timeseries processing modules must be called
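
Taken together, a run's metadata document now evolves roughly as follows; the totals and worker count are made up for illustration:

    # Written by make_snapshots() at run start:
    run_start = {
        "task_creation_start": time,
        "entities": 0,
        "components": 0,
        "workers_finished": 0,
        "linked_finished": False,
    }

    # After the run: every make_snapshots_by_hash() call added its counts plus one
    # workers_finished tick, and the task carrying final=True (or an empty linked set)
    # flipped linked_finished.
    run_end = {
        "task_creation_start": time,
        "entities": 1500,       # illustrative totals
        "components": 940,
        "workers_finished": 4,  # illustrative worker count
        "linked_finished": True,
    }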
