Database: Batch master updates on new datapoint.
xsedla1o committed Jul 17, 2024
1 parent 4a51c80 commit d75c544
Showing 1 changed file with 68 additions and 32 deletions.
dp3/database/database.py: 68 additions & 32 deletions
@@ -144,11 +144,15 @@ def __init__(

        self._raw_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
        self._raw_buffers = defaultdict(list)
+        self._master_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
+        self._master_buffers = defaultdict(dict)

        self._sched = Scheduler()
        seconds = ",".join(
            f"{int(i)}" for i in range(60) if int(i - process_index) % min(num_processes, 3) == 0
        )
        self._sched.register(self._push_raw, second=seconds)
+        self._sched.register(self._push_master, second=seconds)

        self.log.info("Database successfully initialized!")

@@ -184,6 +188,7 @@ def stop(self) -> None:
        """Stops the database sync, pushes remaining datapoints."""
        self._sched.stop()
        self._push_raw()
+        self._push_master()

    def register_on_entity_delete(
        self, f_one: Callable[[str, str], None], f_many: Callable[[str, list[str]], None]
@@ -343,7 +348,7 @@ def insert_datapoints(
        self._raw_buffers[etype].extend(dps_dicts)

        # Update master document
-        master_changes = {"$push": {}, "$set": {}}
+        master_changes = {"pushes": defaultdict(list), "$set": {}}
        for dp in dps:
            attr_spec = self._db_schema_config.attr(etype, dp.attr)

@@ -358,47 +363,31 @@

            # Push new data of observation
            if attr_spec.t == AttrType.OBSERVATIONS:
-                if dp.attr in master_changes["$push"]:
-                    # Support multiple datapoints being pushed in the same request
-                    if "$each" not in master_changes["$push"][dp.attr]:
-                        saved_dp = master_changes["$push"][dp.attr]
-                        master_changes["$push"][dp.attr] = {"$each": [saved_dp]}
-                    master_changes["$push"][dp.attr]["$each"].append(
-                        {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
-                    )
-                else:
-                    # Otherwise just push one datapoint
-                    master_changes["$push"][dp.attr] = {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
+                master_changes["pushes"][dp.attr].append(
+                    {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
+                )

            # Push new data of timeseries
            if attr_spec.t == AttrType.TIMESERIES:
-                if dp.attr in master_changes["$push"]:
-                    # Support multiple datapoints being pushed in the same request
-                    if "$each" not in master_changes["$push"][dp.attr]:
-                        saved_dp = master_changes["$push"][dp.attr]
-                        master_changes["$push"][dp.attr] = {"$each": [saved_dp]}
-                    master_changes["$push"][dp.attr]["$each"].append(
-                        {"t1": dp.t1, "t2": dp.t2, "v": v}
-                    )
-                else:
-                    # Otherwise just push one datapoint
-                    master_changes["$push"][dp.attr] = {"t1": dp.t1, "t2": dp.t2, "v": v}
+                master_changes["pushes"][dp.attr].append({"t1": dp.t1, "t2": dp.t2, "v": v})

        if new_entity:
            master_changes["$set"]["#hash"] = HASH(f"{etype}:{eid}")
            master_changes["$set"]["#time_created"] = datetime.now()

-        master_col = self._master_col_name(etype)
-        try:
-            self._db.get_collection(master_col, write_concern=WriteConcern(w=1)).update_one(
-                {"_id": eid}, master_changes, upsert=True
-            )
-            self.log.debug(f"Updated master record of {etype} {eid}: {master_changes}")
-        except Exception as e:
-            raise DatabaseError(f"Update of master record failed: {e}\n{dps}") from e
+        with self._master_buffer_locks[etype]:
+            if eid in self._master_buffers[etype]:
+                for attr, push_dps in master_changes["pushes"].items():
+                    if attr in self._master_buffers[etype][eid]["pushes"]:
+                        self._master_buffers[etype][eid]["pushes"][attr].extend(push_dps)
+                    else:
+                        self._master_buffers[etype][eid]["pushes"][attr] = push_dps
+                self._master_buffers[etype][eid]["$set"].update(master_changes["$set"])
+            else:
+                self._master_buffers[etype][eid] = master_changes

    def _push_raw(self):
-        """Pushes raw data to master collections."""
+        """Pushes datapoints to raw collections."""
        for etype, lock in self._raw_buffer_locks.items():
            begin = time.time()
            with lock:
Expand All @@ -423,6 +412,53 @@ def _push_raw(self):
            except Exception as e:
                raise DatabaseError(f"Insert of datapoints failed: {e}\n{dps}") from e

+    def _push_master(self):
+        """Push master changes to the database."""
+        for etype, lock in self._master_buffer_locks.items():
+            master_col = self._db.get_collection(
+                self._master_col_name(etype), write_concern=WriteConcern(w=1)
+            )
+            begin = time.time()
+            with lock:
+                locked = time.time()
+                master_changes = self._master_buffers[etype]
+                self._master_buffers[etype] = {}
+            if not master_changes:
+                continue
+            try:
+                updates = [
+                    UpdateOne(
+                        {"_id": eid},
+                        {
+                            "$push": {
+                                attr: push_dps[0] if len(push_dps) == 1 else {"$each": push_dps}
+                                for attr, push_dps in changes["pushes"].items()
+                            },
+                            "$set": changes["$set"],
+                        },
+                        upsert=True,
+                    )
+                    for eid, changes in master_changes.items()
+                ]
+                updates_ready = time.time()
+                res = master_col.bulk_write(updates, ordered=False)
+                end = time.time()
+                self.log.debug(
+                    "Updated %s master records in %.3fs, "
+                    "%.3fus lock, %.3fms update prep, %.3fs update",
+                    len(master_changes),
+                    end - begin,
+                    (locked - begin) * 1_000_000,
+                    (updates_ready - locked) * 1000,
+                    end - updates_ready,
+                )
+                for error in res.bulk_api_result.get("writeErrors", []):
+                    self.log.error("Error in bulk write: %s", error)
+            except Exception as e:
+                raise DatabaseError(
+                    f"Update of master records failed: {e}\n{master_changes}"
+                ) from e

    def update_master_records(self, etype: str, eids: list[str], records: list[dict]) -> None:
        """Replace master records of `etype`:`eid` with the provided `records`.
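
To see what the staggered flush schedule built in `__init__` does, a worked example helps. The helper name `flush_seconds` below is illustrative; `process_index` and `num_processes` are the constructor values used in the diff, and the `min(num_processes, 3)` cap means no worker flushes more often than every third second once several processes share the database.

# Worked example (assumed values) of the schedule string built in __init__.
def flush_seconds(process_index: int, num_processes: int) -> str:
    return ",".join(
        str(i) for i in range(60) if (i - process_index) % min(num_processes, 3) == 0
    )

print(flush_seconds(0, 1))  # "0,1,2,...,59": a single process flushes every second
print(flush_seconds(1, 3))  # "1,4,7,...,58": every third second, offset by 1
print(flush_seconds(5, 8))  # "2,5,8,...,59": offsets wrap around modulo 3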

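The commit's core pattern, buffer in memory and flush in bulk, can be shown end to end in a short self-contained sketch. It assumes only pymongo; the collection name and the helpers `buffer_changes` / `flush_master` are illustrative stand-ins for `insert_datapoints` and `_push_master`, and unlike the committed code it catches `BulkWriteError`, which is how pymongo reports per-document write errors under an acknowledged write concern.

import threading
from collections import defaultdict

from pymongo import MongoClient, UpdateOne
from pymongo.errors import BulkWriteError
from pymongo.write_concern import WriteConcern

# Illustrative names; dp3 keeps one buffer and one lock per entity type.
col = MongoClient()["demo"].get_collection("master_ip", write_concern=WriteConcern(w=1))
buf_lock = threading.Lock()
buf: dict = {}  # eid -> {"pushes": {attr: [datapoint, ...]}, "$set": {...}}

def buffer_changes(eid, pushes, sets):
    # Merge one entity's changes into the buffer: cheap, no database I/O.
    with buf_lock:
        entry = buf.setdefault(eid, {"pushes": defaultdict(list), "$set": {}})
        for attr, dps in pushes.items():
            entry["pushes"][attr].extend(dps)  # queue datapoints for $push
        entry["$set"].update(sets)  # later $set values overwrite earlier ones

def flush_master():
    # Drain the buffer into a single unordered bulk write (one round trip).
    global buf
    with buf_lock:
        changes, buf = buf, {}  # swap the buffer out; hold the lock only briefly
    updates = []
    for eid, entry in changes.items():
        update = {}
        if entry["pushes"]:
            # A single datapoint is pushed directly; several need $each.
            update["$push"] = {
                attr: dps[0] if len(dps) == 1 else {"$each": dps}
                for attr, dps in entry["pushes"].items()
            }
        if entry["$set"]:
            update["$set"] = entry["$set"]
        if update:
            updates.append(UpdateOne({"_id": eid}, update, upsert=True))
    if not updates:
        return
    try:
        col.bulk_write(updates, ordered=False)
    except BulkWriteError as e:
        for err in e.details.get("writeErrors", []):
            print("bulk write error:", err)

buffer_changes("1.2.3.4", {"open_ports": [{"t1": 0, "t2": 1, "v": 80}]}, {"#hash": 1})
buffer_changes("1.2.3.4", {"open_ports": [{"t1": 1, "t2": 2, "v": 443}]}, {})
flush_master()  # both datapoints land in one update, pushed via $each

Swapping the whole buffer dict while holding the lock keeps the critical section tiny, so writers never block on network I/O; the unordered bulk write then lets MongoDB apply the independent per-entity updates without stopping at the first failure.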