Skip to content

Commit

Permalink
owneuralnetwork: Atomic disconnect from task state update notifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
ales-erjavec authored and markotoplak committed Mar 23, 2018
1 parent e7c89a2 commit 9f1da67
Showing 1 changed file with 55 additions and 32 deletions.
87 changes: 55 additions & 32 deletions Orange/widgets/model/owneuralnetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,39 @@
import concurrent.futures

from AnyQt.QtWidgets import QApplication, qApp
from AnyQt.QtCore import Qt, QThread
from AnyQt.QtCore import pyqtSlot as Slot
from AnyQt.QtCore import Qt, QThread, QObject
from AnyQt.QtCore import pyqtSlot as Slot, pyqtSignal as Signal

from Orange.data import Table
from Orange.modelling import NNLearner
from Orange.widgets import gui
from Orange.widgets.settings import Setting
from Orange.widgets.utils.owlearnerwidget import OWBaseLearner

from Orange.widgets.utils.concurrent import (
ThreadExecutor, FutureWatcher, methodinvoke
)
from Orange.widgets.utils.concurrent import ThreadExecutor, FutureWatcher


class Task:
class Task(QObject):
"""
A class that will hold the state for an learner evaluation.
"""
future = ... # type: concurrent.futures.Future
watcher = ... # type: FutureWatcher
done = Signal(object)
progressChanged = Signal(float)

future = None # type: concurrent.futures.Future
watcher = None # type: FutureWatcher
cancelled = False # type: bool

def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)

def setFuture(self, future):
if self.future is not None:
raise RuntimeError("future is already set")
self.future = future
self.watcher = FutureWatcher(future, parent=self)
self.watcher.done.connect(self.done)

def cancel(self):
"""
Cancel the task.
Expand All @@ -39,8 +50,14 @@ def cancel(self):
self.future.cancel()
concurrent.futures.wait([self.future])

def emitProgressUpdate(self, value):
self.progressChanged.emit(value)

def isInterruptionRequested(self):
return self.cancelled


class CancelThreadException(BaseException):
class CancelTaskException(BaseException):
pass


Expand Down Expand Up @@ -142,35 +159,42 @@ def __update(self):
self.cancel()
assert self._task is None

self.setBlocking(True)

self._task = task = Task()

# A thread safe way to invoke a method
set_progress = methodinvoke(self, "setProgressValue", (float,))

max_iter = self.learner.kwargs["max_iter"]

# Setup the task state
task = Task()
lastemitted = 0.

def callback(iteration):
if task.cancelled:
raise CancelThreadException() # this stop the thread
set_progress(iteration/max_iter*100)
nonlocal task # type: Task
nonlocal lastemitted
if task.isInterruptionRequested():
raise CancelTaskException()
progress = round(iteration / max_iter * 100)
if progress != lastemitted:
task.emitProgressUpdate(progress)
lastemitted = progress

# copy to set the callback so that the learner output is not modified
# (currently we can not pass callbacks to learners __call__)
learner = copy.copy(self.learner)
learner.callback = callback

def build_model(data, learner):
return learner(data)
try:
return learner(data)
except CancelTaskException:
return None

build_model_func = partial(build_model, self.data, learner)

self.progressBarInit()
task.setFuture(self._executor.submit(build_model_func))
task.done.connect(self._task_finished)
task.progressChanged.connect(self.setProgressValue)

task.future = self._executor.submit(build_model_func)
task.watcher = FutureWatcher(task.future)
task.watcher.done.connect(self._task_finished)
self._task = task
self.progressBarInit()
self.setBlocking(True)

@Slot(concurrent.futures.Future)
def _task_finished(self, f):
Expand All @@ -184,10 +208,9 @@ def _task_finished(self, f):
assert self._task is not None
assert self._task.future is f
assert f.done()

self.setBlocking(False)

self._task.deleteLater()
self._task = None
self.setBlocking(False)
self.progressBarFinished()

try:
Expand All @@ -210,12 +233,12 @@ def cancel(self):
if self._task is not None:
self._task.cancel()
assert self._task.future.done()
# disconnect the `_task_finished` slot
self._task.watcher.done.disconnect(self._task_finished)
# disconnect from the task
self._task.done.disconnect(self._task_finished)
self._task.progressChanged.disconnect(self.setProgressValue)
self._task.deleteLater()
self._task = None
# threads use signals to run functions in the main thread and some
# can still be quoued (perhaps change)
qApp.processEvents()

self.progressBarFinished()
self.setBlocking(False)

Expand Down

0 comments on commit 9f1da67

Please sign in to comment.