Forum Discussion

JohanH's avatar
JohanH
Occasional Contributor
9 years ago
Solved

Starting threads in python!?

  Hi,   Have anyone used pythons threading.Thread and Queue in TestComplete?   I can not get it to work, test runner hangs on Thread.start()   Here is an example of what I'm trying to accomplis...
  • JohanH's avatar
    JohanH
    9 years ago

    Ok, got answers back from developers that Log and other TestComplete "global" objects is not thread safe and even they require to be executed from the main thread. Code to block missuse is scheduled for version 11.30.

     

    Well, well... I do not give up that easily. Here is a recipie (that needs to be completed) to accomplish just what I am after:

    from queue import Queue, Empty
    from threading import current_thread, Thread
    from time import sleep
    from random import randint
    
    
    def run():
        '''Test function'''
    
        def func(item, Log):
            '''Function that emulates the work that needs to be done
            for each item'''
    
            sleep(randint(1, 5))  # emulate some task
            Log.Message(item)
    
        name = current_thread().getName()
        Log.Message("{name}: Start!".format(**locals()))
    
        source = []
        for i in range(20):
            source.append("This is dummy item nbr {i}".format(**locals()))
    
        for_each(source, func)  # do the work
    
        Log.Message("{name}: Done!".format(**locals()))
    
    
    def for_each(source, func, num_worker_threads=8):
        '''This method launches num_worker_threads, worker threads that calls
        the func function for each item in source'''
    
        class ThreadDone(object):
            '''Thread termination marker class'''
            pass
    
        class LogEntry(object):
            '''Information container to pass a Log entry from a worker thread to
            the main thread'''
    
            def __init__(self, entry, *args, **kwargs):
                self.entry = entry
                self.args = args
                self.kwargs = kwargs
    
            def execute(self):
                '''Executes the Log entry, should only be called from
                the main thread'''
                try:
                    getattr(Log, self.entry)(*self.args, **self.kwargs)
                except NameError:
                    pass  # do some error handling!?
    
        def worker(handler, in_q, out_q):
            '''Internal thread worker function'''
    
            class LogWrapper(object):
                '''Wrapper that exposes the Log interface and pases the calls to
                the main thread'''
    
                def __init__(self, queue):
                    self.queue = queue
    
                def Message(self, messageText, additionalInformation='',
                            priority=300, attr=None, picture=None, folderId=-1):
                    name = current_thread().getName()
                    messageText = "{name}: {messageText}".format(**locals())
                    self.queue.put(
                        LogEntry('Message', messageText,
                                 additionalInformation, priority, attr,
                                 picture, folderId))
    
            log = LogWrapper(out_q)
            try:
                while True:
                    item = in_q.get(block=False)
                    try:
                        handler(item, log)  # call the external worker
                    except:
                        # make sure to swallow any exceptions
                        # as to not to terminate threads
                        pass  # exchange for addequate error handling
                    finally:
                        in_q.task_done()
            except Empty:  # nothing more to do, terminate
                out_q.put(ThreadDone())
    
        def start_workers(worker, num_worker_threads, in_queue, out_queue, func):
            worker_args = dict(handler=func, in_q=in_queue, out_q=out_queue)
            '''Start num_worker_threads worker threads and pass the func and
            in and out queues to the thread worker function'''
            
            for i in range(max(4, min(num_worker_threads, 16))):
                t = Thread(target=worker, kwargs=worker_args)
                t.daemon = True  # set as backgound thread
                t.start()
    
        def join_workers(num_worker_threads, in_queue):
            '''Wait for all threads to complete, while handling all LogEntries'''
            
            while num_worker_threads:
                item = in_queue.get()
                if isinstance(item, ThreadDone):
                    num_worker_threads -= 1  # track each terminated thread
                elif isinstance(item, LogEntry):
                    item.execute()  # write to the Log
                in_queue.task_done()
    
        inQ = Queue()
        outQ = Queue()
        for item in source:
            outQ.put(item)
    
        start_workers(worker, num_worker_threads, outQ, inQ, func)
        join_workers(num_worker_threads, inQ)

    Hope that you find this usefull...

     

    BR, Johan