Showing posts with label threadsafe. Show all posts
Showing posts with label threadsafe. Show all posts

More SQLite multithreading woes

In this article we revisit our thread safe persistent dictionary and encounter some irritating performance issues, both SQLite related and caused by Python itself.

SQLite.OperationalError, database is locked

When testing the persistentdict module with many simultaneous threads (more than twenty) I noticed a great number of errors: The many connections open to the same database caused SQLite to raise a lot of SQLite.OperationalError, database is locked exceptions. Getting decent performance with SQLite is by no means easy and because SQLite only locks complete database files and not just tables or rows there is a big chance that threads accessing the same database have to wait to get their turn.

sqlite3.connect, the check_same_thread parameter

Python 3.2 comes with a sqlite3 module that implements (but scarcely documents) a check_same_thread parameter that can be set to false to allow threads to use the same Connection object simultaneously. This is nice since this means we no longer have to implement all sorts of code to provide each thread with its own connection.

But we still have to regulate access to this connection because otherwise a commit in one thread may invalidate a longer running execute in another thread, leaving us with errors like sqlite3.InterfaceError: Cursor needed to be reset because of commit/rollback and can no longer be fetched from.

Python thread switching is really slow

Switching between threads, especially on multi core machines, has never been Python's strongest feature and making our persistent dict thread safe with a lock might hurt performance a lot, depending on what is going on in the threads itself (if there is a lot of I/O going on the impact might not be that big).

A new implementation

The code below shows the new implementation, with a single connection that may be shared by multiple threads. It works but it is really slow: with 40 threads I get just 10 to 20 dictionary assignments (d[1]=2) per second on my dual core Atom. That isn't the fastest machine around but those figures are ridiculously low. We will have to rethink our approach if we want to use SQLite in a multithreaded environment if we need any kind of performance!

"""
 persistentdict module $Revision: 98 $ $Date: 2011-07-23 14:01:04 +0200 (za, 23 jul 2011) $

 (c) 2011 Michel J. Anders

 This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see .

"""

from collections import UserDict
import sqlite3 as sqlite
from pickle import dumps,loads
import threading 

class PersistentDict(UserDict):

 """
 PersistentDict  a MutableMapping that provides a thread safe,
     SQLite backed persistent storage.
 
 db  name of the SQLite database file, e.g.
   '/tmp/persists.db', default to 'persistentdict.db'
 table name of the table that holds the persistent data,
   usefull if more than one persistent dictionary is
   needed. defaults to 'dict'
 
 PersistentDict tries to mimic the behaviour of the built-in
 dict as closely as possible. This means that keys should be hashable.
 
 Usage example:
 
 >>> from persistentdict import PersistentDict
 >>> a=PersistentDict()
 >>> a['number four'] = 4
 
 ... shutdown and then restart applicaion ...
 
 >>> from persistentdict import PersistentDict
 >>> a=PersistentDict()
 >>> print(a['number four'])
 4
 
 Tested with Python 3.2 but should work with 3.x and 2.7.x as well.
 
 run module directly to run test suite:
 
 > python PersistentDict.py
 
 """
 
 def __init__(self, dict=None, **kwargs):
  
  self.db    = kwargs.pop('db','persistentdict.db')
  self.table = kwargs.pop('table','dict')
  #self.local = threading.local()
  self.conn = None
  self.lock = threading.Lock()
  
  with self.lock:
   with self.connect() as conn:
    conn.execute('create table if not exists %s (hash unique not null,key,value);'%self.table)
    
  if dict is not None:
   self.update(dict)
  if len(kwargs):
   self.update(kwargs)
 
 def connect(self):
  if self.conn is None:
   self.conn = sqlite.connect(self.db,check_same_thread=False)
  return self.conn
   
 def __len__(self):
  with self.lock:
   cursor = self.connect().cursor()
   cursor.execute('select count(*) from %s'%self.table)
   return cursor.fetchone()[0]
 
 def __getitem__(self, key):
  with self.lock:
   cursor = self.connect().cursor()
   h=hash(key)
   cursor.execute('select value from %s where hash = ?'%self.table,(h,))
   try:
    return loads(cursor.fetchone()[0])
   except TypeError:
    if hasattr(self.__class__, "__missing__"):
     return self.__class__.__missing__(self, key)
   raise KeyError(key)
   
 def __setitem__(self, key, item):
  h=hash(key)
  with self.lock:
   with self.connect() as conn:
    conn.execute('insert or replace into %s values(?,?,?)'%self.table,(h,dumps(key),dumps(item)))

 def __delitem__(self, key):
  h=hash(key)
  with self.lock:
   with self.connect() as conn:
    conn.execute('delete from %s where hash = ?'%self.table,(h,))

 def __iter__(self):
  with self.lock:
   cursor = self.connect().cursor()
   cursor.execute('select key from %s'%self.table)
   rows = list(cursor.fetchall())
  for row in rows:
   yield loads(row[0])

 def __contains__(self, key):
  h=hash(key)
  with self.lock:
   cursor = self.connect().cursor()
   cursor.execute('select value from %s where hash = ?'%self.table,(h,))
   return not ( None is cursor.fetchone())

 # not implemented def __repr__(self): return repr(self.data)
 
 def copy(self):
  c = self.__class__(db=self.db)
  for key,item in self.items():
   c[key]=item
  return c

A SQLite multiprocessing proxy, part 2

In a previous article we decided to use Python's multiprocessing module to leverage the power of multi-core machines. Our use case is all about web applications served by CherryPy and so multi-processing isn't the only interesting part: our application will be multi-threaded as well. IN this article we present a first implementation of a multi-threaded application that hands off the heavy lifting to a pool of subprocesses.

The design

The design is centered on the following concepts:

  • The main process consists of multiple threads,
  • The work is done by a pool of subprocesses,
  • Transferring data to and from the subprocesses is left to the pool manager
Schematically we can visualize it as follows:

Sample code

We start of by including the necessary components:

from multiprocessing import Pool,current_process
from threading import current_thread,Thread
from queue import Queue
import sqlite3 as dbapi
from time import time,sleep
from random import random
The most important ones we need are the Pool class from the multiprocessing module and the Thread class from the threading module. We also import queue.Queue to act as a task list for the threads. Note that the multiprocessing module has its own Queue implementation that is not only thread safe but can be used for inter process communication as well but we won't be using that one here but rely on a simpler paradigm as we will see.

The next step is to define a function that may be called by the threads.

def execute(sql,params=tuple()):
 global pool
 return pool.apply(task,(sql,params))
It takes a string argument with SQL code and an optional tuple of parameters just like the Cursor.execute() method in the sqlite3 module. It merely passes on these arguments to the apply() method of the multiprocessing.Pool instance that is referred to by the global pool variable. Together with SQL string and parameters a reference to the task() function is passed, which is defined below:
def task(sql,params):
 global connection
 c=connection.cursor()
 c.execute(sql,params)
 l=c.fetchall()
 return l
This function just executes the SQL and returns the results. It assumes the global variable connection contains a valid sqlite3.Connection instance, something that is taken care of by the connect function that will be passed as an initializer to any new subprocess:
def connect(*args):
 global connection
 connection = dbapi.connect(*args)

Before we initialize our pool of subprocess let's have a look at the core function of any thread we start in our main process:

def threadwork(initializer=None,kwargs={}):
 global tasks
 if not ( initializer is None) :
  initializer(**kwargs)
 while(True):
  (sql,params) = tasks.get()
  if sql=='quit': break
  r=execute(sql,params)
It calls an optional thread initializer first and then enters a semi infinite loop in line 5. This loops starts by fetching an item from the global tasks queue. Each item is a tuple consisting of a string and another tuple with parameters. If the string is equal to quit we do terminate the loop otherwise we simple pass on the SQL statement and any parameters to the execute function we encountered earlier, which will take care of passing it to the pool of subprocesses. We store the result of this query in the r variable even though we do nothing with it in this example.

For this simple example we also need an database that holds a table with some data we can play with. We initialize this table with rows containing random numbers. When we benchmark the code we can make this as large as we wish to get meaningful results; after all, our queries should take some time to complete otherwise there would be no need to use more processes.

def initdb(db,rows=10000):
 c=dbapi.connect(db)
 cr=c.cursor()
 cr.execute('drop table if exists data');
 cr.execute('create table data (a,b)')
 for i in range(rows):
  cr.execute('insert into data values(?,?)',(i,random()))
 c.commit()
 c.close()

The final pieces of code tie everything together:

if __name__ == '__main__':
 global pool
 global tasks
 
 tasks=Queue()
 db='/tmp/test.db'
 
 initdb(db,100000)
 
 nthreads=10
 
 for i in range(100):
  tasks.put(('SELECT count(*) FROM data WHERE b>?',(random(),)))
 for i in range(nthreads):
  tasks.put(('quit',tuple()))
 
 pool=Pool(2,connect,(db,))
 
 threads=[]
 for t in range(nthreads):
  th=Thread(target=threadwork,kwargs={'initializer':thread_initializer})
  threads.append(th)
  th.start()
 for th in threads:
  th.join()
After creating a queue in line 5 and initializing the database in line 8, the next step is to fill a queue with a fair number of tasks (line 12). The final tasks we add to the queue signal a thread to stop (line 14). We need as many of them as there will be threads.

In line 17 we initialize our pool of processes. Just two in this example, but in general the number should be equal to the number of cpu's in the system. If you omit this argument the number will default to exactly that. Next we create (line 21) and start (line 23) the number of threads we want. The target argument points to the function we defined earlier that does all the work, i.e. pops tasks from the queue and passes these on to the pool of processes. The final lines simply wait till all threads are finished.

What's next?

In a following article we will benchmark and analyze this code and see how we can improve on this design.

A SQLite multiprocessing proxy

This is the first article in a series on improving the performance of Python web applications by leveraging the possibilities of the multiprocessing module. We'll focus on CherryPy and SQLite but the conclusions should be general enough for any Python based platform

Use case

Due to well known restrictions in the most common Python implementation, multithreading solutions will probably not help to solve performance issues (with the possible exception of serving slow network connections). The multiprocessing module offers an API similar to the threading module and might be an alternative when we want to divide the workload on a multicore machine.

The use case we're interested in is a CherryPy server that serves many requests, backed by a SQLite database. CherryPy is multithreaded by design and this approach is sensible as a web server may spend more time waiting for data to be transmitted over relatively slow network connections than actually doing work.

CherryPy however is also an excellent framework to host web applications and many web applications rely on some sort of database back-end. SQLite is a good choice for such a back-end as it comes bundled with Python (reducing the number of external dependencies), is easy to use and performs well enough. With some tricks it will even play nice in a multithreaded environment.

A disadvantage of using SQLite is that we do not have a separate database server: the SQLite engine is part of the same process that runs the Python interpreter. This means that it has the same handicap as any multithreaded application on CPython (the most common implementation of Python) and will not benefit from any extra cores or processors available on the server.

Now we could switch to MySQL or any other stand-alone database back-end but this would add quite an amount to the maintenance burden of our web application. Wouldn't it be nice if we could devise a way to use SQLite together with the multiprocessing module to have the best of both worlds: the ease of use of SQLite and the performance benefits of a stand-alone database server?

In this series of articles I will explore the possibilities and hopefully will come up with a solution that will provide:

  • a dbapi proxy (we'll use sqlite3 module but it should be general enough for any dbapi compliant database)
  • that will use the multiprocessing module to increase performance and
  • can be used from a multithreaded environment.
It would be nice if the API closely resembles the dbapi (but that is not an absolute requirement).

In the next article in this series I will explore the options to make threads and processes play nice, focusing on inter process communication.

A SQLite thread safe password store, revisited

In a previous article I showed how to implement a thread safe persistent password store that was based on SQLite. In this article a reimplementation of that module is presented base on the persistentdict module.

A SQLite thread safe password store

As you can see in the code presented below, we can put our PersistentDict class developed earlier to good use. Because we use two instances of PersistentDict (lines 45, 46) to store the salt and the hashed passwords instead of interacting with a SQLite database ourselves, the code is much cleaner and therefore easier to maintain.

'''
 dbpassword.py Copyright 2011, Michel J. Anders

 $Revision: 70 $ $Date: 2011-06-10 16:34:28 +0200 (vr, 10 jun 2011) $
 
 This program is free software: you can redistribute it
 and/or modify it under the terms of the GNU General Public
 License as published by the Free Software Foundation,
 either version 3 of the License, or (at your option) any
 later version.

 This program is distributed in the hope that it will be 
 useful, but WITHOUT ANY WARRANTY; without even the implied
 warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 PURPOSE. See the GNU General Public License for more
 details.

 You should have received a copy of the GNU General Public
 License along with this program.  If not, see 
 www.gnu.org/licenses.
'''

import hashlib
from random import SystemRandom as sr
from persistentdict import PersistentDict

class dbpassword:

 @staticmethod
 def hashpassword(name,salt,plaintextpassword,n=10):
  if n<1 : raise ValueError("n < 1")
  d = hashlib.new(name,(salt+plaintextpassword).encode()).digest()
  while n:
   n -= 1
   d = hashlib.new(name,d).digest()
  return hashlib.new(name,d).hexdigest()

 @staticmethod
 def getsalt(randombits=64):
  if randombits<16 : raise ValueError("randombits < 16")
  return "%016x"%sr().getrandbits(randombits)

 def __init__(self,db='password.db',
    secure_hash='sha256',iterations=1000,saltbits=64):
  self.saltdict = PersistentDict(db=db,table='salt')
  self.pwdict = PersistentDict(db=db,table='password')
  self.secure_hash = secure_hash
  self.iterations = iterations
  self.saltbits = 64
  
 def update(self,user,plaintextpassword):
  salt=dbpassword.getsalt(self.saltbits)
  self.saltdict[user]=salt
  self.pwdict[user]=dbpassword.hashpassword(
     self.secure_hash,salt,plaintextpassword,
     self.iterations)

 def check(self,user,plaintextpassword):
  salt=self.saltdict[user]
  return self.pwdict[user]==dbpassword.hashpassword(
   self.secure_hash,salt,plaintextpassword,
   self.iterations)

A Python module providing thread safe SQLite backed persistent storage

In a previous article I wrote about some research I did on Python modules that could provide me with a persistent storage solution for dictionaries. I didn't quite find what I needed especially as none of the modules provided a thread safe solution. In the end I decided to write my own.

Thread safe, SQLite backed persistent storage

The module I wrote, persistentdict, is is freely available on my website along with some notes and examples. It also has a fairly extensive test suite. It provides a single class PersistentDict that behaves almost exactly like a native Python dict but stores its keys and values in a SQLite table instead of keeping it in memory.

A SQLite thread safe password store

Prompted by one of the reviewers of my upcoming book I decided I needed a simple, thread safe, and reasonably secure password store backed by SQLite.

The design criteria were straight forward and based in part on Storing passwords - done right! and the practical recommendations onPythonSecurity.org:

  • based on SQLite
  • allow for a reasonable amount of threads (its intended use is within a CherryPy application)
  • able to use a salt with a configurable number of random bits
  • able to apply key stretching with a configurable number of iterations
  • use any secure hash algorithm from Python's hashlib module
A few simple test show that the recommended defaults, i.e. 64 bits of randomness in the salt and a 1000 iterations on the hash will allow for a password in check in well under a second even on my humble netbook with an atom processor. Obviously the hashing algorithms in Python's underlying OpenSSL library are very efficient. But we do have to choose some default that strikes a reasonable balance between providing a single user that tries to log in a good response and slowing down a brute force attack. For now we stick with 1000 iterations as the default but feel free to use 10000 or even more.

Example

from dbpassword import dbpassword

dbpw = dbpassword('/var/password.db')

# later, from any thread

dbpw.update(user,plaintextpassword) # update or set a new password


if dbpw.check(user,plaintextpassword) :
     ... do stuff ...
else:
     ... warn off user ...

The dbpassword module

Warning! I am not a cryptographer so I cannot guarantee the following code is safe enough for your needs.


'''
    dbpassword.py Copyright 2011, Michel J. Anders

    This program is free software: you can redistribute it
    and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation,
    either version 3 of the License, or (at your option) any
    later version.

    This program is distributed in the hope that it will be 
    useful, but WITHOUT ANY WARRANTY; without even the implied
    warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
    PURPOSE. See the GNU General Public License for more
    details.

    You should have received a copy of the GNU General Public
    License along with this program.  If not, see 
    .
'''
import sqlite3
import hashlib
from random import SystemRandom as sr
import threading

class dbpassword:

    @staticmethod
    def hashpassword(name,salt,plaintextpassword,n=10):
        if n<1 : raise ValueError("n < 1")
        d = hashlib.new(name,(salt+plaintextpassword).encode()).digest()
        while n:
            n -= 1
            d = hashlib.new(name,d).digest()
        return hashlib.new(name,d).hexdigest()

    @staticmethod
    def getsalt(randombits=64):
        if randombits<16 : raise ValueError("randombits < 16")
        return "%016x"%sr().getrandbits(randombits)

    def __connect(self):
        if not hasattr(self.local,'con') or self.local.con is None:
            self.local.con = sqlite3.connect(self.db)
            self.local.con.create_function('crypt',2,
                lambda s,p:dbpassword.hashpassword(
                  self.secure_hash,s,p,self.iterations))
        return self.local.con
    
    def __init__(self,db,
                   secure_hash='sha256',iterations=1000,saltbits=64):
        self.db = db
        self.local = threading.local()
        self.secure_hash = secure_hash
        self.iterations = iterations
        self.saltbits = 64
        with self.__connect() as con:
            cursor=con.cursor()
            sql='create table if not exists pwdb (user unique, salt, password)'
            cursor.execute(sql)    
    
    def update(self,user,plaintextpassword):
        with self.__connect() as con:
            cursor=con.cursor()
            sql1='insert or replace into pwdb (user,salt) values(?,?)'
            sql2='update pwdb set password=? where user = ?'
            salt=dbpassword.getsalt(self.saltbits)
            cursor.execute(sql1,(user,salt))
            cursor.execute(sql2,(dbpassword.hashpassword(
                    self.secure_hash,salt,plaintextpassword,
                    self.iterations),user))

    def check(self,user,plaintextpassword):
        cursor=self.__connect().cursor()
        sql='select user from pwdb where user = ? and crypt(salt,?) = password'
        cursor.execute(sql,(user,plaintextpassword))
        found=list(cursor) # can only create a list form this iterator once!
        return len(found)==1

Python thread safe cache class

Every so often the need arises to create a thread safe cache solution. This is my stab at a simple yet fully functional implementation that maintains the essential dictionary semantics, is thread safe and has a fixed, configurable size, for example in a multithreaded http server like CherryPy.

Although many dictionary operation like getting an item are reported to be atomic and therefore thread safe, this is actually an implementation specific feature of the widely used CPython implementation. And even so, adding keys or iterating over the keys in the dictionary might not be thread safe at all. We must therefore use some sort of locking mechanism to ensure no two threads try to modify the cache at the same time. (For more information check this discussion.)

The Cache class shown here features a configurable size and if the number of entries is too big it removes the oldest entry. We do not have to maintain a explicit usage administration for that because we make use of the properties of the OrderedDict class which remembers the order in which keys are inserted and sports a popitem() method that will remove the first (or last) item inserted.

from collections import OrderedDict
from threading import Lock

class Cache:
    def __init__(self,size=100):
        if int(size)<1 :
            raise AttributeError('size < 1 or not a number')
        self.size = size
        self.dict = OrderedDict()
        self.lock = Lock()

    def __getitem__(self,key):
        with self.lock:
            return self.dict[key]

    def __setitem__(self,key,value):
        with self.lock:
            while len(self.dict) >= self.size:
                self.dict.popitem(last=False)
            self.dict[key]=value

    def __delitem__(self,key):
        with self.lock:
            del self.dict[key]

Due to the functionality of the OrderedDict class we use, the implementation is very concise. The __init__() method merely checks whether the size attribute makes any sense and creates an instance of an OrderedDict and a Lock.

The with statements used in the remaining methods wait for the acquisition of the lock and guarantee that the lock is released even if an exception is raised. The __getitem__() method merely tries to retrieve a value by trying the key on the ordered dictionary after acquiring a lock.

The __setitem__() method removes as many items within its while loop to reduce the size to below the preset amount and then adds the new value. The popitem() method of an OrderedDict removes the least recently added key/value pair if it's last argument is set to False.

The __delitem__() also merely passes on the control to the underlying dictionary. Together these methods allow for any instance of our Cache class to be used like any other dictionary as the example code below illustrates:

>>> from cache import Cache
>>> c=Cache(size=3)
>>> c['key1']="one"
0
>>> c['key2']="two"
1
>>> c['key3']="three"
2
>>> c['key4']="four"
3
>>> c['key4']
'four'
>>> c['key1']
Traceback (most recent call last):
  File "", line 1, in 
  File "cache.py", line 13, in __getitem__
    return self.dict[key]
KeyError: 'key1'

Of course this doesn't show off the thread safety but it does show that the semantics are pretty much like that of a regular dictionary. If needed this class even be extended with suitable iterators/view like keys() and items() but for most caches this probably isn't necessary.