Forum Discussion

JohanH's avatar
JohanH
Occasional Contributor
9 years ago
Solved

Starting threads in python!?

  Hi,

 

Have anyone used pythons threading.Thread and Queue in TestComplete?

 

I can not get it to work, test runner hangs on Thread.start()

 

Here is an example of what I'm trying to accomplish

def ForEachInParallell(items, func, args, num_worker_threads=8):

  def worker(args):
    while True:
      i = q.get()
      func(i)
      q.task_done()

  q = Queue()

  for n in range(min(max(4, num_worker_threads), 16)):
    t = Thread(target=worker, kwargs=args)
    t.daemon = True
    t.start()  # here the debugger hangs

  for item in items:
    q.put(item)

  q.join()

After it hangs I have to kill TestComplete and restart...

  BR, Johan

  • Ok, got answers back from developers that Log and other TestComplete "global" objects is not thread safe and even they require to be executed from the main thread. Code to block missuse is scheduled for version 11.30.

     

    Well, well... I do not give up that easily. Here is a recipie (that needs to be completed) to accomplish just what I am after:

    from queue import Queue, Empty
    from threading import current_thread, Thread
    from time import sleep
    from random import randint
    
    
    def run():
        '''Test function'''
    
        def func(item, Log):
            '''Function that emulates the work that needs to be done
            for each item'''
    
            sleep(randint(1, 5))  # emulate some task
            Log.Message(item)
    
        name = current_thread().getName()
        Log.Message("{name}: Start!".format(**locals()))
    
        source = []
        for i in range(20):
            source.append("This is dummy item nbr {i}".format(**locals()))
    
        for_each(source, func)  # do the work
    
        Log.Message("{name}: Done!".format(**locals()))
    
    
    def for_each(source, func, num_worker_threads=8):
        '''This method launches num_worker_threads, worker threads that calls
        the func function for each item in source'''
    
        class ThreadDone(object):
            '''Thread termination marker class'''
            pass
    
        class LogEntry(object):
            '''Information container to pass a Log entry from a worker thread to
            the main thread'''
    
            def __init__(self, entry, *args, **kwargs):
                self.entry = entry
                self.args = args
                self.kwargs = kwargs
    
            def execute(self):
                '''Executes the Log entry, should only be called from
                the main thread'''
                try:
                    getattr(Log, self.entry)(*self.args, **self.kwargs)
                except NameError:
                    pass  # do some error handling!?
    
        def worker(handler, in_q, out_q):
            '''Internal thread worker function'''
    
            class LogWrapper(object):
                '''Wrapper that exposes the Log interface and pases the calls to
                the main thread'''
    
                def __init__(self, queue):
                    self.queue = queue
    
                def Message(self, messageText, additionalInformation='',
                            priority=300, attr=None, picture=None, folderId=-1):
                    name = current_thread().getName()
                    messageText = "{name}: {messageText}".format(**locals())
                    self.queue.put(
                        LogEntry('Message', messageText,
                                 additionalInformation, priority, attr,
                                 picture, folderId))
    
            log = LogWrapper(out_q)
            try:
                while True:
                    item = in_q.get(block=False)
                    try:
                        handler(item, log)  # call the external worker
                    except:
                        # make sure to swallow any exceptions
                        # as to not to terminate threads
                        pass  # exchange for addequate error handling
                    finally:
                        in_q.task_done()
            except Empty:  # nothing more to do, terminate
                out_q.put(ThreadDone())
    
        def start_workers(worker, num_worker_threads, in_queue, out_queue, func):
            worker_args = dict(handler=func, in_q=in_queue, out_q=out_queue)
            '''Start num_worker_threads worker threads and pass the func and
            in and out queues to the thread worker function'''
            
            for i in range(max(4, min(num_worker_threads, 16))):
                t = Thread(target=worker, kwargs=worker_args)
                t.daemon = True  # set as backgound thread
                t.start()
    
        def join_workers(num_worker_threads, in_queue):
            '''Wait for all threads to complete, while handling all LogEntries'''
            
            while num_worker_threads:
                item = in_queue.get()
                if isinstance(item, ThreadDone):
                    num_worker_threads -= 1  # track each terminated thread
                elif isinstance(item, LogEntry):
                    item.execute()  # write to the Log
                in_queue.task_done()
    
        inQ = Queue()
        outQ = Queue()
        for item in source:
            outQ.put(item)
    
        start_workers(worker, num_worker_threads, outQ, inQ, func)
        join_workers(num_worker_threads, inQ)

    Hope that you find this usefull...

     

    BR, Johan

7 Replies

  • JohanH's avatar
    JohanH
    Occasional Contributor

    Ok, some more digging.
    I created a small test script and ran it in python (using Eclipse and the prompt).

    from queue import Queue
    from threading import current_thread, Thread
    from time import sleep
    from random import randint
    
    
    def run():
        source = []
        for i in range(20):
            source.append("This is dummy item nbr {i}".format(**locals()))
    
        run_test(source)
        print("Done!")
    
    
    def run_test(source, num_worker_threads=8):
        def worker():
            name = current_thread().getName()
            while True:
                item = q.get()
                sleep(randint(1, 5))  # emulate some task
                print("{name}: {item}".format(**locals()))
                q.task_done()
    
        q = Queue()
        for item in source:
            q.put(item)
    
        for i in range(max(4, min(num_worker_threads, 16))):
            t = Thread(target=worker)
            t.daemon = True
            t.start()  # here it hangs on the second pass
    
        q.join()
    
    
    if __name__ == '__main__':
        run()

    This yields the following output:
    Thread-7: This is dummy item nbr 6
    Thread-3: This is dummy item nbr 2
    Thread-8: This is dummy item nbr 7
    Thread-1: This is dummy item nbr 0
    Thread-6: This is dummy item nbr 5
    Thread-2: This is dummy item nbr 1
    Thread-7: This is dummy item nbr 8
    Thread-8: This is dummy item nbr 10
    Thread-1: This is dummy item nbr 11
    Thread-4: This is dummy item nbr 3
    Thread-5: This is dummy item nbr 4
    Thread-8: This is dummy item nbr 15
    Thread-6: This is dummy item nbr 12
    Thread-3: This is dummy item nbr 9
    Thread-2: This is dummy item nbr 13
    Thread-8: This is dummy item nbr 19
    Thread-5: This is dummy item nbr 18
    Thread-4: This is dummy item nbr 17
    Thread-7: This is dummy item nbr 14
    Thread-1: This is dummy item nbr 16
    Done!

     

    Executing the run() function from TestComplete, hangs TestComplete. Pausing or Stopping the execution does not respond.

    Stepping the script in the debugger it hangs on the second pass on row 31, 't.start()'.

     

    I will file this with support as Tanya suggested.

     

      BR, Johan

    • JohanH's avatar
      JohanH
      Occasional Contributor

      Ok, got answers back from developers that Log and other TestComplete "global" objects is not thread safe and even they require to be executed from the main thread. Code to block missuse is scheduled for version 11.30.

       

      Well, well... I do not give up that easily. Here is a recipie (that needs to be completed) to accomplish just what I am after:

      from queue import Queue, Empty
      from threading import current_thread, Thread
      from time import sleep
      from random import randint
      
      
      def run():
          '''Test function'''
      
          def func(item, Log):
              '''Function that emulates the work that needs to be done
              for each item'''
      
              sleep(randint(1, 5))  # emulate some task
              Log.Message(item)
      
          name = current_thread().getName()
          Log.Message("{name}: Start!".format(**locals()))
      
          source = []
          for i in range(20):
              source.append("This is dummy item nbr {i}".format(**locals()))
      
          for_each(source, func)  # do the work
      
          Log.Message("{name}: Done!".format(**locals()))
      
      
      def for_each(source, func, num_worker_threads=8):
          '''This method launches num_worker_threads, worker threads that calls
          the func function for each item in source'''
      
          class ThreadDone(object):
              '''Thread termination marker class'''
              pass
      
          class LogEntry(object):
              '''Information container to pass a Log entry from a worker thread to
              the main thread'''
      
              def __init__(self, entry, *args, **kwargs):
                  self.entry = entry
                  self.args = args
                  self.kwargs = kwargs
      
              def execute(self):
                  '''Executes the Log entry, should only be called from
                  the main thread'''
                  try:
                      getattr(Log, self.entry)(*self.args, **self.kwargs)
                  except NameError:
                      pass  # do some error handling!?
      
          def worker(handler, in_q, out_q):
              '''Internal thread worker function'''
      
              class LogWrapper(object):
                  '''Wrapper that exposes the Log interface and pases the calls to
                  the main thread'''
      
                  def __init__(self, queue):
                      self.queue = queue
      
                  def Message(self, messageText, additionalInformation='',
                              priority=300, attr=None, picture=None, folderId=-1):
                      name = current_thread().getName()
                      messageText = "{name}: {messageText}".format(**locals())
                      self.queue.put(
                          LogEntry('Message', messageText,
                                   additionalInformation, priority, attr,
                                   picture, folderId))
      
              log = LogWrapper(out_q)
              try:
                  while True:
                      item = in_q.get(block=False)
                      try:
                          handler(item, log)  # call the external worker
                      except:
                          # make sure to swallow any exceptions
                          # as to not to terminate threads
                          pass  # exchange for addequate error handling
                      finally:
                          in_q.task_done()
              except Empty:  # nothing more to do, terminate
                  out_q.put(ThreadDone())
      
          def start_workers(worker, num_worker_threads, in_queue, out_queue, func):
              worker_args = dict(handler=func, in_q=in_queue, out_q=out_queue)
              '''Start num_worker_threads worker threads and pass the func and
              in and out queues to the thread worker function'''
              
              for i in range(max(4, min(num_worker_threads, 16))):
                  t = Thread(target=worker, kwargs=worker_args)
                  t.daemon = True  # set as backgound thread
                  t.start()
      
          def join_workers(num_worker_threads, in_queue):
              '''Wait for all threads to complete, while handling all LogEntries'''
              
              while num_worker_threads:
                  item = in_queue.get()
                  if isinstance(item, ThreadDone):
                      num_worker_threads -= 1  # track each terminated thread
                  elif isinstance(item, LogEntry):
                      item.execute()  # write to the Log
                  in_queue.task_done()
      
          inQ = Queue()
          outQ = Queue()
          for item in source:
              outQ.put(item)
      
          start_workers(worker, num_worker_threads, outQ, inQ, func)
          join_workers(num_worker_threads, inQ)

      Hope that you find this usefull...

       

      BR, Johan

      • TanyaYatskovska's avatar
        TanyaYatskovska
        SmartBear Alumni (Retired)

        Thanks for your efforts, Johanh :)

        I hope your solution will help community members!