| Author: | Forest Bond |
|---|---|
| Copyright: | © 2005-2010 Forest Bond |
| License: | GNU Free Documentation License (http://www.gnu.org/licenses/fdl.html) |
The tasks module is intended for use in programs that perform actions in sequence. A common example of this is an installer or setup utility. The tasks module makes it very easy to assemble a set of tasks to run and execute each in turn while tracking state changes for each task.
Tasks are represented by task objects. The tasks module defines several classes whose instances represent various types of tasks with differing behavior.
Task objects possess a number of read-only attributes that reflect their current state.
These attributes are important execution details:
The following boolean flags indicate specific conditions:
These boolean flags are not mutually exclusive, and all are required to adequately represent the current state of the task. The task's state is also available as the current_state attribute, a tuple naming the flags above that are currently True. So, if a particular task is running, current_state will be a tuple containing the string "running". If the task completes successfully, it will be a tuple containing the strings "finished" and "completed".
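The relationship between the flags and the state tuple can be sketched as follows. This is only an illustration, not the module's code: the helper `state_tuple` and the flag order in `FLAG_ORDER` are assumptions, and only the flag names "running", "finished" and "completed" are taken from the text above.

```python
# Hypothetical helper, not part of the tasks module: build a state tuple
# from a mapping of flag name -> bool, using a fixed (assumed) flag order.
FLAG_ORDER = ('running', 'finished', 'completed')


def state_tuple(flags):
    """Return the names of all flags that are currently True."""
    return tuple(name for name in FLAG_ORDER if flags.get(name))


# A running task, and a task that completed successfully.
running_state = state_tuple({'running': True})
done_state = state_tuple({'finished': True, 'completed': True})
```

Because the flags are not mutually exclusive, the tuple may hold more than one name, as `done_state` does here.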
As we will see shortly, tasks can be interacted with during execution. Some of these interactions are directives for which state changes can be anticipated. Thus, assuming no external force acts to change it, the final state of a particular task is deterministic at a given point in time. The final_state attribute represents this anticipated state; its value is a tuple much like that of the current_state attribute. Additionally, the terminal attribute is a boolean flag indicating whether the task's final state would result in the task becoming finished.
Finally, the following attribute stores additional information in the event of task failure:
Task instances are run by calling the run method, which is actually a generator function that yields data structures representing task state changes. Thus, task instances are properly executed by calling the run method and iterating over the result. Each iteration yields a list of new task changes, which should be assumed to have occurred simultaneously.
Instances of the TaskChange class are used to represent individual task state changes. TaskChange objects have the following properties:
While executing a task, each list of TaskChange objects can be viewed as a packet in a stream of state information, or as a series of events that can be handled with an event loop:
for task_changes in mytask.run():
    for task_change in task_changes:
        if task_change.parameter == 'progress':
            print 'Task %s is now %.1f%% complete.' % (
                task_change.task, task_change.value)
        elif task_change.parameter == 'running':
            if task_change.value:
                print 'Task %s is now running.' % task_change.task
            else:
                print 'Task %s is no longer running.' % task_change.task
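The same event-loop idea can be tried without the tasks module installed. The sketch below is a stand-in under stated assumptions: `TaskChange` is a local namedtuple whose field order follows the `TaskChange('status', value, task)` calls shown later in this document, and `fake_run` and `handlers` are hypothetical names that only mimic the shape of the data yielded by run.

```python
from collections import namedtuple

# Local stand-in for the module's TaskChange class (assumed field order).
TaskChange = namedtuple('TaskChange', ['parameter', 'value', 'task'])


def fake_run(name):
    """Hypothetical generator that mimics the shape of run() output."""
    yield [TaskChange('running', True, name)]
    yield [TaskChange('progress', 50.0, name)]
    yield []  # an empty group: nothing changed this iteration
    yield [TaskChange('progress', 100.0, name),
           TaskChange('running', False, name)]


# Dispatch each change to a handler keyed on its parameter name.
log = []
handlers = {
    'progress': lambda c: log.append('%s: %.1f%%' % (c.task, c.value)),
    'running': lambda c: log.append(
        '%s %s' % (c.task, 'started' if c.value else 'stopped')),
}
for task_changes in fake_run('demo'):
    for change in task_changes:
        # Parameters without a handler are ignored.
        handlers.get(change.parameter, lambda c: None)(change)
```

A handler table like this keeps the loop body short as more parameters are handled, at the cost of a little indirection compared with an if/elif chain.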
During execution, directives ("instructions") can be issued that will help to determine the future state of the task. Internally, these instructions are appended to a queue and handled in a normal execution context (inside the run generator). Deferring instruction processing to be handled in run ensures that events are handled in the correct order, despite inevitable lag in the task event stream.
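The deferred handling described above might look roughly like this. It is a minimal sketch, not the module's implementation: the class `QueuedTask`, its `instructions` queue, and the use of plain `(parameter, value)` tuples in place of TaskChange objects are all assumptions made for illustration.

```python
from collections import deque


class QueuedTask(object):
    """Hypothetical stand-in showing deferred instruction handling."""

    def __init__(self, steps):
        self.steps = steps
        self.instructions = deque()  # pending directives, oldest first
        self.cancelled = False

    def cancel(self):
        # Only record the request; nothing changes until run() resumes.
        self.instructions.append('cancel')

    def run(self):
        for step in range(self.steps):
            # Handle queued instructions before doing any more work.
            while self.instructions:
                if self.instructions.popleft() == 'cancel':
                    self.cancelled = True
                    yield [('cancelled', True)]
                    return
            yield [('progress', 100.0 * (step + 1) / self.steps)]


task = QueuedTask(steps=4)
iterator = task.run()
first = next(iterator)                # the task starts and reports progress
task.cancel()                         # queued, not yet applied
state_after_request = task.cancelled  # still False at this point
remaining = list(iterator)            # the cancellation is processed here
```

Note that `cancel` itself changes no state; the change becomes visible only when the caller next advances the iterator, which keeps the emitted changes in order.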
Instructions are issued to the running task using the following methods:
The terminal instructions (fail, cancel, and complete) may be sent when the task is paused or has a pending pause instruction. However, the task must be resumed before these instructions will have any effect (a paused task does not change state). Note also that if a task is paused, or will be paused, and has not been sent a subsequent resume instruction, pending terminal instructions will not be reflected by the final_state attribute: until the task is sent a resume instruction, the final state of the task is "paused".
After calling an instruction-sending method, task execution continues normally. The iterator returned by the run method will continue to emit task change groups. Generally, you should continue to consume task changes until the iterator ends: some tasks may need to perform clean-up actions before ceasing execution, and this can only happen if the caller continues to consume task changes.
Here is an example of using the cancel method:
iterator = mytask.run()
# Consume a single task changes group so that the task begins running.
task_changes = iterator.next()
# Request that task be cancelled.
mytask.cancel()
# Continue to process task changes.
# When the task has actually been cancelled, a task change indicating this
# will be emitted.
for task_changes in iterator:
    for task_change in task_changes:
        if task_change.parameter == 'cancelled' and task_change.value:
            print 'Task is cancelled.'
The exception to normal continued execution is the pause method. When pause is called, a pause instruction is sent to the task and will be processed as task execution continues. When that happens, a TaskChange will be emitted indicating that the paused parameter is True. Further attempts to consume task change groups from the task changes iterator will produce an empty list of task changes. This will be the case indefinitely, unless the resume method is called. Once the resume instruction has been processed (again, as part of normal iterator-based task execution), new task changes will again be emitted.
Here is an example of pausing and resuming a task:
iterator = mytask.run()
task_changes = iterator.next()
# Request that task be paused.
mytask.pause()
try:
    for task_changes in iterator:
        for task_change in task_changes:
            if task_change.parameter == 'paused' and task_change.value:
                # Break out of both loops once the pause has taken effect.
                raise StopIteration()
except StopIteration:
    pass
# iterator.next() will now return [] no matter how many times we call it
mytask.resume()
for task_changes in iterator:
    for task_change in task_changes:
        pass
# mytask is now completed
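The pause behaviour can be reproduced with a small stand-in. Everything here is hypothetical (the class `PausableTask`, its queue, and the tuple-based changes); it only illustrates the observable pattern described above: a paused change, then empty groups, then normal output after resume.

```python
from collections import deque


class PausableTask(object):
    """Hypothetical stand-in; not the tasks module implementation."""

    def __init__(self, steps):
        self.steps = steps
        self.instructions = deque()
        self.paused = False

    def pause(self):
        self.instructions.append('pause')

    def resume(self):
        self.instructions.append('resume')

    def run(self):
        step = 0
        while step < self.steps:
            # Apply queued instructions first, emitting one change each.
            changes = []
            while self.instructions:
                self.paused = self.instructions.popleft() == 'pause'
                changes.append(('paused', self.paused))
            if changes:
                yield changes
            elif self.paused:
                yield []  # paused: no work is done, no changes to report
            else:
                step += 1
                yield [('progress', 100.0 * step / self.steps)]


task = PausableTask(steps=2)
iterator = task.run()
first = next(iterator)
task.pause()
pause_group = next(iterator)               # the pause takes effect here
idle = [next(iterator) for _ in range(3)]  # empty groups while paused
task.resume()
rest = list(iterator)                      # resumes, then runs to the end
```

Yielding an empty list while paused, rather than blocking, lets the caller keep its event loop responsive and decide for itself when to call resume.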
When dealing with ComplexTasks, the following notes should be observed:
If an instruction is issued that is impossible to fulfill due to the current state of the task, an InstructionError is raised immediately. However, note that, while a task may initially accept an instruction, it may not necessarily be possible for the instruction to be carried out.
Internally, instructions are fulfilled by iterating over values returned by the following methods (for which the default implementations are generators):
Subclasses can override and wrap these methods to customize handling of instructions. Each method should return an iterator over sequences of TaskChanges, the same as the run method. For instance, the following task class would provide a status update prior to cancellation:
>>> from tasks import Task
>>> class TaskWithCancellationNotification(Task):
...     def do_cancel(self):
...         yield [self.change('status', 'cancelling...')]
...         for task_changes in super(
...           TaskWithCancellationNotification, self
...         ).do_cancel():
...             yield task_changes
All tasks are instances of the Task class or a subclass thereof. A Task is the most general implementation. It represents a single atomic task that performs the function implemented by its do_run method. The default implementation simply raises NotImplementedError, and must be overridden by a subclass.
The ComplexTask class expands on Task. ComplexTasks are tasks that are made up of one or more subtasks. The subtasks that must be performed are passed to the ComplexTask initializer.
The Task and ComplexTask initializers accept some common keyword parameters:
Typically, a subclass is defined that minimally provides a do_run implementation, like this:
>>> from tasks import Task
>>> class MyTask(Task):
...     def do_run(self):
...         yield []
The Task initializer requires one positional argument: a name for the task. To create an instance of the MyTask class, do this:
>>> my_task = MyTask('my_task')
do_run should return an iterator whose members are lists of TaskChange objects indicating state changes for the task.
Most of the time, task changes will be yielded when the task metadata has changed in some way, like a status change or progress update. However, the yield statement also serves as the primary mechanism by which control is temporarily returned to the caller. As a result, if the do_run implementation engages in some activity that would cause a long period of time to go by without any yield statements being executed, additional yield statements should be inserted. These additional statements should yield an empty list.
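As a rough illustration of yielding an empty list during long-running work, consider the generator below. It is a standalone sketch written in the style of do_run, not a real Task subclass: the function `checksum_in_chunks` is hypothetical, and a `(parameter, value)` tuple stands in for a TaskChange object.

```python
def checksum_in_chunks(data, chunk_size):
    """Hypothetical long-running generator in the style of do_run."""
    total = 0
    for start in range(0, len(data), chunk_size):
        total += sum(data[start:start + chunk_size])
        # Nothing to report yet, but let the caller regain control.
        yield []
    # Report the result; the tuple stands in for a real TaskChange.
    yield [('status', 'checksum %d' % total)]


groups = list(checksum_in_chunks(list(range(10)), chunk_size=4))
```

Each empty list gives the caller a chance to update a user interface or issue an instruction between chunks of work.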
A simple do_run implementation follows:
>>> import time
>>> from tasks import Task, TaskChange
>>> class MyTask(Task):
...     def do_run(self):
...         self.status = 'Sleeping...'
...         yield [TaskChange('status', self.status, self)]
...         for x in range(10):
...             time.sleep(0.1)
...             old_progress = self.progress
...             self.progress = old_progress + 10.0
...             yield [TaskChange('progress', self.progress, self)]
...         yield self.finish()
Note that all of the instruction-issuing methods discussed previously can be called from within do_run in the same way the finish method was in this example (by yielding the result of the call).
Actually, the example do_run implementation above is not as concise as it could be. The change convenience method, available as an attribute of Task objects, allows changes to be made and TaskChange instances to be generated at the same time. Its use is simple; for instance, the above example can be rewritten as:
>>> import time
>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping...')]
...         for x in range(10):
...             time.sleep(0.1)
...             yield [self.change('progress', self.progress + 10.0)]
...         yield self.finish()
Further, this particular implementation can be made even simpler by relying on the built-in clean-up mechanisms that run automatically after do_run returns. Any task that hasn't explicitly terminated is considered finished and, to preserve the integrity of the stream of changes, the necessary task changes are introduced automatically. As a result, the task function needn't explicitly call finish:
>>> import time
>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping...')]
...         for x in range(10):
...             time.sleep(0.1)
...             yield [self.change('progress', self.progress + 10.0)]
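One way to picture the automatic clean-up is a wrapper generator that drives do_run and appends the missing changes afterwards. This is a guess at the general shape only: the function `run_with_cleanup`, the `state` dictionary, and the tuple-based changes are assumptions, and the order of the final changes simply mirrors the sample output shown at the end of this document.

```python
def run_with_cleanup(do_run, state):
    """Hypothetical wrapper: drive do_run, then finish the task if needed."""
    state['running'] = True
    yield [('running', True)]
    for task_changes in do_run():
        yield task_changes
    if not state.get('finished'):
        # do_run returned without terminating: treat it as a success.
        state.update(completed=True, finished=True, running=False)
        yield [('completed', True), ('finished', True), ('running', False)]


def do_run():
    # Never calls finish explicitly.
    yield [('progress', 100.0)]


state = {}
groups = list(run_with_cleanup(do_run, state))
```

The caller still sees a complete stream of changes even though `do_run` never terminated the task itself.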
Failures are indicated much as one would expect:
>>> class MyTask(Task):
...     def do_run(self):
...         yield [self.change('status', 'Printing "foo"')]
...         print 'foo'
...         yield [
...           self.change('status', 'Finished printing.'),
...           self.change('progress', 50),
...         ]
...
...         yield [self.change('status', 'Reading file "bar"')]
...         try:
...             f = open('bar', 'r')
...             try:
...                 f.read()
...             finally:
...                 f.close()
...         except (IOError, OSError):
...             yield self.fail('Failed to read file "bar"')
...         yield [self.change('status', 'Finished reading file.')]
After fail is called, do_run iteration will cease. The final status update will never occur in that case.
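The effect of ceasing iteration can be sketched with a driver that stops consuming do_run once a failure has been reported. The names here are hypothetical: `run_until_failure` is not part of the module, and the `('failed', True)` tuple merely stands in for whatever changes fail actually produces.

```python
def run_until_failure(do_run):
    """Hypothetical driver: stop iterating do_run after a failure change."""
    for task_changes in do_run():
        yield task_changes
        if any(param == 'failed' and value for param, value in task_changes):
            return  # the rest of do_run is never executed


def do_run():
    yield [('status', 'Reading file "bar"')]
    yield [('failed', True)]  # stands in for the result of self.fail(...)
    yield [('status', 'Finished reading file.')]


groups = list(run_until_failure(do_run))
```

Because the generator is abandoned rather than resumed, the final status update in `do_run` is never reached.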
ComplexTasks need not be subclassed to be useful. Simply initialize a ComplexTask instance with a list of subtasks to be executed sequentially. For example:
>>> import time
>>> from tasks import ComplexTask, Task
>>> class Sleep10Task(Task):
...     def do_run(self):
...         yield [self.change('status', 'Sleeping')]
...         for x in range(10):
...             yield [self.change('progress', self.progress + 10.0)]
...             time.sleep(0.1)
>>> sleep30 = ComplexTask('sleep30', subtasks = [
...   Sleep10Task('sleep10_1'),
...   Sleep10Task('sleep10_2'),
...   Sleep10Task('sleep10_3'),
... ])
>>> all_task_changes = []
>>> for task_changes in sleep30.run():
...     all_task_changes = all_task_changes + task_changes
>>> for task_change in all_task_changes[:9]:
...     print task_change
sleep30 running: False => True
sleep30 running_subtask: None => <Sleep10Task sleep10_1>
sleep30 status: '' => 'running sleep10_1'
sleep10_1 running: False => True
sleep10_1 status: '' => 'Sleeping'
sleep10_1 progress: 0.0 => 10.0
sleep30 progress: 0.0 => 3.3333333333333335
sleep10_1 progress: 10.0 => 20.0
sleep30 progress: 3.3333333333333335 => 6.666666666666667
>>> for task_change in all_task_changes[-9:]:
...     print task_change
sleep30 progress: 96.666666666666629 => 99.999999999999957
sleep10_3 completed: False => True
sleep10_3 finished: False => True
sleep10_3 running: True => False
sleep30 running_subtask: <Sleep10Task sleep10_3> => None
sleep30 progress: 99.999999999999957 => 100.0
sleep30 completed: False => True
sleep30 finished: False => True
sleep30 running: True => False
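The progress values in this output are consistent with the parent task reporting the mean of its subtasks' progress: one of three subtasks at 10.0 gives roughly 3.33. Whether the module computes it exactly this way is an assumption; the helper `overall_progress` below is hypothetical and only reproduces the arithmetic.

```python
def overall_progress(subtask_progress):
    """Mean of the subtasks' progress values (assumed aggregation rule)."""
    return sum(subtask_progress) / float(len(subtask_progress))


# One of three subtasks has advanced by one or two steps.
after_first_step = overall_progress([10.0, 0.0, 0.0])
after_second_step = overall_progress([20.0, 0.0, 0.0])
# All three subtasks are complete.
all_done = overall_progress([100.0, 100.0, 100.0])
```

The value 99.999999999999957 in the output above suggests that the real implementation accumulates floating-point increments and then sets the progress to 100.0 on completion, so exact equality with a freshly computed mean should not be expected.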
See the tests included with the source distribution for more complete examples of usage.