Sunday, April 24, 2016

Networkx as a task queue?

I have a directed acyclic graph in networkx. Each node represents a task, and a node's predecessors are its dependencies (a given task cannot execute until its dependencies have executed).

I'd like to 'execute' the graph in an asynchronous task queue, similar to what Celery offers (so that I can poll jobs for their status, retrieve results, etc.). Celery doesn't offer the ability to create DAGs (as far as I know), and being able to move on to a task as soon as all of its dependencies are complete is crucial: a DAG may have multiple paths, and even if one task is slow or blocking, it may be possible to move on to other tasks.

Are there any simple examples as to how I could achieve this, or perhaps even integrate networkx with celery?

1 Answer

I think this function may help:

    # The graph G is represented by a dictionary following this pattern:
    # G = { vertex: [ (successor1, weight1), (successor2, weight2), ... ] }
    # Assumes every task is reachable from `start`; otherwise a task whose
    # predecessor is never reached would stay in the queue forever.
    def progress(G, start):
        Q = [start]   # tasks waiting to be executed
        done = []     # tasks already executed, in execution order
        while len(Q) > 0:        # are there still tasks to execute?
            task = Q.pop(0)      # pick up the oldest one
            if task in done:     # queued by several predecessors: run it only once
                continue
            ready = True
            for T in G:          # make sure all predecessors are executed
                for S, w in G[T]:
                    if S == task and T not in done:  # predecessor T has not executed yet
                        ready = False
                        break
                if not ready:
                    break
            if not ready:
                Q.append(task)   # the task is not ready for execution yet
            else:
                done.append(task)             # execute the task
                for S, w in G.get(task, []):  # and explore all its successors
                    Q.append(S)
        return done
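The function above runs tasks one at a time. If the goal is to start a task as soon as all of its dependencies have finished, one possible approach is to track unmet dependencies and hand ready tasks to a thread pool from the standard library. The following is only a rough sketch under some assumptions: `run_dag`, `deps` and `run_task` are made-up names, tasks run in local threads rather than in Celery, and every task must appear as a key of `deps`. For a networkx `DiGraph` named `G`, the mapping could be built with something like `{n: set(G.predecessors(n)) for n in G}`.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_dag(deps, run_task, max_workers=4):
    """Run each task once all of its dependencies have finished.

    deps:     dict mapping every task to the tasks it depends on
              (hypothetical input format; each dependency must also be a key).
    run_task: callable that takes a task and returns its result.
    Returns a dict mapping each task to its result.
    """
    remaining = {task: set(d) for task, d in deps.items()}  # unmet dependencies
    dependents = {task: [] for task in deps}                # reverse edges
    for task, unmet in remaining.items():
        for dep in unmet:
            dependents[dep].append(task)

    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Tasks without dependencies can start immediately.
        running = {pool.submit(run_task, t): t
                   for t, unmet in remaining.items() if not unmet}
        while running:
            finished, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in finished:
                task = running.pop(future)
                results[task] = future.result()  # re-raises if the task failed
                for child in dependents[task]:
                    remaining[child].discard(task)
                    if not remaining[child]:     # last dependency just finished
                        running[pool.submit(run_task, child)] = child
    if len(results) != len(deps):
        raise ValueError("dependency graph contains a cycle")
    return results


# Example: "b" and "c" both depend on "a"; "d" depends on "b" and "c".
deps = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
order = []  # records the order in which tasks actually ran
results = run_dag(deps, lambda t: order.append(t) or t.upper())
```

Here "b" and "c" can run in parallel once "a" is done, and a slow "b" would not hold up "c". The `Future` objects kept in `running` are also what you would poll for status if you wanted that; a real Celery setup would need its own equivalent of this bookkeeping.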