Sunday, September 24, 2017

Python exit from all running threads on truthy condition

I am using threads to check the response status code from an API URL. How can I break the loop and stop all the other threads if a condition is true? Please see the following code.

import logging, time, threading, requests

#: Log items
logging.basicConfig(format='%(asctime)s %(levelname)s : %(message)s', level=logging.INFO)

class EppThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
        threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
        self.args = args

    def run(self):
        startTime = time.time()
        url = self.args[0]
        limit = self.args[1]

        for i in range(limit):
            response = requests.get(url)
            if response.status_code != 200:
                break
                #Exit other similar threads (with same url)
            else:
                print('Thread {0} - success'.format(thread.getName()))

        print('process completed')
        # Send Email

number_of_threads = 5
number_of_requests = 100

urls = ['https://v1.api.com/example', 'https://v2.api.com/example']

if __name__ == '__main__':
    startTime = time.time()

    for url in urls:
        threads = []
        for i in range(number_of_threads):
            et = EppThread(name = "{0}-Thread-{1}".format(name, i + 1), args=(url, number_of_requests))
            threads.append(et)
            et.start()

    # Check if execution time is not greater than 1 minute
    while len(threads) > 0 and (time.time() - startTime) < 60:
        time.sleep(0.5)
        for thread in threads:
            if not thread.isAlive():
                threads.remove(thread)
                print('Thread {0} terminated'.format(thread.getName()))

    os._exit(1)

Please suggest a better way to stop execution as soon as the condition becomes true in any running thread.

Thanks for your help.

3 Answers

Answers 1

An important thing to note here is that when a Thread's run method returns, the thread is no longer alive and can be garbage collected once nothing references it. So all we really need is a Boolean class variable that breaks that loop. A class variable is shared by all instances of the class (and by any subclass that does not redefine it), so once we set it on the class, every thread sees the new value:

import logging, time, threading, requests

#: Log items
logging.basicConfig(format='%(asctime)s %(levelname)s : %(message)s', level=logging.INFO)


class EppThread(threading.Thread):
    kill = False  # new Boolean class variable
    url = 'https://v1.api.com/example'  # keep this in mind for later

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None):
        # the verbose argument is dropped: it existed only in Python 2
        threading.Thread.__init__(self, group=group, target=target, name=name)
        self.args = args

    def run(self):
        limit = self.args[0]

        for i in range(limit):
            response = requests.get(self.url)
            if response.status_code != 200:
                # assign on the class: self.kill = True would only create an
                # instance attribute that the other threads never see
                type(self).kill = True
            else:
                print('Thread {0} - success'.format(self.name))  # changed to self (the question used the undefined thread)

            if self.kill:  # if kill is True, break the loop, send the email, and finish the Thread
                break

        print('process completed')
        # Send Email


number_of_threads = 5
number_of_requests = 100

if __name__ == '__main__':
    startTime = time.time()

    threads = []
    for i in range(number_of_threads):
        # name was undefined in the question, so the URL is used as the prefix here;
        # args needs the trailing comma to be a tuple
        et = EppThread(name="{0}-Thread-{1}".format(EppThread.url, i + 1), args=(number_of_requests,))
        threads.append(et)
        et.start()

    # Check if execution time is not greater than 1 minute
    while threads and time.time() - startTime < 60:  # removed len() due to implicit Falsiness of empty containers in Python
        time.sleep(0.5)
        for thread in threads[:]:  # iterate over a copy so removing items is safe
            if not thread.is_alive():
                threads.remove(thread)
                print('Thread {0} terminated'.format(thread.name))

    EppThread.kill = True

Now when any of the EppThreads gets a non-200 response, it sets the class variable to True, which makes all of the other EppThreads break out of the loop as well. I also added EppThread.kill = True at the end so that the request loops stop more cleanly if you exceed the one-minute run time.
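
This pattern only works if the flag is assigned on the class rather than on an instance. The following is a minimal sketch of that difference, with no network access; Worker is a hypothetical stand-in for EppThread and is not part of the code above.

```python
import threading


class Worker(threading.Thread):
    """Hypothetical stand-in for EppThread; the threads are never started."""

    kill = False  # class attribute, shared by every Worker instance


first = Worker()
second = Worker()

# Assigning through the instance creates a new attribute on `first` only;
# the class attribute that `second` falls back to is untouched.
first.kill = True
print(second.kill)  # False

# Assigning through the class changes the value that every instance
# without its own attribute will read.
type(first).kill = True
print(second.kill)  # True
```

Using type(self) instead of a hard-coded class name means a subclass sets its own flag rather than the parent's.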

Lastly, I added the url class variable. This is because you expressed interest in running different URLs simultaneously and killing only the threads for the URL that fails. All you have to do at this point is subclass EppThread and override kill and url.

class EppThread2(EppThread):
    kill = False
    url = 'https://v2.example.com/api?$awesome=True'

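Then you can instantiate EppThread2, add the instances to the threads list, and everything should work as you want it to. Because EppThread2 defines its own kill, remember to set EppThread2.kill = True as well when the timeout expires.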
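
As a rough illustration of why each subclass needs its own kill, the sketch below uses two plain classes (BaseWorker and SecondWorker, both hypothetical names) in place of the thread classes. The URLs are the ones from this answer and are never requested.

```python
class BaseWorker:
    """Hypothetical stand-in for EppThread (no threading, no network)."""

    kill = False
    url = 'https://v1.api.com/example'


class SecondWorker(BaseWorker):
    """Hypothetical stand-in for EppThread2."""

    kill = False  # redefining kill gives this class an independent flag
    url = 'https://v2.example.com/api?$awesome=True'


# A failure recorded for the second URL leaves the first group running.
SecondWorker.kill = True
print(BaseWorker.kill)  # False

# On a global timeout, every class therefore has to be flagged.
for cls in (BaseWorker, SecondWorker):
    cls.kill = True
print(BaseWorker.kill, SecondWorker.kill)  # True True
```

If SecondWorker did not redefine kill, it would share the parent's flag, and a failure on one URL would stop the threads for both.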

Answers 2

You could create an Event object that is shared between all the threads that use the same URL. When a thread runs into an error, it sets the event. Each thread checks the event in its run loop and, if it has been set, stops by breaking out of the loop.

Here's a version of your example modified to use the Event.

import logging, os, time, threading, requests

#: Log items
logging.basicConfig(format='%(asctime)s %(levelname)s : %(message)s', level=logging.INFO)

class EppThread(threading.Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, bad_status=None):
        # the verbose argument is dropped: it existed only in Python 2
        threading.Thread.__init__(self, group=group, target=target, name=name)
        self.args = args
        self.bad_status = bad_status

    def run(self):
        url = self.args[0]
        limit = self.args[1]

        for i in range(limit):
            if self.bad_status.is_set():
                # break the loop on errors in any thread.
                break
            response = requests.get(url)
            if response.status_code != 200:
                # Set the event when an error occurs
                self.bad_status.set()
                break
                #Exit other similar threads (with same url)
            else:
                print('Thread {0} - success'.format(self.name))

        print('process completed')
        # Send Email

number_of_threads = 5
number_of_requests = 100

urls = ['https://v1.api.com/example', 'https://v2.api.com/example']

if __name__ == '__main__':
    startTime = time.time()

    threads = []  # one list for all URLs, so every thread is monitored below
    for url in urls:
        # Create an event for each URL
        bad_status = threading.Event()
        for i in range(number_of_threads):
            # name was undefined in the question, so the URL is used as the prefix here
            et = EppThread(name="{0}-Thread-{1}".format(url, i + 1), args=(url, number_of_requests), bad_status=bad_status)
            threads.append(et)
            et.start()

    # Check if execution time is not greater than 1 minute
    while len(threads) > 0 and (time.time() - startTime) < 60:
        time.sleep(0.5)
        for thread in threads[:]:  # iterate over a copy so removing items is safe
            if not thread.is_alive():
                threads.remove(thread)
                print('Thread {0} terminated'.format(thread.name))

    os._exit(1)

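threading.Event only coordinates threads, but multiprocessing.Event offers the same interface for processes. So if at some point you wanted to switch to using Process, swapping in multiprocessing.Event should require little change to this code.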
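
To try the Event approach without hitting a real API, here is a minimal sketch under a few assumptions: fake_status is a hypothetical stand-in for requests.get(url).status_code that reports an error on the fourth attempt, and worker and run_group are illustrative names, not part of the code above. It also uses join() with a timeout in place of the sleep-and-poll loop.

```python
import threading
import time


def fake_status(url, attempt):
    """Hypothetical stand-in for requests.get(url).status_code (no network)."""
    return 500 if attempt == 3 else 200


def worker(url, limit, bad_status, stopped_at):
    """Poll until the limit is reached or any thread reports a bad status."""
    attempt = 0
    for attempt in range(limit):
        if bad_status.is_set():
            break
        if fake_status(url, attempt) != 200:
            bad_status.set()  # tell the sibling threads to stop
            break
        bad_status.wait(0.01)  # pause that ends early once the event is set
    stopped_at.append(attempt)  # record the attempt this thread stopped on


def run_group(url, number_of_threads=5, limit=100, deadline=60.0):
    """Run one group of workers for a URL; return (failed, stopped_at)."""
    bad_status = threading.Event()
    stopped_at = []
    threads = [
        threading.Thread(target=worker, args=(url, limit, bad_status, stopped_at))
        for _ in range(number_of_threads)
    ]
    for thread in threads:
        thread.start()

    # Wait for the workers, but never past the overall deadline.
    end = time.monotonic() + deadline
    for thread in threads:
        thread.join(max(0.0, end - time.monotonic()))

    failed = bad_status.is_set()  # True only if a worker saw a bad status
    bad_status.set()  # deadline reached: ask any remaining workers to stop
    for thread in threads:
        thread.join()
    return failed, stopped_at


if __name__ == "__main__":
    print(run_group("https://v1.api.com/example", deadline=5.0))
```

Calling run_group once per URL gives each URL its own Event, so a failure on one URL does not stop the threads working on another.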

Answers 3

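Import sys and call sys.exit. Note that sys.exit raises SystemExit in the calling thread only, so it ends the program only when called from the main thread. Here is an example: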

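import sys

items = []  # renamed from list to avoid shadowing the built-in
if len(items) < 1:
    # double quotes, because the message contains an apostrophe
    sys.exit("You don't have any items in your list")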
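
Because the question is about threads, it may help to see what sys.exit does when it is called from a worker. This is a small sketch with a hypothetical worker function: the SystemExit ends only that thread, and the main thread carries on.

```python
import sys
import threading


def worker():
    # Raises SystemExit in this thread only; the threading module
    # swallows it and the thread simply ends.
    sys.exit("worker giving up")


thread = threading.Thread(target=worker)
thread.start()
thread.join()

main_continued = True  # reached because the process was not terminated
print("main thread is still running")
```

To stop the whole program from a worker, the main thread has to be told to exit (for example through a shared flag or Event), or the process has to be ended with os._exit as in the question, which skips normal cleanup.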