tasks

task-oriented execution for Python

Author: Forest Bond
Copyright: © 2005-2010 Forest Bond
License: GNU Free Documentation License (http://www.gnu.org/licenses/fdl.html)

Overview

The tasks module is intended for use in programs that perform actions in sequence. A common example of this is an installer or setup utility. The tasks module makes it very easy to assemble a set of tasks to run and execute each in turn while tracking state changes for each task.

Tasks are represented by task objects. The tasks module defines several classes whose instances represent various types of tasks with differing behavior.

Task State

Task objects possess a number of read-only attributes that reflect their current state.

These attributes are important execution details:

progress
An integer or float that indicates the percentage of the task that has been completed.
status
A textual description of the current state of the task.

The following boolean flags indicate specific conditions:

finished
A boolean; indicates whether or not the task has run and reached a terminal state.
completed
A boolean; indicates if the task has run and finished successfully.
failed
A boolean; indicates if the task has run and finished unsuccessfully.
paused
A boolean; indicates if the task is currently paused.
cancelled
A boolean; indicates if the task has terminated without finishing.
running
A boolean; indicates whether or not the task is currently being executed.

These boolean flags are not mutually exclusive, and all of them are needed to represent the current state of the task adequately. The current_state attribute summarizes them as a tuple naming the flags that are True. For example, while a task is running, current_state is a tuple containing the string "running"; once the task completes successfully, it is a tuple containing the strings "finished" and "completed".
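As a rough illustration of how the flags map onto current_state, the sketch below builds such a tuple from a dictionary of flags. The FLAGS ordering and the derive_state helper are hypothetical; the tasks module computes current_state itself and may order the names differently.

```python
# Hypothetical ordering: the flags in the order they are listed above.
FLAGS = ('finished', 'completed', 'failed', 'paused', 'cancelled', 'running')


def derive_state(flags):
    """Return a tuple naming every flag that is True.

    flags maps flag names to booleans; missing flags count as False.
    """
    return tuple(name for name in FLAGS if flags.get(name, False))


running_state = derive_state({'running': True})
done_state = derive_state({'finished': True, 'completed': True})
print(running_state)  # ('running',)
print(done_state)     # ('finished', 'completed')
```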

As we will see shortly, tasks can be interacted with during execution. Some of these interactions are directives whose resulting state changes can be anticipated. Thus, assuming nothing else intervenes, the state a task will eventually reach can be determined at any given point in time. The final_state attribute represents this anticipated state as a tuple in the same form as current_state. Additionally, the terminal attribute is a boolean flag indicating whether the task's final state would result in the task becoming finished.

Finally, the following attribute stores additional information in the event of task failure:

failure
If failed is True, this may contain an object indicating the nature of the failure. If no reason was provided by the task, this attribute will be None.

Running Tasks and Processing State Changes

Task instances are run by calling the run method, which is actually a generator function that yields data structures representing task state changes. A task is therefore executed properly by calling run and iterating over the result. Each iteration yields a list of new task changes, which should be assumed to have occurred simultaneously.

Instances of the TaskChange class are used to represent individual task state changes. TaskChange objects have the following properties:

parameter
The name of the task property that is changing.
value
The new value for that property.
oldvalue
The value of that property prior to the change.
task
The task for which the change is occurring.
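For illustration only, a minimal stand-in class with the four properties listed above might look like this. The class name Change, its constructor, and its string format are assumptions modeled on the ComplexTask output shown later in this document, not the tasks module's actual TaskChange implementation.

```python
class Change(object):
    """Hypothetical stand-in for TaskChange with the four documented properties."""

    def __init__(self, parameter, value, oldvalue, task):
        self.parameter = parameter  # name of the property that changed
        self.value = value          # new value
        self.oldvalue = oldvalue    # value before the change
        self.task = task            # task the change belongs to

    def __str__(self):
        # Mirrors the "task parameter: old => new" lines printed in the
        # ComplexTask example later in this document.
        return '%s %s: %r => %r' % (self.task, self.parameter,
                                    self.oldvalue, self.value)


change = Change('progress', 10.0, 0.0, 'sleep10_1')
print(change)  # sleep10_1 progress: 0.0 => 10.0
```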

While executing a task, each list of TaskChange objects can be viewed as a packet in a stream of state information, or as a series of events that can be handled with an event loop:

for task_changes in mytask.run():
    for task_change in task_changes:
        if task_change.parameter == 'progress':
            print 'Task %s is now %.1f%% complete.' % (
              task_change.task, task_change.value)
        elif task_change.parameter == 'running':
            if task_change.value:
                print 'Task %s is now running.' % task_change.task
            else:
                print 'Task %s is no longer running.' % task_change.task
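When many parameters need handling, the if/elif chain above can be replaced with a dispatch table keyed on parameter names. This is a self-contained sketch: fake_run and the namedtuple Change are hypothetical stand-ins for mytask.run() and TaskChange, so the loop can run without the tasks module.

```python
from collections import namedtuple

# Hypothetical stand-in for TaskChange; only the attributes used below.
Change = namedtuple('Change', 'parameter value oldvalue task')


def fake_run():
    """Hypothetical stand-in for mytask.run(): yields lists of changes."""
    yield [Change('running', True, False, 'mytask')]
    yield [Change('progress', 50.0, 0.0, 'mytask')]
    yield []  # an empty group carries no changes and is simply skipped
    yield [Change('progress', 100.0, 50.0, 'mytask'),
           Change('running', False, True, 'mytask')]


def on_progress(change, log):
    log.append('Task %s is now %.1f%% complete.' % (change.task, change.value))


def on_running(change, log):
    state = 'now running' if change.value else 'no longer running'
    log.append('Task %s is %s.' % (change.task, state))


# Map parameter names to handlers; unknown parameters are ignored.
HANDLERS = {'progress': on_progress, 'running': on_running}

log = []
for task_changes in fake_run():
    for task_change in task_changes:
        handler = HANDLERS.get(task_change.parameter)
        if handler is not None:
            handler(task_change, log)

for line in log:
    print(line)
```

Adding support for another parameter then means registering one more function rather than growing the conditional.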

Interacting With Running Tasks

During execution, directives ("instructions") can be issued that help determine the future state of the task. Internally, these instructions are appended to a queue and handled in a normal execution context (inside the run generator). Deferring instruction processing to run ensures that events are handled in the correct order, despite the inevitable lag in the task event stream.
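The queuing behavior can be sketched with a deque that the instruction method appends to and the run generator drains between steps. QueuedTask is a hypothetical class, and its changes are simplified to (parameter, value) pairs rather than TaskChange objects; the real module's bookkeeping is likely more involved.

```python
from collections import deque


class QueuedTask(object):
    """Hypothetical task whose instructions are queued, then applied in run()."""

    def __init__(self, steps):
        self.steps = steps
        self.progress = 0.0
        self.cancelled = False
        self._instructions = deque()

    def cancel(self):
        # Called from outside the generator: only records the request.
        self._instructions.append('cancel')

    def run(self):
        for step in range(self.steps):
            # Apply pending instructions, oldest first, before doing more work.
            while self._instructions:
                if self._instructions.popleft() == 'cancel':
                    self.cancelled = True
                    yield [('cancelled', True)]
                    return
            self.progress = 100.0 * (step + 1) / self.steps
            yield [('progress', self.progress)]


task = QueuedTask(steps=4)
iterator = task.run()
first = next(iterator)   # [('progress', 25.0)]
task.cancel()            # queued; task.cancelled is still False here
rest = list(iterator)    # [[('cancelled', True)]]
```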

Instructions are issued to the running task using the following methods:

fail
Causes the task to terminate with failure status.
cancel
Causes the task to terminate without finishing.
complete
Causes the task to terminate successfully.
pause
Causes the task to suspend execution indefinitely. Until the task is resumed, the iterator returned by the run method will emit empty lists of task changes.
resume
Causes the task to resume execution, terminating the paused state.

The terminal instructions (fail, cancel, and complete) may be sent while the task is paused or has a pending pause instruction. However, the task must be resumed before these instructions have any effect (a paused task does not change state). Note also that if a task is paused, or will be paused, and has not been sent a subsequent resume instruction, pending terminal instructions are not reflected by the final_state attribute: until the task is sent a resume instruction, its final state is "paused".
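The pause rule above can be sketched as a small function that walks the pending instructions in queue order. predict_final is hypothetical and returns a simple label ('paused', the name of the terminal instruction that will take effect, or None) rather than a real final_state tuple. It also assumes that a terminal instruction held back by a pause takes effect as soon as a later resume is processed.

```python
TERMINAL = ('fail', 'cancel', 'complete')


def predict_final(currently_paused, pending):
    """Predict the outcome of the pending instructions (hypothetical sketch).

    Returns 'paused', the terminal instruction that will take effect, or
    None when nothing pending determines the outcome.
    """
    paused = currently_paused
    deferred = None  # first terminal instruction held back by a pause
    for instruction in pending:
        if instruction == 'pause':
            paused = True
        elif instruction == 'resume':
            paused = False
            if deferred is not None:
                return deferred  # the held-back instruction now applies
        elif instruction in TERMINAL:
            if not paused:
                return instruction
            if deferred is None:
                deferred = instruction
    # Any deferred instruction is still blocked, so the task stays paused.
    return 'paused' if paused else None


print(predict_final(False, ['pause', 'cancel']))            # paused
print(predict_final(False, ['pause', 'cancel', 'resume']))  # cancel
```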

Instruction Handling

After an instruction-sending method is called, task execution continues normally, and the iterator returned by the run method continues to emit task change groups. Generally, you should continue to consume task changes until the iterator ends. In particular, some tasks may need to perform clean-up actions before ceasing execution, which can only happen if the caller continues to consume task changes.

Here is an example of using the cancel method:

iterator = mytask.run()

# Consume a single group of task changes so that the task starts running.
task_changes = iterator.next()

# Request that the task be cancelled.
mytask.cancel()

# Continue to process task changes.
# When the task has actually been cancelled, a task change indicating this
# will be emitted.
for task_changes in iterator:
    for task_change in task_changes:
        if task_change.parameter == 'cancelled' and task_change.value:
            print 'Task is cancelled.'

The exception to this rule is the pause method. When pause is called, a pause instruction is sent to the task and is processed as task execution continues. When that happens, a TaskChange is emitted indicating that the paused parameter is True. From then on, each attempt to consume a task change group from the iterator produces an empty list of task changes. This continues indefinitely unless the resume method is called. Once the resume instruction has been processed (again, as part of normal iterator-based task execution), new task changes are emitted again.

Here is an example of pausing and resuming a task:

iterator = mytask.run()
task_changes = iterator.next()

# Request that the task be paused.
mytask.pause()

# Consume task changes until the pause has taken effect.
paused = False
for task_changes in iterator:
    for task_change in task_changes:
        if task_change.parameter == 'paused' and task_change.value:
            paused = True
    if paused:
        break

# iterator.next() will now return [] no matter how many times we call it

mytask.resume()
for task_changes in iterator:
    for task_change in task_changes:
        pass

# mytask is now completed
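The empty-list behavior of a paused task can be mimicked with a generator that keeps yielding [] until it is resumed. PausableCounter is hypothetical; it tracks a single requested-pause flag instead of a real instruction queue, and it reports changes as (parameter, value) pairs.

```python
class PausableCounter(object):
    """Hypothetical task that yields empty change lists while paused."""

    def __init__(self, steps):
        self.steps = steps
        self.paused = False
        self._pause_requested = False

    def pause(self):
        self._pause_requested = True

    def resume(self):
        self._pause_requested = False

    def run(self):
        done = 0
        while done < self.steps:
            if self._pause_requested != self.paused:
                # Apply the pending pause or resume and report it.
                self.paused = self._pause_requested
                yield [('paused', self.paused)]
                continue
            if self.paused:
                yield []  # no state changes while paused
                continue
            done += 1
            yield [('progress', 100.0 * done / self.steps)]


task = PausableCounter(steps=2)
it = task.run()
before = next(it)                    # [('progress', 50.0)]
task.pause()
pausing = next(it)                   # [('paused', True)]
idle = [next(it) for _ in range(3)]  # [[], [], []]
task.resume()
after = list(it)                     # [[('paused', False)], [('progress', 100.0)]]
```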

When dealing with ComplexTasks, the following notes should be observed:

  • subtasks can be completed or failed directly. This works as you would expect.
  • subtasks can be cancelled directly. This will be treated as a failure by the parent task (if the subtask is critical, it causes the parent to fail, etc.).
  • subtasks should never be paused or resumed directly. Instead, call pause or resume directly on the root parent task. Calling pause or resume on a subtask will lead to an exception when the task change is seen by the parent task.

InstructionErrors

If an instruction is issued that is impossible to fulfill given the current state of the task, an InstructionError is raised immediately. Note, however, that a task accepting an instruction does not guarantee that the instruction can actually be carried out.
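The distinction between rejecting an instruction outright and accepting it for later processing might look like this. InstructionError here is a locally defined stand-in, and treating cancel on an already-finished task as impossible is an assumed example of an unfulfillable instruction.

```python
class InstructionError(Exception):
    """Local stand-in for the tasks module's InstructionError."""


class GuardedTask(object):
    """Hypothetical task that validates instructions before queuing them."""

    def __init__(self):
        self.finished = False
        self.pending = []

    def cancel(self):
        # Reject immediately if the instruction can never be fulfilled...
        if self.finished:
            raise InstructionError('cannot cancel a finished task')
        # ...otherwise accept it; it is carried out (or not) later, in run().
        self.pending.append('cancel')


task = GuardedTask()
task.cancel()                 # accepted and queued
task.finished = True
try:
    task.cancel()             # rejected immediately
except InstructionError as error:
    message = str(error)      # 'cannot cancel a finished task'
```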

Internally, instructions are fulfilled by iterating over values returned by the following methods (for which the default implementations are generators):

  • do_fail
  • do_cancel
  • do_complete
  • do_pause
  • do_resume

Subclasses can override and wrap these methods to customize how instructions are handled. Each method should return an iterator over sequences of TaskChanges, just like the run method. For instance, the following task class provides a status update prior to cancellation:

>>> from tasks import Task
>>> class TaskWithCancellationNotification(Task):
...     def do_cancel(self):
...         yield [self.change('status', 'cancelling...')]
...         for task_changes in super(
...           TaskWithCancellationNotification, self
...         ).do_cancel():
...             yield task_changes

Defining Tasks

All tasks are instances of the Task class or a subclass thereof. Task is the most general implementation: it represents a single atomic task that performs the function implemented by its do_run method. The default do_run simply raises NotImplementedError, so it must be overridden by a subclass.

The ComplexTask class expands on Task. ComplexTasks are tasks that are made up of one or more subtasks. The subtasks that must be performed are passed to the ComplexTask initializer.

The Task and ComplexTask initializers accept some common keyword parameters:

description
Usually a short sentence explaining the purpose of the task.
critical
A boolean indicating whether or not failure of this task represents a critical failure. In general, a critical failure can be handled differently by callers. For instance, complex tasks halt execution immediately when a critical subtask fails. This parameter can also be given the special value DEFER (a constant defined by the tasks module) to indicate that, while the task is critical, failure should not cause execution to halt immediately.
sufficient
A boolean indicating that completion of this task is sufficient to consider its parent completed as well. This mostly makes sense in the context of ComplexTask structures, but other callers are free to interpret it in whatever way is most appropriate.
progress_weight
A float indicating how progress values for different tasks should be compared. Essentially, it serves as a rough indication of how long one task takes to complete relative to another. For instance, if task A takes twice as long as task B to complete, task A could reasonably have a progress weight of 2.0, while the progress weight for task B would be 1.0. This makes it possible to produce more accurate estimates of total progress across multiple tasks.
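One plausible way to combine progress values using these weights is a weighted average. This is an assumption: the module's exact formula is not given here, although equal weights reproduce the 3.333... figure in the ComplexTask output later in this document. combined_progress is a hypothetical helper.

```python
def combined_progress(subtasks):
    """Weighted average of subtask progress (hypothetical sketch).

    subtasks is a list of (progress_weight, progress) pairs, where each
    progress value is a percentage from 0 to 100.
    """
    total_weight = sum(weight for weight, _ in subtasks)
    if total_weight == 0:
        return 0.0
    weighted = sum(weight * progress for weight, progress in subtasks)
    return weighted / float(total_weight)


# Task A (weight 2.0) is half done and task B (weight 1.0) is done:
# (2.0 * 50 + 1.0 * 100) / 3.0, roughly 66.67.
mixed = combined_progress([(2.0, 50.0), (1.0, 100.0)])

# Three equal-weight subtasks with the first at 10%.
equal = combined_progress([(1.0, 10.0), (1.0, 0.0), (1.0, 0.0)])
```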

Task Subclasses

Typically, a subclass is defined that minimally provides a do_run implementation, like this:

>>> from tasks import Task
>>> class MyTask(Task):
...     def do_run(self):
...         yield []

The Task initializer requires one positional argument: a name for the task. To create an instance of the MyTask class, do this:

>>> my_task = MyTask('my_task')

do_run should return an iterator whose members are lists of TaskChange objects indicating state changes for the task.

Most of the time, task changes will be yielded when the task metadata has changed in some way, like a status change or progress update. However, the yield statement also serves as the primary mechanism by which control is temporarily returned to the caller. As a result, if the do_run implementation engages in some activity that would otherwise run for a long time without executing a yield statement, additional yield statements should be inserted. These additional statements should yield an empty list.
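As a sketch of that advice, a do_run-style generator processing a long list of items can hand control back every so often without reporting any change. do_run_sketch, process, and chunk_size are hypothetical names chosen for illustration.

```python
def do_run_sketch(items, process, chunk_size=100):
    """Hypothetical do_run-style generator for a long-running job.

    No task metadata changes while items are processed, but an empty
    list is yielded every chunk_size items so the caller regains control.
    """
    for index, item in enumerate(items):
        process(item)
        if (index + 1) % chunk_size == 0:
            yield []  # nothing changed; just hand control back


processed = []
groups = list(do_run_sketch(range(250), processed.append, chunk_size=100))
# groups == [[], []]: control was returned after items 100 and 200
```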

A simple do_run implementation follows:

>>> import time
>>> from tasks import TaskChange
>>> class MyTask(Task):
...     def do_run(self):
...         self.status = 'Sleeping...'
...         yield [TaskChange('status', self.status, self)]
...         for x in range(10):
...             time.sleep(0.1)
...             old_progress = self.progress
...             self.progress = old_progress + 10.0
...             yield [TaskChange('progress', self.progress, self)]
...         yield self.finish()

Note that all of the instruction-issuing methods discussed previously can be called from within do_run, as finish is in this example, by yielding the result of the call.

Actually, the example do_run implementation above is not as concise as it could be. The change convenience method of Task objects makes a change and generates the corresponding TaskChange instance at the same time. Its use is simple; for instance, the above example can be rewritten as:

>>> import time
>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping...')]
...         for x in range(10):
...             time.sleep(0.1)
...             yield [self.change('progress', self.progress + 10.0)]
...         yield self.finish()
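Conceptually, change does two things at once: it updates the attribute and returns a record of the update, including the old value. The sketch below is a hypothetical reimplementation for illustration; SketchTask is an invented class, and it returns a namedtuple rather than a real TaskChange.

```python
from collections import namedtuple

# Hypothetical stand-in for TaskChange.
Change = namedtuple('Change', 'parameter value oldvalue task')


class SketchTask(object):
    def __init__(self, name):
        self.name = name
        self.status = ''
        self.progress = 0.0

    def change(self, parameter, value):
        """Set an attribute and return a record of the change."""
        oldvalue = getattr(self, parameter)
        setattr(self, parameter, value)
        return Change(parameter, value, oldvalue, self)


task = SketchTask('my_task')
record = task.change('progress', task.progress + 10.0)
# record.oldvalue == 0.0, record.value == 10.0, task.progress == 10.0
```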

Further, this particular implementation can be made even simpler by relying on the built-in clean-up mechanisms that run automatically after do_run returns. Any task that hasn't explicitly terminated is considered finished and, to preserve the integrity of the stream of changes, the necessary task changes are introduced automatically. As a result, the task function needn't explicitly call finish:

>>> import time
>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping...')]
...         for x in range(10):
...             time.sleep(0.1)
...             yield [self.change('progress', self.progress + 10.0)]
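The automatic clean-up can be pictured as a wrapper generator around do_run. run_with_cleanup is hypothetical, represents task state as a plain dictionary, and orders the final changes to match the ComplexTask output later in this document; the real run method may differ in detail.

```python
def run_with_cleanup(task, do_run):
    """Hypothetical run() wrapper that finishes tasks automatically.

    task is a dict of state flags; do_run is a generator function that
    yields lists of (parameter, value) pairs. If do_run returns without
    the task reaching a terminal state, the wrapper emits the missing
    changes itself so the stream of changes stays consistent.
    """
    task['running'] = True
    yield [('running', True)]
    for changes in do_run():
        yield changes
    if not task.get('finished'):
        task['completed'] = task['finished'] = True
        yield [('completed', True), ('finished', True)]
    task['running'] = False
    yield [('running', False)]


def do_run():
    # Like the last MyTask above: progress updates only, no explicit finish.
    for progress in (50.0, 100.0):
        yield [('progress', progress)]


task = {}
groups = list(run_with_cleanup(task, do_run))
```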

Indicating Task Failures

Failures are indicated much as one would expect:

>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Printing "foo"')]
...         print 'foo'
...         yield [
...           self.change('status', 'Finished printing.'),
...           self.change('progress', 50),
...         ]
...
...         yield [self.change('status', 'Reading file "bar"')]
...         try:
...             f = open('bar', 'r')
...             try:
...                 f.read()
...             finally:
...                 f.close()
...         except (IOError, OSError):
...             yield self.fail('Failed to read file "bar"')
...         yield [self.change('status', 'Finished reading file.')]

After fail is called, do_run iteration will cease. The final status update will never occur in that case.
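One way to get this "stop after failure" behavior is for the driving loop to close the do_run generator as soon as a failure is signalled. In this sketch the Failure marker class and the drive function are hypothetical; in the tasks module, do_run yields the result of self.fail(...) instead, and that value's exact type is not documented here.

```python
class Failure(object):
    """Hypothetical marker yielded by do_run to signal failure."""

    def __init__(self, reason):
        self.reason = reason


def drive(do_run):
    """Consume do_run until it ends or yields a Failure.

    Returns (groups, reason). On failure the generator is closed, so
    nothing after the failing yield ever runs.
    """
    groups = []
    gen = do_run()
    for item in gen:
        if isinstance(item, Failure):
            gen.close()
            return groups, item.reason
        groups.append(item)
    return groups, None


def do_run():
    yield [('status', 'Reading file "bar"')]
    yield Failure('Failed to read file "bar"')
    yield [('status', 'Finished reading file.')]  # never reached


groups, reason = drive(do_run)
```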

ComplexTask Objects

ComplexTasks need not be subclassed to be useful. Simply initialize a ComplexTask instance with a list of subtasks to be executed sequentially. For example:

>>> import time
>>> from tasks import ComplexTask, Task
>>> class Sleep10Task(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping')]
...         for x in range(10):
...             yield [self.change('progress', self.progress + 10.0)]
...             time.sleep(0.1)
>>> sleep30 = ComplexTask('sleep30', subtasks = [
...   Sleep10Task('sleep10_1'),
...   Sleep10Task('sleep10_2'),
...   Sleep10Task('sleep10_3'),
... ])
>>> all_task_changes = []
>>> for task_changes in sleep30.run():
...     all_task_changes = all_task_changes + task_changes
>>> for task_change in all_task_changes[:9]:
...     print task_change
sleep30 running: False => True
sleep30 running_subtask: None => <Sleep10Task sleep10_1>
sleep30 status: '' => 'running sleep10_1'
sleep10_1 running: False => True
sleep10_1 status: '' => 'Sleeping'
sleep10_1 progress: 0.0 => 10.0
sleep30 progress: 0.0 => 3.3333333333333335
sleep10_1 progress: 10.0 => 20.0
sleep30 progress: 3.3333333333333335 => 6.666666666666667
>>> for task_change in all_task_changes[-9:]:
...     print task_change
sleep30 progress: 96.666666666666629 => 99.999999999999957
sleep10_3 completed: False => True
sleep10_3 finished: False => True
sleep10_3 running: True => False
sleep30 running_subtask: <Sleep10Task sleep10_3> => None
sleep30 progress: 99.999999999999957 => 100.0
sleep30 completed: False => True
sleep30 finished: False => True
sleep30 running: True => False
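The parent progress figures above are consistent with each of the three equal-weight subtasks contributing one third of the total. A hypothetical generator that runs subtasks in order and reports parent progress that way could look like this; run_sequence and ten_steps are invented names, and the sketch yields bare (parameter, value) pairs while ignoring status, running, and completion changes for brevity.

```python
def run_sequence(subtasks):
    """Hypothetical sketch of sequential subtask execution.

    subtasks is a list of (name, generator_function) pairs; each generator
    yields its own progress values (0 to 100). Parent progress is the
    equal-weight average, recomputed after every subtask update.
    """
    count = len(subtasks)
    for index, (name, run) in enumerate(subtasks):
        yield ('running_subtask', name)
        for progress in run():
            # Earlier subtasks count as 100%, later ones as 0%.
            yield ('progress', (index * 100.0 + progress) / count)
    yield ('running_subtask', None)


def ten_steps():
    for step in range(1, 11):
        yield step * 10.0


events = list(run_sequence([
    ('sleep10_1', ten_steps),
    ('sleep10_2', ten_steps),
    ('sleep10_3', ten_steps),
]))
# events[1] is ('progress', 3.333...), events[-2] is ('progress', 100.0)
```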

More Information

See the tests included with the source distribution for more complete examples of usage.