From e0cde8e63467617c6755333a08dc063ee6424afb Mon Sep 17 00:00:00 2001
From: OndrejSedlacek
Date: Wed, 9 Aug 2023 14:23:06 +0200
Subject: [PATCH] HistoryManager: Customizable aggregation window.

As any cron expression is now allowed for the archivation schedule, the
retroactive per-day batching implementation was dropped. Also implemented
the option of not having an archive at all; in that case, old datapoints
are simply deleted.
---
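The archive file is now named after the archived datapoint range instead of
a calendar day, so a schedule that fires more than once a day no longer
overwrites an existing archive. Below is a minimal sketch of the naming
scheme as the diff implements it; the helper function itself is hypothetical
and exists only for illustration:

    from datetime import datetime, timedelta
    from pathlib import Path
    from typing import Optional

    def archive_logfile(
        log_dir: Optional[Path], min_date: datetime, max_date: datetime
    ) -> Optional[Path]:
        # Mirrors the patched archive_old_dps(): one file per archived range.
        # Returns None when archiving is disabled (archive_dir was not set);
        # old datapoints are then deleted without being written anywhere.
        if log_dir is None:
            return None
        fmt = "%Y%m%dT%H%M%S"  # same timestamp format the patch uses
        return log_dir / f"dp-log-{min_date.strftime(fmt)}--{max_date.strftime(fmt)}.json"

    if __name__ == "__main__":
        now = datetime(2023, 8, 9, 14, 0, 0)
        print(archive_logfile(Path("archive"), now - timedelta(hours=6), now))
        # archive/dp-log-20230809T080000--20230809T140000.json
        print(archive_logfile(None, now - timedelta(hours=6), now))
        # None
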
 dp3/history_management/history_manager.py | 56 ++++++++++-------------
 1 file changed, 23 insertions(+), 33 deletions(-)

diff --git a/dp3/history_management/history_manager.py b/dp3/history_management/history_manager.py
index fdb90c52..14d563c9 100644
--- a/dp3/history_management/history_manager.py
+++ b/dp3/history_management/history_manager.py
@@ -111,8 +111,10 @@ def __init__(
         # Schedule datapoint archivation
         archive_config = self.config.datapoint_archivation
         self.keep_raw_delta = archive_config.older_than
-        # TODO: Handle None as valid archive dir (simply delete datapoints)
-        self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
+        if archive_config.archive_dir is not None:
+            self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
+        else:
+            self.log_dir = None
         registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.dict())
 
     def delete_old_dps(self):
@@ -157,13 +159,9 @@ def archive_old_dps(self):
         Archives old data points from raw collection.
 
         Updates already saved archive files, if present.
-
-        TODO: FIX archive file naming and generalize for shorter archivation windows
-        Currently will overwrite existing archive files if run more than once a day.
         """
         t_old = datetime.utcnow() - self.keep_raw_delta
-        t_old = t_old.replace(hour=0, minute=0, second=0, microsecond=0)
 
         self.log.debug("Archiving all records before %s ...", t_old)
         max_date, min_date, total_dps = self._get_raw_dps_summary(t_old)
@@ -174,18 +172,19 @@ def archive_old_dps(self):
             "Found %s datapoints to archive in the range %s - %s", total_dps, min_date, max_date
         )
 
-        n_days = (max_date - min_date).days + 1
-        for date, next_date in [
-            (min_date + timedelta(days=n), min_date + timedelta(days=n + 1)) for n in range(n_days)
-        ]:
-            date_string = date.strftime("%Y%m%d")
-            day_datapoints = 0
-            date_logfile = self.log_dir / f"dp-log-{date_string}.json"
+        if self.log_dir is None:
+            self.log.debug("No archive directory specified, skipping archivation.")
+        else:
+            min_date_string = min_date.strftime("%Y%m%dT%H%M%S")
+            max_date_string = max_date.strftime("%Y%m%dT%H%M%S")
+            date_logfile = self.log_dir / f"dp-log-{min_date_string}--{max_date_string}.json"
+            datapoints = 0
             with open(date_logfile, "w", encoding="utf-8") as logfile:
                 first = True
+
                 for etype in self.model_spec.entities:
-                    result_cursor = self.db.get_raw(etype, after=date, before=next_date)
+                    result_cursor = self.db.get_raw(etype, after=min_date, before=t_old)
                     for dp in result_cursor:
                         if first:
                             logfile.write(
@@ -196,23 +195,18 @@ def archive_old_dps(self):
                             logfile.write(
                                 f",\n{json.dumps(self._reformat_dp(dp), cls=DatetimeEncoder)}"
                             )
-                            day_datapoints += 1
+                            datapoints += 1
                 logfile.write("\n]")
-            self.log.debug(
-                "%s: Archived %s datapoints to %s", date_string, day_datapoints, date_logfile
-            )
+            self.log.info("Archived %s datapoints to %s", datapoints, date_logfile)
 
             compress_file(date_logfile)
             os.remove(date_logfile)
-            self.log.debug("%s: Saved archive was compressed", date_string)
+            self.log.debug("Saved archive was compressed")
 
-            if not day_datapoints:
-                continue
-
-            deleted_count = 0
-            for etype in self.model_spec.entities:
-                deleted_res = self.db.delete_old_raw_dps(etype, next_date)
-                deleted_count += deleted_res.deleted_count
-            self.log.debug("%s: Deleted %s datapoints", date_string, deleted_count)
+        deleted_count = 0
+        for etype in self.model_spec.entities:
+            deleted_res = self.db.delete_old_raw_dps(etype, before=t_old)
+            deleted_count += deleted_res.deleted_count
+        self.log.info("Deleted %s datapoints", deleted_count)
 
     @staticmethod
     def _reformat_dp(dp):
@@ -233,12 +227,8 @@ def _get_raw_dps_summary(
             date_ranges.append(range_summary)
         if not date_ranges:
             return None, None, 0
-        min_date = min(x["earliest"] for x in date_ranges).replace(
-            hour=0, minute=0, second=0, microsecond=0
-        )
-        max_date = max(x["latest"] for x in date_ranges).replace(
-            hour=0, minute=0, second=0, microsecond=0
-        )
+        min_date = min(x["earliest"] for x in date_ranges)
+        max_date = max(x["latest"] for x in date_ranges)
         total_dps = sum(x["count"] for x in date_ranges)
         return max_date, min_date, total_dps
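
A closing note on the cutoff change: t_old is no longer truncated to
midnight, so every run archives and deletes everything older than exactly
utcnow() - older_than, regardless of how the cron schedule is phrased. A toy
demonstration of the difference follows; the 7-day older_than is an assumed
example value, not taken from the repository defaults:

    from datetime import datetime, timedelta

    older_than = timedelta(days=7)  # assumed example; really datapoint_archivation.older_than

    def old_cutoff(now: datetime) -> datetime:
        # Pre-patch behaviour: truncate to midnight, forcing per-day batches.
        return (now - older_than).replace(hour=0, minute=0, second=0, microsecond=0)

    def new_cutoff(now: datetime) -> datetime:
        # Post-patch behaviour: keep the exact timestamp of the run.
        return now - older_than

    run = datetime(2023, 8, 9, 14, 23, 6)
    print(old_cutoff(run))  # 2023-08-02 00:00:00, identical for every run that day
    print(new_cutoff(run))  # 2023-08-02 14:23:06, distinct per run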