diff --git a/dp3/database/database.py b/dp3/database/database.py
index 1fb09075..b17d99b7 100644
--- a/dp3/database/database.py
+++ b/dp3/database/database.py
@@ -144,11 +144,15 @@ def __init__(
         self._raw_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
         self._raw_buffers = defaultdict(list)
 
+        self._master_buffer_locks = {etype: threading.Lock() for etype in model_spec.entities}
+        self._master_buffers = defaultdict(dict)
+
         self._sched = Scheduler()
         seconds = ",".join(
             f"{int(i)}" for i in range(60) if int(i - process_index) % min(num_processes, 3) == 0
         )
         self._sched.register(self._push_raw, second=seconds)
+        self._sched.register(self._push_master, second=seconds)
 
         self.log.info("Database successfully initialized!")
 
@@ -184,6 +188,7 @@ def stop(self) -> None:
         """Stops the database sync, push remaining datapoints."""
         self._sched.stop()
         self._push_raw()
+        self._push_master()
 
     def register_on_entity_delete(
         self, f_one: Callable[[str, str], None], f_many: Callable[[str, list[str]], None]
@@ -343,7 +348,7 @@ def insert_datapoints(
             self._raw_buffers[etype].extend(dps_dicts)
 
         # Update master document
-        master_changes = {"$push": {}, "$set": {}}
+        master_changes = {"pushes": defaultdict(list), "$set": {}}
         for dp in dps:
             attr_spec = self._db_schema_config.attr(etype, dp.attr)
 
@@ -358,47 +363,31 @@ def insert_datapoints(
 
             # Push new data of observation
             if attr_spec.t == AttrType.OBSERVATIONS:
-                if dp.attr in master_changes["$push"]:
-                    # Support multiple datapoints being pushed in the same request
-                    if "$each" not in master_changes["$push"][dp.attr]:
-                        saved_dp = master_changes["$push"][dp.attr]
-                        master_changes["$push"][dp.attr] = {"$each": [saved_dp]}
-                    master_changes["$push"][dp.attr]["$each"].append(
-                        {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
-                    )
-                else:
-                    # Otherwise just push one datapoint
-                    master_changes["$push"][dp.attr] = {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
+                master_changes["pushes"][dp.attr].append(
+                    {"t1": dp.t1, "t2": dp.t2, "v": v, "c": dp.c}
+                )
 
             # Push new data of timeseries
             if attr_spec.t == AttrType.TIMESERIES:
-                if dp.attr in master_changes["$push"]:
-                    # Support multiple datapoints being pushed in the same request
-                    if "$each" not in master_changes["$push"][dp.attr]:
-                        saved_dp = master_changes["$push"][dp.attr]
-                        master_changes["$push"][dp.attr] = {"$each": [saved_dp]}
-                    master_changes["$push"][dp.attr]["$each"].append(
-                        {"t1": dp.t1, "t2": dp.t2, "v": v}
-                    )
-                else:
-                    # Otherwise just push one datapoint
-                    master_changes["$push"][dp.attr] = {"t1": dp.t1, "t2": dp.t2, "v": v}
+                master_changes["pushes"][dp.attr].append({"t1": dp.t1, "t2": dp.t2, "v": v})
 
         if new_entity:
             master_changes["$set"]["#hash"] = HASH(f"{etype}:{eid}")
             master_changes["$set"]["#time_created"] = datetime.now()
 
-        master_col = self._master_col_name(etype)
-        try:
-            self._db.get_collection(master_col, write_concern=WriteConcern(w=1)).update_one(
-                {"_id": eid}, master_changes, upsert=True
-            )
-            self.log.debug(f"Updated master record of {etype} {eid}: {master_changes}")
-        except Exception as e:
-            raise DatabaseError(f"Update of master record failed: {e}\n{dps}") from e
+        with self._master_buffer_locks[etype]:
+            if eid in self._master_buffers[etype]:
+                for attr, push_dps in master_changes["pushes"].items():
+                    if attr in self._master_buffers[etype][eid]["pushes"]:
+                        self._master_buffers[etype][eid]["pushes"][attr].extend(push_dps)
+                    else:
+                        self._master_buffers[etype][eid]["pushes"][attr] = push_dps
+                self._master_buffers[etype][eid]["$set"].update(master_changes["$set"])
+            else:
+                self._master_buffers[etype][eid] = master_changes
 
     def _push_raw(self):
-        """Pushes raw data to master collections."""
+        """Pushes datapoints to raw collections."""
         for etype, lock in self._raw_buffer_locks.items():
             begin = time.time()
             with lock:
@@ -423,6 +412,53 @@ def _push_raw(self):
             except Exception as e:
                 raise DatabaseError(f"Insert of datapoints failed: {e}\n{dps}") from e
 
+    def _push_master(self):
+        """Pushes buffered master changes to master collections."""
+        for etype, lock in self._master_buffer_locks.items():
+            master_col = self._db.get_collection(
+                self._master_col_name(etype), write_concern=WriteConcern(w=1)
+            )
+            begin = time.time()
+            with lock:
+                locked = time.time()
+                master_changes = self._master_buffers[etype]
+                self._master_buffers[etype] = {}
+            if not master_changes:
+                continue
+            try:
+                updates = [
+                    UpdateOne(
+                        {"_id": eid},
+                        {
+                            "$push": {
+                                attr: push_dps[0] if len(push_dps) == 1 else {"$each": push_dps}
+                                for attr, push_dps in changes["pushes"].items()
+                            },
+                            "$set": changes["$set"],
+                        },
+                        upsert=True,
+                    )
+                    for eid, changes in master_changes.items()
+                ]
+                updates_ready = time.time()
+                res = master_col.bulk_write(updates, ordered=False)
+                end = time.time()
+                self.log.debug(
+                    "Updated %s master records in %.3fs, "
+                    "%.3fus lock, %.3fms update prep, %.3fs update",
+                    len(master_changes),
+                    end - begin,
+                    (locked - begin) * 1_000_000,
+                    (updates_ready - locked) * 1000,
+                    end - updates_ready,
+                )
+                for error in res.bulk_api_result.get("writeErrors", []):
+                    self.log.error("Error in bulk write: %s", error)
+            except Exception as e:
+                raise DatabaseError(
+                    f"Update of master records failed: {e}\n{master_changes}"
+                ) from e
+
     def update_master_records(self, etype: str, eids: list[str], records: list[dict]) -> None:
         """Replace master records of `etype`:`eid` with the provided `records`.
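
Note: the heart of this change is the buffer-and-bulk-write pattern in _push_master. The standalone sketch below shows the same pattern with plain pymongo, reduced to its essentials. It is illustrative only: the names (buffer_change, flush, the "example" database and "master_ip" collection) are made up and not part of DP3, it covers only the "$push" half of the update (the patch also merges "$set" fields and keeps one buffer and lock per entity type), and it assumes pymongo with a reachable local MongoDB.

    import threading
    from collections import defaultdict

    from pymongo import MongoClient, UpdateOne

    col = MongoClient()["example"]["master_ip"]
    lock = threading.Lock()
    buffer = {}  # eid -> {attr: [datapoint, ...]}


    def buffer_change(eid, attr, dp):
        # Merge the datapoint into the per-entity buffer instead of issuing
        # one update_one() round trip to MongoDB per datapoint.
        with lock:
            buffer.setdefault(eid, defaultdict(list))[attr].append(dp)


    def flush():
        # Swap the buffer out so writers are blocked only for the swap,
        # then send all buffered entities in a single unordered bulk_write().
        global buffer
        with lock:
            pending, buffer = buffer, {}
        if not pending:
            return
        updates = [
            UpdateOne(
                {"_id": eid},
                {
                    # One datapoint -> plain $push; several -> $push with
                    # $each, mirroring the comprehension in _push_master.
                    "$push": {
                        attr: dps[0] if len(dps) == 1 else {"$each": dps}
                        for attr, dps in per_attr.items()
                    }
                },
                upsert=True,
            )
            for eid, per_attr in pending.items()
        ]
        col.bulk_write(updates, ordered=False)


    buffer_change("10.0.0.1", "open_ports", {"t1": 1, "t2": 2, "v": [22, 80]})
    buffer_change("10.0.0.1", "open_ports", {"t1": 2, "t2": 3, "v": [22]})
    flush()  # one upserting UpdateOne for 10.0.0.1, sent in a single bulk_write

Two datapoints for the same entity thus collapse into one upsert, and N buffered entities cost one database round trip instead of N, which is what the timing fields in the debug log above are meant to demonstrate.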