Showing posts with label subprocess. Show all posts
Showing posts with label subprocess. Show all posts

Finding stuck or hot pixels in Nikon D80 images, a multiprocessing approach

Earlier I described a method to find hot or stuck pixels by determining the variance of (sub)pixels over a large set of photos. In this article we take a look at a way to farm out this work to multiple processes.

The parallel algorithm for calculating variance

The parallel algorithm allows us to combine calculated variances from separate sets. With the help of Python's multiprocessing module it is straight forward to implement our hot pixel finding algorithm to one that takes advantage of multiple cores:

import Image
import numpy as np
import sys
from glob import glob
from multiprocessing import Pool,current_process
from time import time

def process_pixels(shape,*args):
	first = True
	for filename in args:
		pic = Image.open(filename)
		if pix.shape != shape:
			print("shapes don't match")
			continue
		if first:
			first = False
			firstpix = pix
			n = np.zeros(firstpix.shape)
			mean = np.zeros(firstpix.shape)
			M2 = np.zeros(firstpix.shape)
			delta = np.zeros(firstpix.shape)
		n += 1
		delta = pix - mean
		mean += delta/n
		M2 += delta*(pix - mean)
	return M2
	
if __name__ == '__main__':
	global pool

	n=int(sys.argv[1])
	pool = Pool(n)
	filenames = []
	for a in sys.argv[2:]:
		filenames.extend(glob(a))
	
	shape = np.array(Image.open(filenames[0])).shape

	s=time()
	results = []
	for i in range(n):
	results.append(pool.apply_async(process_pixels,tuple([shape]+filenames[i::n])))
	for i in range(n):
		results[i]=results[i].get()
	M2 = sum(results)
	print(time()-s)
	
	mini = np.unravel_index(M2.argmin(),M2.shape)
	maxi = np.unravel_index(M2.argmax(),M2.shape)
	print('min',M2[mini],mini)
	print('max',M2[maxi],maxi)
	print('mean',np.mean(M2))

	sorti = M2.argsort(axis=None)
	print(sep="\n",*[(i,M2[i]) for i in [np.unravel_index(i,M2.shape) for i in sorti[:10]]])

	print(time()-s)

Note that because we return complete arrays (line 26) we gain almost nothing for small data sets because of the overhead of shuttling these large arrays (> 30 MByte) between processes. This is illustrated in the following graph with shows the elapsed time as a function of the number of processes, both for a small number of pictures (50) and a large number (480).

Some other notable issues: in the code we do not actually implement the parallel algorithm but we simple add together the variances. Because we're looking for a minimum variance we gain nothing by adding a constant value.

Memory usage is another thing to be aware of (and the reason that there is no entry for six cores in the graph. The algorithm we have implemented uses 5 arrays (the pixel data itself included). That makes for 10 megapixel X 3 colors X 5 arrays X 8 bytes data (because we use 64 bit floats by default) which makes for a whopping 1.2 Gigabyte of data per process or more than 6 Gig with 5 processes. With some other applications open a sixth process wouldn't fit on my test machine. Because we're adding pixel values in the range 0 - 255 we could probably gain a lot by using 32 bit floats or even 16 bit floats here.

A SQLite multiprocessing proxy, part 2

In a previous article we decided to use Python's multiprocessing module to leverage the power of multi-core machines. Our use case is all about web applications served by CherryPy and so multi-processing isn't the only interesting part: our application will be multi-threaded as well. IN this article we present a first implementation of a multi-threaded application that hands off the heavy lifting to a pool of subprocesses.

The design

The design is centered on the following concepts:

  • The main process consists of multiple threads,
  • The work is done by a pool of subprocesses,
  • Transferring data to and from the subprocesses is left to the pool manager
Schematically we can visualize it as follows:

Sample code

We start of by including the necessary components:

from multiprocessing import Pool,current_process
from threading import current_thread,Thread
from queue import Queue
import sqlite3 as dbapi
from time import time,sleep
from random import random
The most important ones we need are the Pool class from the multiprocessing module and the Thread class from the threading module. We also import queue.Queue to act as a task list for the threads. Note that the multiprocessing module has its own Queue implementation that is not only thread safe but can be used for inter process communication as well but we won't be using that one here but rely on a simpler paradigm as we will see.

The next step is to define a function that may be called by the threads.

def execute(sql,params=tuple()):
 global pool
 return pool.apply(task,(sql,params))
It takes a string argument with SQL code and an optional tuple of parameters just like the Cursor.execute() method in the sqlite3 module. It merely passes on these arguments to the apply() method of the multiprocessing.Pool instance that is referred to by the global pool variable. Together with SQL string and parameters a reference to the task() function is passed, which is defined below:
def task(sql,params):
 global connection
 c=connection.cursor()
 c.execute(sql,params)
 l=c.fetchall()
 return l
This function just executes the SQL and returns the results. It assumes the global variable connection contains a valid sqlite3.Connection instance, something that is taken care of by the connect function that will be passed as an initializer to any new subprocess:
def connect(*args):
 global connection
 connection = dbapi.connect(*args)

Before we initialize our pool of subprocess let's have a look at the core function of any thread we start in our main process:

def threadwork(initializer=None,kwargs={}):
 global tasks
 if not ( initializer is None) :
  initializer(**kwargs)
 while(True):
  (sql,params) = tasks.get()
  if sql=='quit': break
  r=execute(sql,params)
It calls an optional thread initializer first and then enters a semi infinite loop in line 5. This loops starts by fetching an item from the global tasks queue. Each item is a tuple consisting of a string and another tuple with parameters. If the string is equal to quit we do terminate the loop otherwise we simple pass on the SQL statement and any parameters to the execute function we encountered earlier, which will take care of passing it to the pool of subprocesses. We store the result of this query in the r variable even though we do nothing with it in this example.

For this simple example we also need an database that holds a table with some data we can play with. We initialize this table with rows containing random numbers. When we benchmark the code we can make this as large as we wish to get meaningful results; after all, our queries should take some time to complete otherwise there would be no need to use more processes.

def initdb(db,rows=10000):
 c=dbapi.connect(db)
 cr=c.cursor()
 cr.execute('drop table if exists data');
 cr.execute('create table data (a,b)')
 for i in range(rows):
  cr.execute('insert into data values(?,?)',(i,random()))
 c.commit()
 c.close()

The final pieces of code tie everything together:

if __name__ == '__main__':
 global pool
 global tasks
 
 tasks=Queue()
 db='/tmp/test.db'
 
 initdb(db,100000)
 
 nthreads=10
 
 for i in range(100):
  tasks.put(('SELECT count(*) FROM data WHERE b>?',(random(),)))
 for i in range(nthreads):
  tasks.put(('quit',tuple()))
 
 pool=Pool(2,connect,(db,))
 
 threads=[]
 for t in range(nthreads):
  th=Thread(target=threadwork,kwargs={'initializer':thread_initializer})
  threads.append(th)
  th.start()
 for th in threads:
  th.join()
After creating a queue in line 5 and initializing the database in line 8, the next step is to fill a queue with a fair number of tasks (line 12). The final tasks we add to the queue signal a thread to stop (line 14). We need as many of them as there will be threads.

In line 17 we initialize our pool of processes. Just two in this example, but in general the number should be equal to the number of cpu's in the system. If you omit this argument the number will default to exactly that. Next we create (line 21) and start (line 23) the number of threads we want. The target argument points to the function we defined earlier that does all the work, i.e. pops tasks from the queue and passes these on to the pool of processes. The final lines simply wait till all threads are finished.

What's next?

In a following article we will benchmark and analyze this code and see how we can improve on this design.

Talking to gnuplot by pipes

Drawing a graph shouldn't be difficult and Gnuplot indeed does make it simple to draw excellent graphs of all sorts of data. This little Python snippet shows how simple it is to talk to a Gnuplot subprocess from a Python program to draw a graph.

If you have an elaborate dataset that you want to explore in various ways it is probably easiest to run Gnuplot and refer to this file in the plot command. However in may other situations it is often more convenient to let Python talk to gnuplot via a pipe, writing commands and data directly to a gnuplot subprocess, with the possible added benefit that running Gnuplot as a complete separate process may utilize a multi-core processor better than Python can on its own.

Gnuplot is capable of interacting with another process over a pipe although on Windows you'll need not the main Gnuplot program but the executable pgnuplot.exe which is part of the distriubution.

Of course the wish to talk to Gnuplot this way is not a unique one. A fairly elaborate module exists but it doesn't look that actively maintained as it does not support Python 3.x. Moreover, it depends on the numpy package which is excellent but a bit heavy handed in many situations.

So what we are looking for is the simplest way to communicate with Gnuplot to draw a graph from an array of x,y values. The snippet below shows how this can be accomplished:

from subprocess import Popen,PIPE

gnuplot = r'C:\gp45-winbin\gp45-winbin\gnuplot\binary\pgnuplot'

data = [(x,x*x) for x in range(10)]

plot=Popen([gnuplot,'-persist'],stdin=PIPE,stdout=PIPE,stderr=PIPE)

plot.stdin.write(b"plot '-' with lines\n")
plot.stdin.write("\n".join("%f %f"%d for d in data).encode())
plot.stdin.write(b"\ne\n")
plot.stdin.flush()

The gnuplot variable points to the pipe capable gnuplot executable. On windows this is pgnuplot, on Unix-like operating systems the main executable will work just fine as well. The data variable hold a list of (x,y) tuples.

The important bit is in the Popen() call. The first argument is a list consisting of the program name and any options we wish to pass to this program. In this case we add the -persist option to prevent closing the window containing the graph. We also indicate that all input and output streams should be pipes.

The final statements show how we pass content to the input stream of the subprocess. Note that the write() method of such a file like object expects bytes, not a string, so we either provide bytes literals like b'' or use the encode() method of a string. The plot statement we send to Gnuplot is given '-' as the file name which will cause any subsequent lines sent to it to be interpreted as data, up to a line containing a single e.