I am trying to set a maximum run time for my celery jobs.
I am currently recovering from exceptions with a context manager. I ended up with code very similar to this snippet:
    from celery.exceptions import SoftTimeLimitExceeded

    class Manager:
        def __enter__(self):
            return self

        def __exit__(self, error_type, error, tb):
            if error_type == SoftTimeLimitExceeded:
                logger.info('job killed.')
                # swallow the exception
                return True

    @task
    def do_foo():
        with Manager():
            run_task1()
            run_task2()
            run_task3()
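The limit itself is not shown in the snippet above; it is declared on the task or in the celery configuration, roughly like this (the values are placeholders):

    # Placeholder values; the limits can also be set globally via the
    # task_soft_time_limit / task_time_limit settings (exact names depend
    # on the celery version).
    @task(soft_time_limit=60, time_limit=120)
    def do_foo():
        ...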
What I expected:

If do_foo times out in run_task1, the logger logs, the SoftTimeLimitExceeded exception is swallowed, the rest of the with block is skipped, and the job ends without running run_task2 and run_task3.
What I observe:

do_foo times out in run_task1, SoftTimeLimitExceeded is raised, the logger logs, and the exception is swallowed, but run_task2 and run_task3 still run nevertheless.
I am looking for an answer to the following two questions:

- Why is run_task2 still executed when SoftTimeLimitExceeded is raised in run_task1 in this setting?
- Is there an easy way to transform my code so that it performs as expected?
1 Answer
Cleaning up the code
This code is pretty good; there's not much cleaning up to do.
- You shouldn't return self from __enter__ if the context manager isn't designed to be used with the as keyword.
- is should be used when checking classes, since they are singletons...
- ...but you should prefer issubclass to properly emulate exception handling (see the sketch after this list).
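To illustrate the issubclass point: == (or is) only matches the exact class, whereas issubclass also matches subclasses, the way an except clause does. A small, self-contained sketch (the subclass here is invented purely for demonstration):

    class SoftTimeLimitExceeded(Exception):
        pass

    class CustomTimeLimit(SoftTimeLimitExceeded):  # hypothetical subclass
        pass

    error_type = CustomTimeLimit

    print(error_type == SoftTimeLimitExceeded)            # False: exact match only
    print(error_type is SoftTimeLimitExceeded)            # False: exact match only
    print(issubclass(error_type, SoftTimeLimitExceeded))  # True: matches subclasses,
                                                           # like an except clause would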
Implementing these changes gives:
    from celery.exceptions import SoftTimeLimitExceeded

    class Manager:
        def __enter__(self):
            pass

        def __exit__(self, error_type, error, tb):
            # error_type is None when the block exits without an exception,
            # and issubclass(None, ...) would raise a TypeError.
            if error_type is not None and issubclass(error_type, SoftTimeLimitExceeded):
                logger.info('job killed.')
                # swallow the exception
                return True

    @task
    def do_foo():
        with Manager():
            run_task1()
            run_task2()
            run_task3()
Debugging
I created a mock environment for debugging:
    class SoftTimeLimitExceeded(Exception):
        pass

    class Logger:
        info = print

    logger = Logger()
    del Logger

    def task(f):
        return f

    def run_task1():
        print("running task 1")
        raise SoftTimeLimitExceeded

    def run_task2():
        print("running task 2")

    def run_task3():
        print("running task 3")
Executing this and then your program gives:
    >>> do_foo()
    running task 1
    job killed.
This is the expected behaviour.
Hypotheses
I can think of two possibilities:
- Something in the chain, probably run_task1, is asynchronous (a toy illustration of what that would mean follows this list).
- celery is doing something weird.
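As a toy illustration of what the first hypothesis would mean: here run_task1 hands its work to another thread and returns immediately. The thread stands in for whatever asynchronous machinery might be involved and has nothing to do with celery's actual implementation:

    import threading
    import time

    class SoftTimeLimitExceeded(Exception):
        pass

    class Manager:
        def __enter__(self):
            pass
        def __exit__(self, error_type, error, tb):
            if error_type is not None and issubclass(error_type, SoftTimeLimitExceeded):
                print('job killed.')
                return True

    def run_task1():
        # The "work" happens on another thread and run_task1 returns at once,
        # the way an asynchronous dispatch would.
        def work():
            time.sleep(0.1)
            try:
                raise SoftTimeLimitExceeded
            except SoftTimeLimitExceeded:
                print("timed out, but not on the caller's stack")
        threading.Thread(target=work).start()
        print("running task 1")

    def run_task2():
        print("running task 2")

    def run_task3():
        print("running task 3")

    def do_foo():
        with Manager():
            run_task1()
            run_task2()
            run_task3()

    do_foo()
    time.sleep(0.3)
    # All three tasks run and the manager never sees the exception: it was
    # raised on another thread, after run_task1 had already returned.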
I'll run with the second hypothesis because I can't test the former.
I've been bitten by the obscure behaviour of combining context managers, exceptions and coroutines before, so I know what sorts of problems it causes. This seems like one of them, but I'll have to look at celery's code before I can go any further.
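As a toy example of the kind of gotcha I mean (an ordinary generator standing in for a coroutine; again, this is not what celery does, only the flavour of problem):

    class Guard:
        def __enter__(self):
            pass
        def __exit__(self, error_type, error, tb):
            print("__exit__ saw:", error_type)

    def job():
        with Guard():
            yield "step 1"
            yield "step 2"

    g = job()
    print(next(g))   # "step 1"; the generator is now suspended inside the with block
    try:
        raise RuntimeError("raised while the generator is suspended")
    except RuntimeError:
        pass
    # The context manager never saw the RuntimeError: its frame wasn't running.
    g.close()        # only now does __exit__ run, and it sees GeneratorExit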
Edit: I can't make head nor tail of celery's code, and searching hasn't turned up the code that raises SoftTimeLimitExceeded to allow me to trace it backwards. I'll pass it on to somebody more experienced with celery to see if they can work out how it works.