In a previous article we decided to use Python's multiprocessing
module to leverage the power of multi-core machines. Our use case is all about web applications served by CherryPy, so multiprocessing isn't the only interesting part: our application will be multi-threaded as well. In this article we present a first implementation of a multi-threaded application that hands off the heavy lifting to a pool of subprocesses.
The design
The design is centered on the following concepts:
- The main process consists of multiple threads,
- the work is done by a pool of subprocesses, and
- transferring data to and from the subprocesses is left to the pool manager.
Schematically, the threads in the main process take tasks from a queue and hand each one to the pool manager, which dispatches it to one of the subprocesses and returns the result.
Sample code
We start off by importing the necessary components:
from multiprocessing import Pool,current_process
from threading import current_thread,Thread
from queue import Queue
import sqlite3 as dbapi
from time import time,sleep
from random import random
The most important ones we need are the Pool class from the multiprocessing module and the Thread class from the threading module. We also import queue.Queue to act as a task list for the threads. Note that the multiprocessing module has its own Queue implementation that is not only thread safe but can be used for inter-process communication as well; we won't be using it here, however, but rely on a simpler paradigm, as we will see.
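To make that distinction concrete, here is a tiny illustration, not part of the application itself, of what multiprocessing.Queue can do that queue.Queue cannot: carry items across a process boundary.

from multiprocessing import Process, Queue as MPQueue

def producer(q):
    q.put('hello from a subprocess')        # runs in the child process

if __name__ == '__main__':
    q=MPQueue()
    p=Process(target=producer,args=(q,))
    p.start()
    print(q.get())                           # the item crosses the process boundary
    p.join()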
The next step is to define a function that may be called by the threads.
def execute(sql,params=tuple()):
    global pool
    return pool.apply(task,(sql,params))
It takes a string argument with SQL code and an optional tuple of parameters, just like the Cursor.execute() method in the sqlite3 module. It merely passes on these arguments to the apply() method of the multiprocessing.Pool instance that is referred to by the global pool variable. Together with the SQL string and the parameters, a reference to the task() function is passed; that function is defined below:
def task(sql,params):
    global connection
    c=connection.cursor()
    c.execute(sql,params)
    l=c.fetchall()
    return l
This function just executes the SQL and returns the results. It assumes the global variable connection contains a valid sqlite3.Connection instance, something that is taken care of by the connect() function that will be passed as an initializer to any new subprocess:
def connect(*args):
    global connection
    connection = dbapi.connect(*args)
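The initializer mechanism deserves a small stand-alone illustration: when a Pool is created with an initializer and initargs, every worker process runs that initializer exactly once before it starts accepting tasks. The sketch below is separate from the application code and its names are made up:

from multiprocessing import Pool, current_process

setting = None

def init(value):
    global setting
    setting = value                        # runs once in every worker process

def report(_):
    return (current_process().name, setting)

if __name__ == '__main__':
    with Pool(2, init, ('configured',)) as p:
        print(p.map(report, range(4)))     # every worker reports the same setting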
Before we initialize our pool of subprocesses, let's have a look at the core function of any thread we start in our main process:
def threadwork(initializer=None,kwargs={}):
    global tasks
    if initializer is not None:
        initializer(**kwargs)
    while True:
        (sql,params) = tasks.get()
        if sql=='quit': break              # 'quit' acts as a sentinel that stops the thread
        r=execute(sql,params)
It calls an optional thread initializer first and then enters a semi-infinite loop. Each iteration starts by fetching an item from the global tasks queue. Each item is a tuple consisting of a string and another tuple with parameters. If the string equals quit we terminate the loop; otherwise we simply pass on the SQL statement and any parameters to the execute() function we encountered earlier, which will take care of passing it to the pool of subprocesses. We store the result of this query in the r variable, even though we do nothing with it in this example.
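Note that execute(), and therefore pool.apply(), blocks the calling thread until a worker process has produced the result; that is exactly why each outstanding query gets its own thread. A tiny, self-contained sketch (the square function is purely illustrative) contrasts apply() with its non-blocking sibling apply_async():

from multiprocessing import Pool

def square(x):
    return x*x

if __name__ == '__main__':
    with Pool(2) as p:
        print(p.apply(square,(3,)))     # blocks until a worker returns 9
        r=p.apply_async(square,(4,))    # returns immediately with an AsyncResult
        print(r.get())                  # blocks only when we ask for the value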
For this simple example we also need a database that holds a table with some data we can play with. We initialize this table with rows containing random numbers. When we benchmark the code we can make this table as large as we wish to get meaningful results; after all, our queries should take some time to complete, otherwise there would be no need to use more processes.
def initdb(db,rows=10000):
    c=dbapi.connect(db)
    cr=c.cursor()
    cr.execute('drop table if exists data')
    cr.execute('create table data (a,b)')
    for i in range(rows):
        cr.execute('insert into data values(?,?)',(i,random()))
    c.commit()
    c.close()
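As an aside, the insert loop could also be written with a single executemany() call, which is typically a bit faster for bulk loads. This variation (the name initdb_bulk is just illustrative) is not part of the design:

def initdb_bulk(db,rows=10000):
    c=dbapi.connect(db)
    cr=c.cursor()
    cr.execute('drop table if exists data')
    cr.execute('create table data (a,b)')
    # a generator expression feeds all rows to one executemany() call
    cr.executemany('insert into data values(?,?)',((i,random()) for i in range(rows)))
    c.commit()
    c.close()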
The final pieces of code tie everything together:
if __name__ == '__main__':
    global pool
    global tasks
    tasks=Queue()
    db='/tmp/test.db'
    initdb(db,100000)
    nthreads=10
    # fill the task queue with queries for the worker threads
    for i in range(100):
        tasks.put(('SELECT count(*) FROM data WHERE b>?',(random(),)))
    # one 'quit' sentinel per thread to make them stop
    for i in range(nthreads):
        tasks.put(('quit',tuple()))
    pool=Pool(2,connect,(db,))
    threads=[]
    for t in range(nthreads):
        th=Thread(target=threadwork)
        threads.append(th)
        th.start()
    for th in threads:
        th.join()
After creating the task queue and initializing the database, the next step is to fill the queue with a fair number of queries. The last items we add to the queue are the quit tasks that signal a thread to stop; we need as many of them as there will be threads.
Next we initialize our pool of processes, just two in this example, but in general the number should be equal to the number of CPUs in the system; if you omit this argument it will default to exactly that. We then create and start the number of threads we want. The target argument points to the function we defined earlier that does all the work, i.e. it pops tasks from the queue and passes these on to the pool of processes. The final lines simply wait until all threads are finished.
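As a quick, illustrative check of that default (separate from the application code): multiprocessing.cpu_count() reports the number of workers the Pool constructor falls back to when no size is given.

from multiprocessing import Pool, cpu_count

def identity(x):
    return x

if __name__ == '__main__':
    print('CPUs available:',cpu_count())
    with Pool() as p:                     # no size argument: one worker per CPU
        print(p.map(identity,range(3)))   # trivial work just to show the pool runs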
What's next?
In a following article we will benchmark and analyze this code and see how we can improve on this design.