I have a directed acyclic graph in networkx. Each node represents a task, and a node's predecessors are the task's dependencies (a given task cannot execute until its dependencies have executed).

I'd like to 'execute' the graph in an asynchronous task queue, similar to what celery offers (so that I can poll jobs for their status, retrieve results, etc.). Celery doesn't offer the ability to create DAGs (as far as I know), and being able to move on to a task as soon as all of its dependencies are complete would be crucial (a DAG may have multiple paths, and even if one task is slow/blocking, it may be possible to move on to other tasks).

Are there any simple examples of how I could achieve this, or perhaps even integrate networkx with celery?
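As a rough illustration of the "move on to a task as soon as all its dependencies are complete" behaviour, here is a minimal sketch that uses only the Python standard library (3.9+ for graphlib), with a thread pool standing in for celery; it is not a celery or networkx integration. graphlib.TopologicalSorter takes a mapping of each task to its predecessors, and its get_ready()/done() methods hand out a task only once all of its dependencies have been marked done, so independent branches keep running while a slow task is still in progress. The names run_dag, run_task and work are hypothetical.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter


def run_dag(deps, run_task, max_workers=4):
    """Hypothetical helper: run every task in `deps`, starting each one as soon
    as all of its dependencies have finished.

    deps maps task -> iterable of predecessors (its dependencies).
    run_task(task) does the actual work; returns {task: result}.
    """
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}  # future -> task it is executing
        while sorter.is_active():
            # submit every task whose dependencies are all done
            for task in sorter.get_ready():
                running[pool.submit(run_task, task)] = task
            # block until at least one running task finishes
            finished, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in finished:
                task = running.pop(fut)
                results[task] = fut.result()  # re-raises the task's exception
                sorter.done(task)  # may make its successors ready
    return results


if __name__ == "__main__":
    def work(task):
        time.sleep(0.1)  # stand-in for a slow job
        return task.upper()

    # "c" depends on "a" and "b"; "d" depends on "c"
    deps = {"a": [], "b": [], "c": ["a", "b"], "d": ["c"]}
    print(sorted(run_dag(deps, work).items()))
    # → [('a', 'A'), ('b', 'B'), ('c', 'C'), ('d', 'D')]
```

If the graph lives in a networkx DiGraph G, a mapping in the shape run_dag expects can be built with {n: list(G.predecessors(n)) for n in G}. Replacing the thread pool with a distributed queue would need a different way of waiting for completions; that part is not shown here.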
1 Answer
I think this function may help:
```python
from collections import deque

# The graph G is represented by a dictionary following this pattern:
# G = { vertex: [ (successor1, weight1), (successor2, weight2), ... ] }
# Assumes `start` is the only task without dependencies and that every
# task is reachable from it (otherwise the loop never finishes).
def progress(G, start):
    Q = deque([start])  # tasks waiting to execute
    done = []           # executed tasks, in execution order
    while Q:            # are there still tasks to execute?
        task = Q.popleft()  # pick up the oldest one
        if task in done:
            continue        # already executed via another path
        ready = True
        for T in G:  # make sure all predecessors of `task` are executed
            for S, w in G[T]:
                if S == task and T not in done:  # found an unexecuted predecessor
                    ready = False
                    break
            if not ready:
                break
        if not ready:
            Q.append(task)  # the task is not ready for execution; retry later
        else:
            done.append(task)  # execute the task
            for S, w in G[task]:  # and explore all its successors
                Q.append(S)
    return done
```
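The function above rescans every edge of G for each task it dequeues and needs a single start task. A hedged alternative swaps in Kahn's algorithm (in-degree counting) on the same { vertex: [(successor, weight), ...] } format: it keeps a count of unfinished predecessors per task and releases a task the moment that count reaches zero, which also handles graphs with several starting tasks. progress_kahn is a hypothetical name, and the weights are simply ignored.

```python
from collections import deque


def progress_kahn(G):
    """Hypothetical variant: return the tasks of G in an order where every task
    comes after all of its predecessors (Kahn's algorithm).

    G uses the format { vertex: [(successor, weight), ...] }; weights are ignored.
    """
    # count how many unexecuted predecessors each task still has
    indegree = {v: 0 for v in G}
    for v in G:
        for succ, _w in G[v]:
            indegree[succ] = indegree.get(succ, 0) + 1
    # every task with no dependencies can start immediately
    ready = deque(v for v, n in indegree.items() if n == 0)
    done = []
    while ready:
        task = ready.popleft()
        done.append(task)  # "execute" the task
        for succ, _w in G.get(task, []):
            indegree[succ] -= 1  # one fewer dependency outstanding
            if indegree[succ] == 0:  # all dependencies done: task is ready
                ready.append(succ)
    if len(done) != len(indegree):
        raise ValueError("graph has a cycle")
    return done
```

For example, progress_kahn({"a": [("b", 1), ("c", 2)], "b": [("d", 1)], "c": [("d", 3)], "d": []}) returns ['a', 'b', 'c', 'd']. In an asynchronous setting, the "execute" step would submit the task instead, and the in-degree decrement would happen when that task reports completion.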