
Source Code for Module wsatools.Parallel

#------------------------------------------------------------------------------
#$Id: Parallel.py 10000 2013-08-12 22:39:54Z RossCollins $
"""
   An alternative implementation of multiprocessing.Pool.map that is faster,
   more robust, and that also works for class methods and non-pickleable
   functions.

   Usage
   =====

   The interface is the same as that of the built-in map function. The first
   argument is a function that takes a single argument. The second argument
   is an iterable sequence; every item in the sequence is passed in turn to
   the function given in the first argument::
       from wsatools.Parallel import parallel_map
       results = parallel_map(self.runOnData, [1,2,3])

   @author: R.S. Collins
   @org:    WFAU, IfA, University of Edinburgh

   @note: Gratuitously taken from this site:
   U{http://www.astropython.org/snippet/2010/3/Parallel-map-using-multiprocessing}
"""
#------------------------------------------------------------------------------
import multiprocessing
import numpy
#------------------------------------------------------------------------------
def worker(f, ii, chunk, out_q, err_q, _lock):
    """
    A worker function that maps an input function over a slice of the input
    iterable.

    :param f:     callable function that accepts an argument from the iterable
    :param ii:    process ID
    :param chunk: slice of the input iterable
    :param out_q: thread-safe output queue
    :param err_q: thread-safe queue to populate on exception
    :param _lock: thread-safe lock to protect a resource
                  (useful when extending parallel_map())
    """
    vals = []

    # iterate over slice
    for val in chunk:
        try:
            result = f(val)
        except Exception as e:
            err_q.put(e)
            return

        vals.append(result)

    # output the task ID and results to the output queue
    out_q.put((ii, vals))

#------------------------------------------------------------------------------

def run_tasks(procs, err_q, out_q, num):
    """
    A function that executes the populated processes and collates the
    resultant array, checking the error queue for any exceptions.

    :param procs: list of Process objects
    :param out_q: thread-safe output queue
    :param err_q: thread-safe queue to populate on exception
    :param num:   length of resultant array
    """
    # function to terminate processes that are still running
    die = lambda vals: [val.terminate() for val in vals
                        if val.exitcode is None]

    try:
        for proc in procs:
            proc.start()

        for proc in procs:
            proc.join()

    except (Exception, KeyboardInterrupt):
        # kill all slave processes on ctrl-C or any other exception
        # (KeyboardInterrupt is not an Exception subclass, so it must be
        # caught explicitly; a bare raise preserves the traceback)
        die(procs)
        raise

    if not err_q.empty():
        # kill all on any exception from any one slave
        die(procs)
        raise err_q.get()

    # Processes finish in arbitrary order. Process IDs double
    # as index in the resultant array.
    results = [None] * num
    while not out_q.empty():
        idx, result = out_q.get()
        results[idx] = result

    # Remove extra dimension added by array_split
    return list(numpy.concatenate(results))

#------------------------------------------------------------------------------

def parallel_map(function, sequence, numcores=None):
    """
    A parallelised version of the native Python map function that utilises
    the Python multiprocessing module to divide and conquer the sequence.
    Example:

    >>> parallel_map(lambda x: x**2, [1,2,3])
    [1, 4, 9]

    parallel_map does not yet support multiple argument sequences.

    :param function: callable function that accepts an argument from the
                     iterable
    :param sequence: iterable sequence
    :param numcores: number of cores to use

    """
    if not callable(function):
        raise TypeError("input function '%s' is not callable" % repr(function))

    if not numpy.iterable(sequence):
        raise TypeError("input '%s' is not iterable" % repr(sequence))

    size = len(sequence)

    if numcores is None:
        numcores = multiprocessing.cpu_count()

    # compare with ==, not 'is': 'is' tests identity, not value
    if size == 1 or numcores == 1:
        return list(map(function, sequence))

    # Returns a started SyncManager object which can be used for sharing
    # objects between processes. The returned manager object corresponds
    # to a spawned child process and has methods which will create shared
    # objects and return corresponding proxies.
    manager = multiprocessing.Manager()

    # Create FIFO queue and lock shared objects and return proxies to them.
    # The manager handles a server process that manages shared objects that
    # each slave process has access to. Bottom line -- thread-safe.
    out_q = manager.Queue()
    err_q = manager.Queue()
    lock = manager.Lock()

    # If the sequence is shorter than numcores, only use as many processes
    # as there are items in the sequence.
    if size < numcores:
        numcores = size

    # group sequence into numcores-worth of chunks
    sequence = numpy.array_split(sequence, numcores)

    procs = [multiprocessing.Process(target=worker,
                                     args=(function, ii, chunk, out_q, err_q,
                                           lock))
             for ii, chunk in enumerate(sequence)]

    return run_tasks(procs, err_q, out_q, numcores)

#------------------------------------------------------------------------------

if __name__ == "__main__":
    import doctest
    doctest.testmod()

#------------------------------------------------------------------------------