Skip to content

Commit

Permalink
HistoryManager: Customizable aggregation window.
Browse files Browse the repository at this point in the history
As any cron expression is allowed, the retroactive, per-day batching implementation was dropped.
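Implemented the option of not having an archive: when no archive directory is configured, old datapoints are simply deleted instead of being archived.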
  • Loading branch information
xsedla1o committed Aug 9, 2023
1 parent d8b8531 commit e0cde8e
Showing 1 changed file with 23 additions and 33 deletions.
56 changes: 23 additions & 33 deletions dp3/history_management/history_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ def __init__(
# Schedule datapoint archivation
archive_config = self.config.datapoint_archivation
self.keep_raw_delta = archive_config.older_than
# TODO: Handle None as valid archive dir (simply delete datapoints)
self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
if archive_config.archive_dir is not None:
self.log_dir = self._ensure_log_dir(archive_config.archive_dir)
else:
self.log_dir = None
registrar.scheduler_register(self.archive_old_dps, **archive_config.schedule.dict())

def delete_old_dps(self):
Expand Down Expand Up @@ -157,13 +159,9 @@ def archive_old_dps(self):
Archives old data points from raw collection.
Updates already saved archive files, if present.
TODO: FIX archive file naming and generalize for shorter archivation windows
Currently will overwrite existing archive files if run more than once a day.
"""

t_old = datetime.utcnow() - self.keep_raw_delta
t_old = t_old.replace(hour=0, minute=0, second=0, microsecond=0)
self.log.debug("Archiving all records before %s ...", t_old)

max_date, min_date, total_dps = self._get_raw_dps_summary(t_old)
Expand All @@ -174,18 +172,19 @@ def archive_old_dps(self):
"Found %s datapoints to archive in the range %s - %s", total_dps, min_date, max_date
)

n_days = (max_date - min_date).days + 1
for date, next_date in [
(min_date + timedelta(days=n), min_date + timedelta(days=n + 1)) for n in range(n_days)
]:
date_string = date.strftime("%Y%m%d")
day_datapoints = 0
date_logfile = self.log_dir / f"dp-log-{date_string}.json"
if self.log_dir is None:
self.log.debug("No archive directory specified, skipping archivation.")
else:
min_date_string = min_date.strftime("%Y%m%dT%H%M%S")
max_date_string = max_date.strftime("%Y%m%dT%H%M%S")
date_logfile = self.log_dir / f"dp-log-{min_date_string}--{max_date_string}.json"
datapoints = 0

with open(date_logfile, "w", encoding="utf-8") as logfile:
first = True

for etype in self.model_spec.entities:
result_cursor = self.db.get_raw(etype, after=date, before=next_date)
result_cursor = self.db.get_raw(etype, after=min_date, before=t_old)
for dp in result_cursor:
if first:
logfile.write(
Expand All @@ -196,23 +195,18 @@ def archive_old_dps(self):
logfile.write(
f",\n{json.dumps(self._reformat_dp(dp), cls=DatetimeEncoder)}"
)
day_datapoints += 1
datapoints += 1
logfile.write("\n]")
self.log.debug(
"%s: Archived %s datapoints to %s", date_string, day_datapoints, date_logfile
)
self.log.info("Archived %s datapoints to %s", datapoints, date_logfile)
compress_file(date_logfile)
os.remove(date_logfile)
self.log.debug("%s: Saved archive was compressed", date_string)
self.log.debug("Saved archive was compressed")

if not day_datapoints:
continue

deleted_count = 0
for etype in self.model_spec.entities:
deleted_res = self.db.delete_old_raw_dps(etype, next_date)
deleted_count += deleted_res.deleted_count
self.log.debug("%s: Deleted %s datapoints", date_string, deleted_count)
deleted_count = 0
for etype in self.model_spec.entities:
deleted_res = self.db.delete_old_raw_dps(etype, before=t_old)
deleted_count += deleted_res.deleted_count
self.log.info("Deleted %s datapoints", deleted_count)

@staticmethod
def _reformat_dp(dp):
Expand All @@ -233,12 +227,8 @@ def _get_raw_dps_summary(
date_ranges.append(range_summary)
if not date_ranges:
return None, None, 0
min_date = min(x["earliest"] for x in date_ranges).replace(
hour=0, minute=0, second=0, microsecond=0
)
max_date = max(x["latest"] for x in date_ranges).replace(
hour=0, minute=0, second=0, microsecond=0
)
min_date = min(x["earliest"] for x in date_ranges)
max_date = max(x["latest"] for x in date_ranges)
total_dps = sum(x["count"] for x in date_ranges)
return max_date, min_date, total_dps

Expand Down

0 comments on commit e0cde8e

Please sign in to comment.