Showing posts with label security. Show all posts

Function annotations in Python, checking parameters in a web application server, part II

In a previous article I illustrated how we could put Python's function annotations to good use to provide us with a syntactically elegant way to describe what kind of parameter content is acceptable to our simple web application framework. In this article I present the actual implementation along with some notes on its usage.

A simple Python application server with full argument checking

The applicationserver module provides an ApplicationRequestHandler class that derives from the BaseHTTPRequestHandler class in Python's standard library. For now all we do is provide a do_GET() method, i.e. we don't bother with HTTP POST requests yet, although adding them would not take much effort.

The core of the code is presented below:

class ApplicationRequestHandler(BaseHTTPRequestHandler):
 
 def do_GET(self):
  url=urlsplit(self.path)
  path=url.path.strip('/')
  parts=path.split('/')
  if parts[0]=='':
   parts=['index']
   
  ob=self.application
  try:
   for p in parts:
    if hasattr(ob,p):
     ob=getattr(ob,p)
    else:
     raise AttributeError('unknown path '+ url.path)
  except AttributeError as e:
   self.send_error(404,str(e))
   return
   
  if ('return' in getattr(ob,'__annotations__',{}) and 
   issubclass(ob.__annotations__['return'],IsExposed)):
   try:
    kwargs=self.parse_args(url.query,ob.__annotations__)
   except Exception as e:
    self.send_error(400,str(e))
    return
    
   try:
    result=ob(**kwargs)
   except Exception as e:
    self.send_error(500,str(e))
    return
  else:
   self.send_error(404,'path not exposed '+ url.path)
   return
  self.send_response(200) # status line and headers must precede the body
  self.end_headers()
  self.wfile.write(result.encode())
 @staticmethod
 def parse_args(query,annotations):
  kwargs=defaultdict(list)
  if query != '':
   for p in query.split('&'):
    (name,value)=p.split('=',1)
    if name not in annotations:
     raise KeyError(name+' not annotated')
    else:
     kwargs[name].append(annotations[name](value))
   for k in kwargs:
    if len(kwargs[k])==1:
     kwargs[k]=kwargs[k][0]
  return kwargs
  
The first thing we do in the do_GET() method is split the path into separate components. The request target is available in the handler's path member, and urlsplit() separates the path proper from the query parameters. We strip any leading or trailing slashes from the path as well (line 5). If there are no path components we look for a method called index.

The next step is to check each path component and see whether it is a member of the application we registered with the handler. If it is, we retrieve it and check whether the next component is a member of this new object. If any of these path components is missing we raise an error (line 16), which results in a 404 response.
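
To make the traversal concrete, here is a minimal standalone sketch of the same idea. The Site and Blog classes and the resolve() helper are hypothetical names I made up for illustration; they are not part of the applicationserver module.

```python
class Blog:
    def latest(self):
        return 'latest posts'


class Site:
    # hypothetical application object; nested objects form the URL hierarchy
    blog = Blog()

    def index(self):
        return 'home'


def resolve(root, path):
    """Walk the path components as attributes of root.

    Raises AttributeError as soon as a component is missing,
    mirroring the loop in do_GET().
    """
    parts = path.strip('/').split('/')
    if parts[0] == '':
        parts = ['index']  # an empty path maps to the index method
    ob = root
    for p in parts:
        if not hasattr(ob, p):
            raise AttributeError('unknown path ' + path)
        ob = getattr(ob, p)
    return ob


print(resolve(Site(), '/')())
print(resolve(Site(), '/blog/latest/')())
```

Note that this walk alone would happily return any attribute, including ones never meant to be reachable from a URL, which is why the exposure check in the next step matters.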

If all parts of the path can be resolved as members, we check whether the object the path finally points to carries a return annotation. If this return annotation is defined and is our IsExposed class or a subclass of it (line 22) we are willing to execute it, otherwise we respond with an error.
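
A small sketch of that exposure test in isolation may help. The IsExposed class below is only a stand-in for the marker class from the previous article, and is_exposed() and App are hypothetical names used for illustration.

```python
class IsExposed:
    """Stand-in for the marker class; assumed to carry no behaviour."""


def is_exposed(ob):
    # objects without annotations (strings, numbers, ...) are never exposed
    annotations = getattr(ob, '__annotations__', {})
    ret = annotations.get('return')
    # guard against return annotations that are not classes at all
    return isinstance(ret, type) and issubclass(ret, IsExposed)


class App:
    def index(self, name: str = 'world') -> IsExposed:
        return 'hello ' + name

    def helper(self):
        # no return annotation, so not reachable from a URL
        return 'internal'


app = App()
print(is_exposed(app.index), is_exposed(app.helper))
```

The design choice here is opt-in exposure: a method is only callable from the outside if its author explicitly marked it.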

The next step is to check each argument, so we pass the query part of the URL to the static parse_args() method, which returns a dictionary of values if all values check out. If so, we call the method that we found earlier with these arguments and, if all went well, write its result to the output stream that delivers the content to the client.

The parse_args() method is not very complicated: it creates a default dictionary whose default value is an empty list. This way we can build a list of values if the query contains more than one part with the same name. Then we split the query on the & character (line 44) and split each part into a name and a value (these are separated by a = character). Next we check whether the name is present in the annotations dictionary and, if not, raise a KeyError.

If we did find the name in the annotations dictionary, its associated value should be a callable to which we pass the value from the query part (line 49). The result of this check (or conversion) is appended to the list in the default dictionary. If the value in the annotation is not callable, an exception is raised that parse_args() itself does not catch. Likewise, any exception raised within the callable bubbles up to the calling code, where do_GET() turns it into a 400 response. The final lines (lines 50-53) reduce those entries in the default dictionary that consist of a list with just a single item to just that item before returning.
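
The listing above does not decode percent-escapes in the query. As a hedged variation, not the code from the module, the same checks can be built on urllib.parse.parse_qsl, which does the splitting and decoding for us. Rejecting the name 'return' is an extra precaution I added in this sketch, since that key is always present in the annotations of an exposed function.

```python
from collections import defaultdict
from urllib.parse import parse_qsl


def parse_args(query, annotations):
    """Convert query parameters with the callables found in annotations."""
    kwargs = defaultdict(list)
    if query:
        for name, value in parse_qsl(query, keep_blank_values=True):
            # 'return' is an annotation key but never a parameter (added check)
            if name == 'return' or name not in annotations:
                raise KeyError(name + ' not annotated')
            # the annotation acts as checker/converter and may raise
            kwargs[name].append(annotations[name](value))
    # collapse single-item lists to the item itself
    return {k: (v[0] if len(v) == 1 else v) for k, v in kwargs.items()}


# hypothetical annotations, as they would appear in some_function.__annotations__
annotations = {'name': str, 'times': int, 'tag': str}
print(parse_args('name=J%20Doe&times=3&tag=a&tag=b', annotations))
```

A value that fails conversion, for example times=abc, raises a ValueError from int(), which the calling code can translate into a 400 response.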

Conclusion

Python function annotations can be used for many purposes, and this example code shows a rather elegant (I think) way to specify in clear language what we expect of functions that are part of web applications. This makes it quite easy to catch many input errors in a way that can be adapted to almost any input pattern.

A SQLite thread safe password store

Prompted by one of the reviewers of my upcoming book I decided I needed a simple, thread safe, and reasonably secure password store backed by SQLite.

The design criteria were straightforward and based in part on Storing passwords - done right! and the practical recommendations on PythonSecurity.org:

  • based on SQLite
  • allow for a reasonable number of threads (its intended use is within a CherryPy application)
  • able to use a salt with a configurable number of random bits
  • able to apply key stretching with a configurable number of iterations
  • use any secure hash algorithm from Python's hashlib module

A few simple tests show that the recommended defaults, i.e. 64 bits of randomness in the salt and 1000 iterations of the hash, allow for a password check in well under a second, even on my humble netbook with an Atom processor. Obviously the hashing algorithms in Python's underlying OpenSSL library are very efficient. But we do have to choose a default that strikes a reasonable balance between giving a single user who tries to log in a good response time and slowing down a brute force attack. For now we stick with 1000 iterations as the default, but feel free to use 10000 or even more.
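
To get a feel for the cost of key stretching on your own hardware, a rough timing sketch like the one below can help. Note that it uses hashlib.pbkdf2_hmac from the standard library, a standardized stretching function, and not the hand-rolled loop of the dbpassword module; new_salt(), stretch() and verify() are hypothetical helper names.

```python
import hashlib
import hmac
import secrets
import time


def new_salt(bits=64):
    # random salt from the operating system's secure source
    return secrets.token_bytes(bits // 8)


def stretch(password, salt, iterations=1000, hash_name='sha256'):
    # PBKDF2 applies the HMAC of the chosen hash 'iterations' times
    return hashlib.pbkdf2_hmac(hash_name, password.encode(), salt, iterations)


def verify(password, salt, expected, iterations=1000):
    # constant-time comparison avoids leaking where the digests differ
    return hmac.compare_digest(stretch(password, salt, iterations), expected)


salt = new_salt()
for n in (1000, 10000):
    start = time.perf_counter()
    stretch('secret', salt, n)
    print('%6d iterations: %.4f s' % (n, time.perf_counter() - start))
```

The timings will differ per machine, so treat the numbers as a guide for picking an iteration count rather than as a benchmark.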

Example

from dbpassword import dbpassword

dbpw = dbpassword('/var/password.db')

# later, from any thread

dbpw.update(user,plaintextpassword) # update or set a new password


if dbpw.check(user,plaintextpassword) :
     ... do stuff ...
else:
     ... warn off user ...
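
Calling the store "from any thread" works because the module listed further down keeps a separate SQLite connection per thread with threading.local. The sketch below isolates just that pattern; the Store class, its seen table and the temporary database path are hypothetical and only serve the illustration.

```python
import os
import sqlite3
import tempfile
import threading


class Store:
    def __init__(self, db):
        self.db = db
        self.local = threading.local()  # holds one connection per thread
        with self.connect() as con:
            con.execute('create table if not exists seen (name unique)')

    def connect(self):
        # open a connection lazily the first time a thread needs one
        con = getattr(self.local, 'con', None)
        if con is None:
            con = self.local.con = sqlite3.connect(self.db)
        return con

    def add(self, name):
        # the connection as context manager commits on success
        with self.connect() as con:
            con.execute('insert or replace into seen (name) values (?)',
                        (name,))

    def count(self):
        return self.connect().execute('select count(*) from seen').fetchone()[0]


path = os.path.join(tempfile.mkdtemp(), 'demo.db')
store = Store(path)
threads = [threading.Thread(target=store.add, args=('user%d' % i,))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(store.count())
```

By default sqlite3 connections may only be used in the thread that created them, so giving each thread its own connection sidesteps that restriction without any explicit locking in our code.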

The dbpassword module

Warning! I am not a cryptographer so I cannot guarantee the following code is safe enough for your needs.


'''
    dbpassword.py Copyright 2011, Michel J. Anders

    This program is free software: you can redistribute it
    and/or modify it under the terms of the GNU General Public
    License as published by the Free Software Foundation,
    either version 3 of the License, or (at your option) any
    later version.

    This program is distributed in the hope that it will be 
    useful, but WITHOUT ANY WARRANTY; without even the implied
    warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
    PURPOSE. See the GNU General Public License for more
    details.

    You should have received a copy of the GNU General Public
    License along with this program.  If not, see
    <http://www.gnu.org/licenses/>.
'''
import sqlite3
import hashlib
from random import SystemRandom as sr
import threading

class dbpassword:

    @staticmethod
    def hashpassword(name,salt,plaintextpassword,n=10):
        if n<1 : raise ValueError("n < 1")
        d = hashlib.new(name,(salt+plaintextpassword).encode()).digest()
        while n:
            n -= 1
            d = hashlib.new(name,d).digest()
        return hashlib.new(name,d).hexdigest()

    @staticmethod
    def getsalt(randombits=64):
        if randombits<16 : raise ValueError("randombits < 16")
        return "%016x"%sr().getrandbits(randombits)

    def __connect(self):
        if not hasattr(self.local,'con') or self.local.con is None:
            self.local.con = sqlite3.connect(self.db)
            self.local.con.create_function('crypt',2,
                lambda s,p:dbpassword.hashpassword(
                  self.secure_hash,s,p,self.iterations))
        return self.local.con
    
    def __init__(self,db,
                   secure_hash='sha256',iterations=1000,saltbits=64):
        self.db = db
        self.local = threading.local()
        self.secure_hash = secure_hash
        self.iterations = iterations
        self.saltbits = saltbits
        with self.__connect() as con:
            cursor=con.cursor()
            sql='create table if not exists pwdb (user unique, salt, password)'
            cursor.execute(sql)    
    
    def update(self,user,plaintextpassword):
        with self.__connect() as con:
            cursor=con.cursor()
            sql1='insert or replace into pwdb (user,salt) values(?,?)'
            sql2='update pwdb set password=? where user = ?'
            salt=dbpassword.getsalt(self.saltbits)
            cursor.execute(sql1,(user,salt))
            cursor.execute(sql2,(dbpassword.hashpassword(
                    self.secure_hash,salt,plaintextpassword,
                    self.iterations),user))

    def check(self,user,plaintextpassword):
        cursor=self.__connect().cursor()
        sql='select user from pwdb where user = ? and crypt(salt,?) = password'
        cursor.execute(sql,(user,plaintextpassword))
        found=list(cursor) # can only create a list from this iterator once!
        return len(found)==1