JohanH
9 years agoOccasional Contributor
Starting threads in python!?
Hi,
Have anyone used pythons threading.Thread and Queue in TestComplete?
I can not get it to work, test runner hangs on Thread.start()
Here is an example of what I'm trying to accomplish
def ForEachInParallell(items, func, args, num_worker_threads=8): def worker(args): while True: i = q.get() func(i) q.task_done() q = Queue() for n in range(min(max(4, num_worker_threads), 16)): t = Thread(target=worker, kwargs=args) t.daemon = True t.start() # here the debugger hangs for item in items: q.put(item) q.join()
After it hangs I have to kill TestComplete and restart...
BR, Johan
Ok, got answers back from developers that Log and other TestComplete "global" objects is not thread safe and even they require to be executed from the main thread. Code to block missuse is scheduled for version 11.30.
Well, well... I do not give up that easily. Here is a recipie (that needs to be completed) to accomplish just what I am after:
from queue import Queue, Empty from threading import current_thread, Thread from time import sleep from random import randint def run(): '''Test function''' def func(item, Log): '''Function that emulates the work that needs to be done for each item''' sleep(randint(1, 5)) # emulate some task Log.Message(item) name = current_thread().getName() Log.Message("{name}: Start!".format(**locals())) source = [] for i in range(20): source.append("This is dummy item nbr {i}".format(**locals())) for_each(source, func) # do the work Log.Message("{name}: Done!".format(**locals())) def for_each(source, func, num_worker_threads=8): '''This method launches num_worker_threads, worker threads that calls the func function for each item in source''' class ThreadDone(object): '''Thread termination marker class''' pass class LogEntry(object): '''Information container to pass a Log entry from a worker thread to the main thread''' def __init__(self, entry, *args, **kwargs): self.entry = entry self.args = args self.kwargs = kwargs def execute(self): '''Executes the Log entry, should only be called from the main thread''' try: getattr(Log, self.entry)(*self.args, **self.kwargs) except NameError: pass # do some error handling!? def worker(handler, in_q, out_q): '''Internal thread worker function''' class LogWrapper(object): '''Wrapper that exposes the Log interface and pases the calls to the main thread''' def __init__(self, queue): self.queue = queue def Message(self, messageText, additionalInformation='', priority=300, attr=None, picture=None, folderId=-1): name = current_thread().getName() messageText = "{name}: {messageText}".format(**locals()) self.queue.put( LogEntry('Message', messageText, additionalInformation, priority, attr, picture, folderId)) log = LogWrapper(out_q) try: while True: item = in_q.get(block=False) try: handler(item, log) # call the external worker except: # make sure to swallow any exceptions # as to not to terminate threads pass # exchange for addequate error handling finally: in_q.task_done() except Empty: # nothing more to do, terminate out_q.put(ThreadDone()) def start_workers(worker, num_worker_threads, in_queue, out_queue, func): worker_args = dict(handler=func, in_q=in_queue, out_q=out_queue) '''Start num_worker_threads worker threads and pass the func and in and out queues to the thread worker function''' for i in range(max(4, min(num_worker_threads, 16))): t = Thread(target=worker, kwargs=worker_args) t.daemon = True # set as backgound thread t.start() def join_workers(num_worker_threads, in_queue): '''Wait for all threads to complete, while handling all LogEntries''' while num_worker_threads: item = in_queue.get() if isinstance(item, ThreadDone): num_worker_threads -= 1 # track each terminated thread elif isinstance(item, LogEntry): item.execute() # write to the Log in_queue.task_done() inQ = Queue() outQ = Queue() for item in source: outQ.put(item) start_workers(worker, num_worker_threads, outQ, inQ, func) join_workers(num_worker_threads, inQ)
Hope that you find this usefull...
BR, Johan