#1
  1. No Profile Picture
    Registered User
    Devshed Newbie (0 - 499 posts)

    Join Date
    Apr 2005
    Posts
    8
    Rep Power
    0

    A threading challenge


    Hi, I am trying to thread a long function that processes some work and then inserts data into MySQL. The function does not need to return anything and uses a MySQL table as a queue: it pops an item from this queue at the start of each pass and keeps going until the queue is empty. Somehow, after about 5000 threads, each thread takes much longer to process work that normally gets done in 1-2 seconds.
    First, I think I am using threads inefficiently: new threads are created constantly, whereas I should be reusing them. I am trying to figure out how to create a thread pool with the Queue module, but I am having a really hard time figuring it out. The examples I found on the net require an "input" queue and an "output" queue, and some even need a callback function. All I really want is a thread pool that executes my long function to completion, then runs it again, and only stops when my data list from MySQL is empty.

    The following code is my program, where multiple threads get instantiated and started, but it slows down dramatically after 3-5 thousand such threads have been run.

    Does anyone know how I can integrate a thread pool in here that will just execute my long function, or whether anything is wrong with the semi-pseudocode below?

    Code:
    import logging
    from threading import Thread

    class WorkerThread(Thread):
        def __init__(self, feedObj, statusObj, new_feed):
            Thread.__init__(self)
            self.feedObj = feedObj
            self.statusObj = statusObj
            self.new_feed = new_feed

        def run(self):
            self.MyLongFunction()

        # function doesn't return anything
        def MyLongFunction(self):
            for i in range(100):
                # get work to do from mysql table
                # process work
                # update mysql
                pass

    class RunMyThread(Thread):
        def __init__(self):
            Thread.__init__(self)
            logging.basicConfig(level=logging.DEBUG,
                                format='%(name)-15s: %(asctime)s %(message)-8s',
                                datefmt='%a, %d %b %Y %H:%M:%S',
                                filename='MYTHREADLOG.LOG', filemode='a')

        def runThread(self):
            # feedsObj, statusObj and new_feed are set up elsewhere (not shown)
            while not self.feedsObj.isEmpty():
                feedThread1 = WorkerThread(self.feedsObj, self.statusObj, self.new_feed)
                feedThread2 = WorkerThread(self.feedsObj, self.statusObj, self.new_feed)
                feedThread1.start()
                feedThread2.start()
                feedThread1.join()
                feedThread2.join()
#2
    Maybe the problem above is too involved. I have another question, though: once I create a thread in my main program and it starts executing, does the main program continue executing and then exit, or does it wait for the threads to finish?

    I am having an issue where I open a file for the logging module in the main program and then pass that reference to the child thread. For some reason, I get a stream-closed error on the logging file. I suspect that the main program exited and closed the logging stream while the child threads still needed it.
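
For later readers: `start()` returns immediately, so the main program keeps running alongside the thread. The interpreter itself does not exit until every non-daemon thread has finished, but any cleanup code in the main thread (closing a file, for example) can still run before the workers are done unless the main thread calls `join()` first. A minimal sketch of that ordering, where `child` and `events` are made-up names for illustration and the sleep stands in for real work:

```python
import threading
import time

events = []  # records the order in which things happen

def child():
    time.sleep(0.1)                # stand-in for the real work
    events.append("child done")

t = threading.Thread(target=child)
t.start()                          # returns at once; child runs in the background
events.append("main continues")    # executed while the child is still sleeping
t.join()                           # block here until the child has finished
events.append("main after join")   # safe point to close shared resources
```

Anything the threads share, such as a log file, is best closed only after the `join()` calls. Whether that is the cause of the stream-closed error here is a guess, since the real code is not shown.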

    Thanks,
#3
    OK, if anyone cares... I am posting my findings as I try to figure this problem out. Hopefully it may help someone else avoid what I did.
    Code:
    class WorkerThread(Thread):
        def __init__(self, feedObj, statusObj, new_feed):
            Thread.__init__(self)
            self.feedObj = feedObj

        def run(self):
            while not self.feedObj.isEmpty():
                self.MyLongFunction()
    So, I sort of figured out my problem. To create a thread pool, or recycle your threads, each thread needs a while loop that keeps pulling work from a queue-type object and running your function on it. In any case, there needs to be a stopping condition for the thread. But this is only half the solution.
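
A minimal sketch of that idea, using the standard `queue` module (named `Queue` in Python 2) in place of the MySQL-backed queue. The names `my_long_function`, `worker` and `run_pool` are hypothetical, and the sketch assumes all work is queued before the workers start, so an empty queue can serve as the stopping condition:

```python
import queue
import threading

def my_long_function(item):
    # Hypothetical stand-in for "process work and update mysql".
    return item * 2

def worker(work_q, results, lock):
    while True:
        try:
            item = work_q.get_nowait()   # pop the next piece of work
        except queue.Empty:
            return                       # stopping case: the queue is drained
        value = my_long_function(item)
        with lock:                       # protect the shared results list
            results.append(value)

def run_pool(items, num_threads=4):
    work_q = queue.Queue()
    for item in items:                   # fill the queue before starting workers
        work_q.put(item)
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(work_q, results, lock))
               for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:                    # wait for every worker to finish
        t.join()
    return results
```

Only `num_threads` threads are ever created, however many items there are. Results arrive in whatever order the workers finish, so sort them if order matters.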

    The other half is in the parent class that creates the WorkerThreads. I am trying to start the WorkerThreads dynamically, without having to assign each WorkerThread instance to its own named variable.

    Ex1. Named instances
    Code:
    feedThread1 = WorkerThread(feedObj, statusObj, new_feed)
    feedThread2 = WorkerThread(feedObj, statusObj, new_feed)
    feedThread1.start()
    feedThread2.start()
    feedThread1.join()
    feedThread2.join()
    This is limited to the number of threads you write out by hand, but it has no problem with the parent thread ending earlier than the child threads. A better way is to be able to specify the number of threads to create at run time.

    Ex2. No named instances

    Code:
    for i in range(num_threads):
        WorkerThread(feedObj, statusObj, new_feed).start()
    However, with Ex2, the main thread ends before the child threads end, so you need a way to make the parent thread wait until the child threads finish. The only way I know is to poll the threading module from the parent thread, but I don't think this is the right solution.

    Code:
    while threading.active_count() > 1:
        pass  # busy-wait until only the main thread is left
    For some reason Ex2 runs twice as long as Ex1, where each thread is assigned to its own variable. If anyone knows of a better method of allowing the parent thread to wait for the children, please let me know. I would also like to know why Ex2 would run twice as long as Ex1.
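
One way to wait without a polling loop, even when no references to the threads were kept, is to ask the `threading` module for the live threads and join them. This is a sketch only: the `Worker` class and its sleep are hypothetical stand-ins for WorkerThread, and it does not attempt to explain the timing difference between Ex1 and Ex2:

```python
import threading
import time

class Worker(threading.Thread):
    # Hypothetical stand-in for WorkerThread.
    def __init__(self, results, n):
        threading.Thread.__init__(self)
        self.results = results
        self.n = n

    def run(self):
        time.sleep(0.05)            # pretend to do some work
        self.results.append(self.n)

results = []
for n in range(3):
    Worker(results, n).start()      # no reference to the thread is kept

# threading.enumerate() returns the threads that are currently alive;
# join only our own workers so the main thread sleeps instead of spinning.
for t in threading.enumerate():
    if isinstance(t, Worker):
        t.join()
```

`join()` blocks without burning CPU, unlike a `while` loop that repeatedly checks the active thread count.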
#4
    Mini me.
    Devshed Novice (500 - 999 posts)

    Join Date
    Nov 2003
    Location
    Cambridge, UK
    Posts
    783
    Rep Power
    13
    The while loop is a very aggressive wait; I guess it steals a lot of CPU time.

    How about using the join method? Something like:

    Code:
    activeT = []
    for i in range(num_threads):
        activeT.append(WorkerThread(feedObj, statusObj, new_feed))
        activeT[-1].start()

    for i in range(num_threads):
        activeT[i].join()
    grim

#5
    Originally Posted by Grim Archon
    The while loop is a very aggressive wait; I guess it steals a lot of CPU time.

    How about using the join method? Something like:

    Code:
    activeT = []
    for i in range(num_threads):
        activeT.append(WorkerThread(feedObj, statusObj, new_feed))
        activeT[-1].start()

    for i in range(num_threads):
        activeT[i].join()
    grim
    Thanks, that works. I was trying to do a join as well, but it gave me an error saying WorkerThread needs to be an instance before I can call join. I hadn't stored the threads in a list as you did, and that was my mistake. I think I have been too narrow-minded in thinking that I had to name a variable for each instance of an object, when all along they can live in a list. Thanks much, this is cool.
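
For readers on current Python 3: the standard library now includes a ready-made pool, `concurrent.futures.ThreadPoolExecutor`, which is newer than this thread. It is a different technique from the hand-managed list of threads discussed here, shown only as a sketch. `my_long_function` and `run_all` are hypothetical names, and the work items are assumed to be available up front as a list:

```python
from concurrent.futures import ThreadPoolExecutor

def my_long_function(item):
    # Hypothetical stand-in for the real "process work, update mysql" step.
    return item + 1

def run_all(items, num_threads=4):
    # At most num_threads worker threads are created and then reused.
    # Leaving the with-block waits for all submitted work to finish.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(my_long_function, items))
```

`pool.map` returns results in the same order as the input items, so `run_all([1, 2, 3])` gives `[2, 3, 4]`.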
