"""
An alternative implementation of multiprocessing.Pool.map that is faster,
more robust and which also works for class methods and non-pickleable
functions.

Usage
=====

The interface is the same as the built-in map function. The first argument
is a function that takes only a single argument. The second argument is an
iterable sequence for which every item will become an argument to the
function given in the first argument::

    from wsatools.Parallel import parallel_map
    results = parallel_map(self.runOnData, [1, 2, 3])

@author: R.S. Collins
@org: WFAU, IfA, University of Edinburgh

@note: Gratuitously taken from this site:
U{http://www.astropython.org/snippet/2010/3/Parallel-map-using-multiprocessing}
"""

import multiprocessing

import numpy


def worker(f, ii, chunk, out_q, err_q, _lock):
    """
    A worker function that maps an input function over a slice of the
    input iterable, posting the results to ``out_q`` tagged with this
    worker's index so the caller can reassemble them in order.

    On the first exception, the exception object is posted to ``err_q``
    and the remainder of the chunk is abandoned.

    :param f    : callable function that accepts a single argument
    :param ii   : process ID (index used to reorder results on collection)
    :param chunk: slice of the input iterable
    :param out_q: thread-safe output queue; receives ``(ii, results)``
    :param err_q: thread-safe queue to populate on exception
    :param _lock: thread-safe lock to protect a resource
                  (unused here; useful in extending parallel_map())
    """
    vals = []

    for val in chunk:
        try:
            result = f(val)
        except Exception as e:
            # Report the failure and stop early; the caller inspects err_q
            # before assembling any results.
            err_q.put(e)
            return

        vals.append(result)

    out_q.put((ii, vals))


def run_tasks(procs, err_q, out_q, num):
    """
    Start the populated processes, wait for them to finish, and assemble
    the resultant array. Checks the error queue for any exception raised
    by a worker and re-raises the first one found, terminating any
    still-running processes first.

    :param procs: list of Process objects
    :param err_q: thread-safe queue populated on exception
    :param out_q: thread-safe output queue of ``(index, chunk)`` pairs
    :param num  : length of resultant array (number of chunks expected)

    :return: flat list of results, in the original input order
    """
    def _terminate_alive(processes):
        # exitcode is None while a process is still running (or was never
        # started), so only those are terminated.
        for proc in processes:
            if proc.exitcode is None:
                proc.terminate()

    try:
        for proc in procs:
            proc.start()

        for proc in procs:
            proc.join()

    except Exception:
        _terminate_alive(procs)
        raise

    if not err_q.empty():
        # A worker failed: stop everything and surface its exception here.
        _terminate_alive(procs)
        raise err_q.get()

    # Reassemble the per-worker chunks in submission order, then flatten.
    results = [None] * num
    while not out_q.empty():
        idx, result = out_q.get()
        results[idx] = result

    return list(numpy.concatenate(results))


def parallel_map(function, sequence, numcores=None):
    """
    A parallelised version of the native Python map function that utilises
    the Python multiprocessing module to divide and conquer the sequence.
    Example:

    >>> parallel_map(lambda x: x**2, [1,2,3])
    [1, 4, 9]

    parallel_map does not yet support multiple argument sequences.

    :param function: callable function that accepts argument from iterable
    :param sequence: iterable sequence
    :param numcores: number of cores to use (default: all available)

    :return: list of results, in the same order as ``sequence``
    """
    if not callable(function):
        raise TypeError("input function '%s' is not callable" % repr(function))

    if not numpy.iterable(sequence):
        raise TypeError("input '%s' is not iterable" % repr(sequence))

    size = len(sequence)

    if numcores is None:
        numcores = multiprocessing.cpu_count()

    # No point paying the process start-up cost for a trivial job; list()
    # keeps the Python-2-style eager-list return that the doctest expects.
    if size == 1 or numcores == 1:
        return list(map(function, sequence))

    # Manager-backed queues and locks can be shared across processes.
    manager = multiprocessing.Manager()
    out_q = manager.Queue()
    err_q = manager.Queue()
    lock = manager.Lock()

    # Never start more processes than there are items to process.
    if size < numcores:
        numcores = size

    # Split the sequence into one (roughly equal) chunk per core.
    chunks = numpy.array_split(sequence, numcores)

    procs = [multiprocessing.Process(target=worker,
                                     args=(function, ii, chunk, out_q, err_q,
                                           lock))
             for ii, chunk in enumerate(chunks)]

    return run_tasks(procs, err_q, out_q, numcores)


if __name__ == "__main__":
    # Run the doctest embedded in parallel_map's docstring as a smoke test.
    import doctest
    doctest.testmod()