Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler/default scheduled #4871

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions fiftyone/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"""

import argparse
import warnings
from collections import defaultdict
from datetime import datetime
import json
Expand All @@ -19,8 +18,6 @@

import argcomplete
from bson import ObjectId
import humanize
import pytz
from tabulate import tabulate
import webbrowser

Expand Down Expand Up @@ -3105,7 +3102,7 @@ def _launch_delegated_local():
print("Delegated operation service running")
print("\nTo exit, press ctrl + c")
while True:
dos.execute_queued_operations(limit=1, log=True)
dos.execute_scheduled_operations(limit=1, log=True)
time.sleep(0.5)
except KeyboardInterrupt:
pass
Expand Down
11 changes: 8 additions & 3 deletions fiftyone/factory/repos/delegated_operation_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import logging
from datetime import datetime
from fiftyone.internal.util import is_remote_service

from fiftyone.operators.executor import (
ExecutionContext,
Expand Down Expand Up @@ -36,17 +37,21 @@ def __init__(
)
self.run_state = (
ExecutionRunState.QUEUED
) # default to queued state on create
if is_remote_service()
else ExecutionRunState.SCHEDULED
) # if running locally use SCHEDULED otherwise QUEUED
self.run_link = None
self.queued_at = datetime.utcnow()
self.queued_at = datetime.utcnow() if is_remote_service() else None
self.updated_at = datetime.utcnow()
self.status = None
self.dataset_id = None
self.started_at = None
self.pinned = False
self.completed_at = None
self.failed_at = None
self.scheduled_at = None
self.scheduled_at = (
datetime.utcnow() if not is_remote_service() else None
)
self.result = None
self.id = None
self._doc = None
Expand Down
1 change: 1 addition & 0 deletions fiftyone/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
"""

from .secrets import *
from .util import is_remote_service
34 changes: 34 additions & 0 deletions fiftyone/internal/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
FiftyOne internal utilities.

| Copyright 2017-2024, Voxel51, Inc.
| `voxel51.com <https://voxel51.com/>`_
|
"""


def is_remote_service():
"""Whether the SDK is running in a remote service context.

Returns:
True/False
"""
return has_encryption_key() and has_api_key()


def has_encryption_key():
"""Whether the current environment has an encryption key.

Returns:
True/False
"""
return False


def has_api_key():
"""Whether the current environment has an API key.

Returns:
True/False
"""
return False
10 changes: 5 additions & 5 deletions fiftyone/operators/delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def list_operations(
**kwargs,
)

def execute_queued_operations(
def execute_scheduled_operations(
self,
operator=None,
delegation_target=None,
Expand All @@ -336,7 +336,7 @@ def execute_queued_operations(
log=False,
**kwargs,
):
"""Executes queued delegated operations matching the given criteria.
"""Executes scheduled delegated operations matching the given criteria.

Args:
operator (None): the optional name of the operator to execute all
Expand All @@ -355,16 +355,16 @@ def execute_queued_operations(
else:
paging = None

queued_ops = self.list_operations(
scheduled_ops = self.list_operations(
operator=operator,
dataset_name=dataset_name,
delegation_target=delegation_target,
run_state=ExecutionRunState.QUEUED,
run_state=ExecutionRunState.SCHEDULED,
paging=paging,
**kwargs,
)

for op in queued_ops:
for op in scheduled_ops:
self.execute_operation(operation=op, log=log)

def count(self, filters=None, search=None):
Expand Down
Loading
Loading