Author: Dave Kuhlman
Contact: dkuhlman (at) davekuhlman (dot) org
Address: http://www.reifywork.com
Revision: 1.1a
Date: September 03, 2015
Copyright: Copyright (c) 2013 Dave Kuhlman. All Rights Reserved. This software is subject to the provisions of the MIT License: http://www.opensource.org/licenses/mit-license.php.
Abstract: This document is an introduction, guide, and how-to on multiprocessing, parallel processing, and distributed processing programming in Python using several different technologies, for example, XML-RPC, IPython parallel processing, and Erlang+Erlport.
This document is a survey of several different ways of implementing multiprocessing systems in Python. It attempts to provide a small amount of guidance on when it is appropriate and useful to use these different approaches, and when not.
We all have multi-core machines. It's easy to imagine a home with multiple computers and devices of several different kinds connected on a LAN (local area network) through Ethernet or wireless connections. Most (soon all) of those devices have multiple cores. And yet, most of that power is wasted while many of those cores sit idle.
So, why do we all have machines with so many unused cores? Because Intel and AMD must compete, and to do so, they must give us what appear to be faster machines. They can't give us more cycles (per second), since, if they did, our machines would melt. So, they give us additional cores. The number of transistors goes up, and Moore's law (technically) holds true, but for most of us, that power is largely unused and unusable.
Enough ranting ... The alternatives and options discussed in this document are all intended to solve that problem. We have tools that are looking for uses. We need to learn how to put them to fuller use so that next year we can justify buying yet another machine with more cores to add to our home networks.
My central goal in writing this document is to enable and encourage more of us to write the software that puts those machines and cores to work.
And, note Larry Wall's three virtues of programming, in particular:
"Impatience: The anger you feel when the computer is being lazy. This makes you write programs that don't just react to your needs, but actually anticipate them. Or at least pretend to."
For each of the alternatives discussed in this document, I'll try to cover: (1) appropriate (and inappropriate) uses; (2) possible use cases; (3) some how-to instruction; and (4) example code.
Notice that we will be paying special attention to one specific multiprocessing programming pattern. We want a scheme in which (1) there are multiple servers; (2) there are multiple clients; (3) any client can submit a task (a function call) to be evaluated by any available server. You might think of this pattern as using a pool of servers (processes) to which clients can submit (often compute intensive) function calls. A small local sketch of the pattern follows.
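Here is that local sketch, using the multiprocessing.Pool class from the Python standard library as the pool of server processes; the function compute_task is just a stand-in for real work:

from multiprocessing import Pool

def compute_task(x):
    # A stand-in for a compute intensive function.
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=4)    # a pool of four server processes
    # A "client" submits function calls to the pool ...
    async_results = [pool.apply_async(compute_task, (n,)) for n in range(8)]
    # ... and collects the results as they become available.
    print [result.get() for result in async_results]
    pool.close()
    pool.join()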
XML-RPC is a simple and easy way to get distributed processing. With it, you can request that a function be called in a Python process on a remote machine and that the result be returned to you.
We'll use the modules in the Python standard library.
On the server side, we implement conventional Python functions, and then register them with an XML-RPC server. Here is a simple, sample server:
#!/usr/bin/env python
"""
Synopsis:
    A simple XML-RPC server.
"""
from SimpleXMLRPCServer import SimpleXMLRPCServer
import inspect

class Methods(object):
    def multiply(self, x, y):
        return x * y

def is_even(n):
    """Return True if n is even."""
    return n % 2 == 0

def is_odd(n):
    """Return True if n is odd."""
    return n % 2 == 1

def listMethods():
    """Return a list of supported method names."""
    return Supported_methods.keys()

def methodSignature(method_name):
    """Return the signature of a method."""
    if method_name in Supported_methods:
        func = Supported_methods[method_name]
        return inspect.getargspec(func).args
    else:
        return 'Error. Function "{}" not supported.'.format(method_name)

def methodHelp(method_name):
    """Return the doc string for a method."""
    if method_name in Supported_methods:
        func = Supported_methods[method_name]
        return func.__doc__
    else:
        return 'Error. Function "{}" not supported.'.format(method_name)

Supported_methods = {
    'is_even': is_even,
    'is_odd': is_odd,
    'listMethods': listMethods,
    'methodSignature': methodSignature,
    'methodHelp': methodHelp,
}

def start():
    node = '192.168.0.7'
    port = 8000
    server = SimpleXMLRPCServer((node, port))
    print "Listening on {} at port {} ...".format(node, port)
    # Register each of the supported functions.
    for name, func in Supported_methods.items():
        server.register_function(func, name)
    # Also register a method taken from within a class.
    methods = Methods()
    multiply = methods.multiply
    server.register_function(multiply, 'multiply')
    server.serve_forever()

def main():
    start()

if __name__ == '__main__':
    main()
And, on the client side, it's simply a matter of creating a "proxy" and doing what looks like a standard Python function call through that proxy. Here is a simple, sample client:
#!/usr/bin/env python
"""
Synopsis:
    A simple XML-RPC client.
"""
import xmlrpclib

def discover_methods(proxy):
    method_names = proxy.listMethods()
    for method_name in method_names:
        sig = proxy.methodSignature(method_name)
        help = proxy.methodHelp(method_name)
        print 'Method -- {}'.format(method_name)
        print '    Signature: {}'.format(sig)
        print '    Help     : {}'.format(help)

def request(proxy, ival):
    ret_ival = str(proxy.is_even(ival))
    print "{0} is even: {1}".format(ival, ret_ival)

def main():
    node = '192.168.0.7'
    port = 8000
    url = "http://{}:{}".format(node, port)
    proxy = xmlrpclib.ServerProxy(url)
    print "Requests sent to {} at port {} ...".format(node, port)
    discover_methods(proxy)
    for ival in range(10):
        request(proxy, ival)
    answer = proxy.multiply(5, 3)
    print 'multiply answer: {}'.format(answer)

if __name__ == '__main__':
    main()
Notes:
If you want to access this XML-RPC server only from the local machine, then you might create the server with the following:
server = SimpleXMLRPCServer(('localhost', 8000))
And, in the client, create the proxy with the following:
proxy = xmlrpclib.ServerProxy('http://localhost:8000')
Notice that, in the server, we can also expose a method from within a class.
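If you would rather expose all of the public methods of a class at once, SimpleXMLRPCServer also supports register_instance. Here is a minimal sketch:

from SimpleXMLRPCServer import SimpleXMLRPCServer

class Methods(object):
    def multiply(self, x, y):
        return x * y
    def add(self, x, y):
        return x + y

server = SimpleXMLRPCServer(('localhost', 8000))
# Expose every public method of the instance (here, multiply and add).
server.register_instance(Methods())
server.serve_forever()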
FYI, I've been able to run the above XML-RPC scripts across my LAN. In fact, I've run the server on one of my desktop machines, and I connect via WiFi from the client on my Android smart phone using QPython. For more information about QPython see: http://qpython.com/.
There is documentation on IPython parallel computing here: http://ipython.org/ipython-doc/dev/parallel/index.html
One easy way to install Python itself and IPython, SciPy, Numpy, etc. is to install the Anaconda toolkit. You can find out about it here: http://www.continuum.io/ and here https://store.continuum.io/cshop/anaconda/.
We'd like to know how to submit tasks for parallel execution. Here is a bit of instruction on how to do it.
Create the cluster. Use the ipcluster executable from IPython parallel processing. Example:
$ ipcluster start -n 4
Create a client and a load balanced view. Example:
client = Client()
view = client.load_balanced_view()
Submit several tasks. Example:
r1 = view.apply(f1, delay, value1, value2)
r2 = view.apply(f1, delay, value1 + 1, value2 + 1)
Get the results. Example:
print r1.result, r2.result
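Each call to view.apply returns an AsyncResult instance. If you would rather not block on the result attribute, you can poll for completion. A minimal sketch, assuming the r1 and r2 created in the step above:

import time

pending = [r1, r2]
while not all(r.ready() for r in pending):
    # Do other useful work here instead of blocking on .result.
    time.sleep(0.5)
print [r.result for r in pending]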
Putting the above steps together, here is the complete code. Example:
from IPython.parallel import Client

def test(view, delay, value1, value2):
    r1 = view.apply(f1, delay, value1, value2)
    r2 = view.apply(f1, delay, value1 + 1, value2 + 1)
    r3 = view.apply(f1, delay, value1 + 2, value2 + 2)
    r4 = view.apply(f1, delay, value1 + 3, value2 + 3)
    print 'waiting ...'
    return r1.result, r2.result, r3.result, r4.result

def f1(t, x, y):
    import time
    time.sleep(t)
    r = x + y + 3
    return r

def main():
    client = Client()
    view = client.load_balanced_view()
    results = test(view, 5, 3, 4)
    print 'results:', results

if __name__ == '__main__':
    main()
Notes:
This example asks IPython parallel computing to execute four function calls in parallel, in four separate processes.
Because these function calls are executed in separate processes, they avoid conflict over Python's GIL (global interpreter lock).
We started the cluster with the default scheduler scheme, which is "least load". For other schemes do the following and look for "scheme":
$ ipcontroller --help-all
Also see: http://ipython.org/ipython-doc/dev/parallel/parallel_task.html#schedulers
Submitting jobs to be run on IPython engines on a remote machine turns out, in some cases at least, to be very easy. Do the following:
Start the IPython controller and engines on the remote machine. For example:
$ ipcluster start -n 4
Copy your client profile ~/.ipython/profile_default/security/ipcontroller-client.json from the remote machine to the security/ directory under the profile you will be using on the local machine.
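For example, with something like the following scp command (the user name, host, and profile paths are placeholders for your own):

$ scp your_user_name@192.168.0.7:.ipython/profile_default/security/ipcontroller-client.json ~/.ipython/profile_default/security/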
When you create your client, use something like the following:
client = Client(sshserver='your_user_name@192.168.0.7')
But change the user name and IP address to that of the remote machine.
There is more information on using IPython parallel computing with remote hosts here: http://ipython.org/ipython-doc/dev/parallel/parallel_process.html#using-the-ipcontroller-and-ipengine-commands
You can also create parallel functions by using a Python decorator. Example:
from IPython.parallel import Client
import numpy as np

client = Client(sshserver='remote_user_name@192.168.0.7')
dview = client[:]

@dview.parallel(block=True)
def parallel_multiply(a, b):
    return a * b

def main():
    array1 = np.random.random((64, 48))
    for count in range(10):
        result_remote = parallel_multiply(array1, array1)
        print result_remote

if __name__ == '__main__':
    main()
For more information on IPython remote function decorators, see: http://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html#remote-function-decorators
The Python standard library contains the module multiprocessing. That module (it's actually a Python package, or a library that acts like a module) contains some reasonable support for creating and running multiple processes implemented in Python and for communicating between those processes using Queues and Pipes (also in the multiprocessing module). You can learn more about that module here: https://docs.python.org/2/library/multiprocessing.html
Be aware that the multiprocessing module creates separate operating system processes. Each one runs in its own memory space; each one has its own Python interpreter; each one has its own GIL (global interpreter lock); each one has its own copies of imported modules; and each module in each of these multiple processes has its own copies of global variables.
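Here is a minimal sketch that demonstrates those separate memory spaces: each child process increments what looks like the same module-level global variable, but each child sees only its own copy:

from multiprocessing import Process, Queue

Counter = 0

def worker(queue):
    global Counter
    Counter += 1
    # Each child has its own copy of Counter, so this always puts 1.
    queue.put(Counter)

if __name__ == '__main__':
    queue = Queue()
    processes = [Process(target=worker, args=(queue,)) for _ in range(4)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print [queue.get() for _ in range(4)]    # prints [1, 1, 1, 1]
    print Counter                            # still 0 in the parent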
The documentation has examples. And, here is some sample code that is a little more complex:
#!/usr/bin/env python
"""
synopsis:
    Example of the use of the Python multiprocessing module.
usage:
    python multiprocessing_module_01.py
"""
import argparse
import operator
from multiprocessing import Process, Queue
import numpy as np
# py_math_01 is the sample math module shown later in this document,
# in the section on Erlang and Erlport.
import py_math_01

def run_jobs(args):
    """Create several processes, start each one, and collect the results."""
    queue01 = Queue()
    queue02 = Queue()
    queue03 = Queue()
    queue04 = Queue()
    m = 4
    n = 3
    process01 = Process(target=f_multiproc, args=(queue01, 'process01', m, n))
    process02 = Process(target=f_multiproc, args=(queue02, 'process02', m, n))
    process03 = Process(target=f_multiproc, args=(queue03, 'process03', m, n))
    process04 = Process(target=f_multiproc, args=(queue04, 'process04', m, n))
    process01.start()
    process02.start()
    process03.start()
    process04.start()
    raw_input('Check for existence of multiple processes, then press Enter')
    process01.join()
    process02.join()
    process03.join()
    process04.join()
    raw_input('Check to see if they disappeared, then press Enter')
    print queue01.get()
    print queue02.get()
    print queue03.get()
    print queue04.get()

def f_multiproc(queue, processname, m, n):
    # Seed the random number generator differently in each child, based
    # on the process name.
    seed = reduce(operator.add, [ord(x) for x in processname], 0)
    np.random.seed(seed)
    result = py_math_01.test_01(m, n)
    result1 = result.tolist()
    result2 = 'Process name: {}\n{}\n-----'.format(processname, result1)
    queue.put(result2)

def main():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,)
    args = parser.parse_args()
    run_jobs(args)

if __name__ == '__main__':
    main()
The above code does the following: (1) it creates a Queue instance for each child process, through which that child will return its result; (2) it creates and starts four Process instances, each of which runs f_multiproc; (3) each child seeds the random number generator (differently, based on its process name), computes a result with py_math_01.test_01, and puts a formatted version of that result on its queue; and (4) the parent joins (waits for) the four children and then prints the result from each queue.
Some benefits of using the multiprocessing module: (1) each child runs in its own process with its own GIL, so the children can genuinely run in parallel on multiple cores; (2) it is in the standard library, so there is nothing extra to install; and (3) the Queue and Pipe classes give us a reasonably safe and simple way to pass data between processes.
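The example above communicates through Queue instances. For contrast, here is a minimal sketch of the Pipe class, which gives you a pair of connected endpoints:

from multiprocessing import Process, Pipe

def worker(connection):
    # Receive one request from the parent, reply through the pipe, and quit.
    m, n = connection.recv()
    connection.send(m * n)
    connection.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    process = Process(target=worker, args=(child_conn,))
    process.start()
    parent_conn.send((4, 3))
    print parent_conn.recv()    # prints 12
    process.join()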
Information about Parallel Python is here: http://www.parallelpython.com/
Here is a description from the Parallel Python Web site:
PP is a python module which provides mechanism for parallel execution of python code on SMP (systems with multiple processors or cores) and clusters (computers connected via network).
It is light, easy to install and integrate with other python software.
PP is an open source and cross-platform module written in pure python
Features:
- Parallel execution of python code on SMP and clusters
- Easy to understand and implement job-based parallelization technique (easy to convert serial application in parallel)
- Automatic detection of the optimal configuration (by default the number of worker processes is set to the number of effective processors)
- Dynamic processors allocation (number of worker processes can be changed at run-time)
- Low overhead for subsequent jobs with the same function (transparent caching is implemented to decrease the overhead)
- Dynamic load balancing (jobs are distributed between processors at run-time)
- Fault-tolerance (if one of the nodes fails tasks are rescheduled on others)
- Auto-discovery of computational resources
- Dynamic allocation of computational resources (consequence of auto-discovery and fault-tolerance)
- SHA based authentication for network connections
- Cross-platform portability and interoperability (Windows, Linux, Unix, Mac OS X)
- Cross-architecture portability and interoperability (x86, x86-64, etc.)
- Open source
The examples provided with the distribution work well. But, the project does not seem very active.
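For a taste of the API, here is a minimal sketch of submitting jobs with pp. It is based on my reading of the examples in the distribution, so treat it as a sketch rather than definitive usage:

import pp

def sum_range(n):
    return sum(range(n))

# Create a job server; by default it uses all the cores on this machine.
job_server = pp.Server()
# submit() returns a callable job object.
job1 = job_server.submit(sum_range, (1000000,))
job2 = job_server.submit(sum_range, (2000000,))
# Calling a job object waits for, and returns, its result.
print 'results:', job1(), job2()
job_server.print_stats()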
Next, we turn to ZeroMQ. Here is a quote:
ØMQ in a Hundred Words
ØMQ (also known as ZeroMQ, 0MQ, or zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multi-core applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPLv3 open source. [Pieter Hintjens; http://zguide.zeromq.org/page:all]
pyzmq, which provides the zmq module, is the Python binding for ZeroMQ.
Note that ZeroMQ is underneath IPython parallel. So, it may be appropriate to think of IPython parallel computing as a high level wrapper around ZeroMQ.
There is a good set of examples written in a number of different languages for ZeroMQ. To get them, download the ZeroMQ guide (https://github.com/imatix/zguide.git), then (for us Python programmers) look in zguide/examples/Python.
In order to use pyzmq and to run the examples, you will need to install: (1) the ZeroMQ library itself and (2) pyzmq, the Python bindings (for example, with pip install pyzmq).
For my testing with Python, I used the Anaconda Python distribution, which contains support for zmq.
We should note that with ZeroMQ, our programming is in some sense using the Actor model, as does Erlang. This is the Actor model in the sense that (1) we are creating separate processes which do not share (in memory) resources and (2) we communicate between those processes by sending messages and waiting on message queues. ZeroMQ differs from Erlang, with respect to the Actor model, in at least the following ways: (1) ZeroMQ actors are operating system processes, possibly written in different languages, whereas Erlang actors are lightweight processes inside the Erlang VM; (2) with ZeroMQ the message channels are explicit sockets that we create and connect ourselves, whereas message passing is built into the Erlang language; and (3) Erlang adds built-in support for monitoring, supervising, and restarting failed processes, which ZeroMQ does not provide.
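To make that comparison concrete, here is a minimal pyzmq sketch of two processes that share nothing and communicate only by messages, using a PUSH/PULL pipeline (the port number is arbitrary):

from multiprocessing import Process
import zmq

def consumer():
    # The "receiving actor": waits on its incoming message queue.
    context = zmq.Context()
    puller = context.socket(zmq.PULL)
    puller.connect("tcp://127.0.0.1:5557")
    for _ in range(5):
        print 'received:', puller.recv()

def producer():
    # The "sending actor": pushes messages; no memory is shared.
    context = zmq.Context()
    pusher = context.socket(zmq.PUSH)
    pusher.bind("tcp://127.0.0.1:5557")
    for idx in range(5):
        pusher.send(b"message %d" % idx)

if __name__ == '__main__':
    process = Process(target=consumer)
    process.start()
    producer()
    process.join()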
Here is a "Hello, World" server that uses pyzmq:
#!/usr/bin/env python
"""
Hello World server in Python.
Binds REP socket to tcp://*:5555.
Expects b"Hello" from client, replies with b"World".
"""
import sys
import time
import zmq

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: python hwserver.py <label>')
    label = args[0]
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    count = 0
    while True:
        count += 1
        # Wait for the next request from a client.
        message = socket.recv()
        print("Received request: {} {}".format(message, count))
        # Do some 'work'.
        time.sleep(1)
        # Send the reply back to the client.
        socket.send(b"{} {} {}".format(label, message, count))

main()
And, here is the "Hello, World" client using pyzmq:
#!/usr/bin/env python
"""
Hello World client in Python.
Connects REQ socket to tcp://localhost:5555.
Sends "Hello" to server, expects "World" back.
"""
import sys
import zmq

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: python hwclient.py <label>')
    label = args[0]
    context = zmq.Context()
    # Socket to talk to the server.
    print("Connecting to hello world server...")
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    # Do 10 requests, waiting each time for a response.
    for request in range(10):
        print("Sending request %s ..." % request)
        socket.send(b"Hello from {}".format(label))
        # Get the reply.
        message = socket.recv()
        print("Received reply %s [ %s ]" % (request, message))

main()
If you start hwserver.py in one (bash) session and hwclient.py in another session, you should see the server and the client echoing each other in their respective sessions.
However, if you start one instance of hwserver.py and multiple instances of hwclient.py, you will notice a longer delay between each echo. That's because multiple clients are waiting on a single server. Notice the delay (time.sleep(1)) in the server. Our next challenge is to run the server in multiple processes so that the load from multiple clients will be balanced across multiple servers. We could use IPython multiple processing to do that. But, there are ways to accomplish something similar with ZeroMQ itself. See, for example, the documentation on A Load Balancing Message Broker.
One significant benefit of using ZeroMQ is that we can write different processes in different languages. Thus, we can, for example, implement a process in Node.js that sends messages to and requests services from a process written in Python.
In this example, we will use ZeroMQ to accomplish (at least) two things: (1) distribute tasks from a client across multiple worker processes, and (2) communicate between processes written in different languages, in this case Node.js and Python.
Debugging -- A few clues:
In our example, the Node.js module makes multiple requests in the form of ZeroMQ messages that go to a "broker", which passes them along to a Python worker module. If we start up more than one worker process, these requests will be forwarded, round-robin style, to one worker or another.
Here is our Node.js client:
#!/usr/bin/env node
/*
A ZeroMQ client implemented in Node.js that requests an XML service
from a separate process.
Connects a REQ socket to tcp://localhost:5559.
*/

var fileList = [
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
];

function run() {
    var zmq = require('zmq'),
        requester = zmq.socket('req'),
        maxNbr = fileList.length;
    requester.connect('tcp://localhost:5559');
    var replyNbr = 0;
    requester.on('message', function(msg) {
        var content = msg.toString();
        content = JSON.parse(content);
        console.log('got reply', replyNbr, 'file name:', content[0],
            'filter: ', content[1], ' count:', content[2]);
        replyNbr += 1;
        if (replyNbr >= maxNbr) {
            console.log('finished and closing socket');
            requester.close();
        }
    });
    for (var idx = 0; idx < maxNbr; ++idx) {
        var filename = fileList[idx][0],
            filter = fileList[idx][1],
            payload = {filename: filename, filter: filter};
        payload = JSON.stringify(payload);
        requester.send(payload);
    }
}

run();
And, this is the Python broker that acts as an intermediary between clients and one or more workers:
#!/usr/bin/env python
"""
A broker/intermediary implemented in Python that forwards messages
from clients to any one of the connected workers.

Simple request-reply broker.

Author: Lev Givon <lev(at)columbia(dot)edu>
"""
import zmq

def run():
    # Prepare our context and sockets.
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind("tcp://*:5559")
    backend.bind("tcp://*:5560")
    # Initialize the poll set.
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    # Switch messages between sockets.
    count = 0
    while True:
        count += 1
        socks = dict(poller.poll())
        if socks.get(frontend) == zmq.POLLIN:
            message = frontend.recv_multipart()
            print '{}. broker: frontend --> backend msg: "{}"'.format(
                count, message[2])
            backend.send_multipart(message)
        if socks.get(backend) == zmq.POLLIN:
            message = backend.recv_multipart()
            print '{}. broker: backend --> frontend msg len: "{}"'.format(
                count, len(message[2]))
            frontend.send_multipart(message)

if __name__ == '__main__':
    run()
Finally, here is the Python worker that actually uses Lxml to provide XML processing capabilities:
#!/usr/bin/env python
"""
A ZeroMQ worker implemented in Python that:

- receives a message that identifies an XML document;
- uses Lxml to parse the document and count the elements in it;
- sends a reply message that contains the count of the elements
  in the document.
"""
import re
import json
import zmq
from lxml import etree

def count_elements(root, tagfilter_pat):
    if (root.tag is not etree.Comment and
            (tagfilter_pat is None or
             tagfilter_pat.search(root.tag) is not None)):
        count = 1
    else:
        count = 0
    for node in root.iterdescendants():
        if (node.tag is not etree.Comment and
                (tagfilter_pat is None or
                 tagfilter_pat.search(node.tag) is not None)):
            count += 1
    return count

def run():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:5560")
    while True:
        payload = socket.recv()
        payload = json.loads(payload)
        filename = payload['filename']
        tagfilter = payload.get('filter')
        if tagfilter is not None:
            tagfilter_pat = re.compile(tagfilter)
        else:
            tagfilter_pat = None
        print("Received request -- filename: %s filter: %s" % (
            filename, tagfilter, ))
        try:
            doc = etree.parse(filename)
        except IOError:
            doc = None
        if doc is None:
            count = -1
        else:
            root = doc.getroot()
            count = count_elements(root, tagfilter_pat)
        print 'sending -- name: {} count: {}'.format(filename, count)
        payload = (filename, tagfilter, count)
        socket.send_json(payload)

if __name__ == '__main__':
    run()
Notes:
The previous example sent a task to a worker even if that worker was not yet finished with its previous task. In this next example, the broker will forward a request to a worker only if that worker has signaled that it is finished with its previous task (if it had one) and that it is ready for its next task.
Again, so as to show how to request services implemented in Python from Node.js, our client is written in Node.js and the broker and workers are written in Python.
Here is the Node.js client:
#!/usr/bin/env node
/*
Node.js client for the load-balancing broker/worker.
Sends requests asking a worker to count the elements in an XML
document.  Optionally sends a pattern used to filter the elements
to be counted.
*/

var zmq = require('zmq')
  , frontAddr = 'tcp://127.0.0.1:12346'
  , log = console.log;

var fileList = [
    // [ filename, filter-pattern ]
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
    ['Data/data01.xml', 'person', ],
    ['Data/data01.xml', null, ],
    ['Data/data02.xml', null, ],
    ['Data/data03.xml', null, ],
    ['Data/missing_file.xml', null, ],
    ['Data/data04.xml', null, ],
    ['Data/data05.xml', null, ],
    ['Data/data06.xml', null, ],
    ['Data/data14.xml', null, ],
];

function clientProcess(ident) {
    var sock = zmq.socket('req')
      , maxNbr = fileList.length
      ;
    sock.identity = "Client-" + ident;
    sock.connect(frontAddr);
    // Start listening for replies.
    var replyNbr = 0;
    sock.on('message', function(payload) {
        replyNbr += 1;
        var data = JSON.parse(payload);
        log(replyNbr + '. client received filename: ' + data.filename +
            '  count: ' + data.count);
        if (replyNbr >= maxNbr) {
            log('client finished');
            sock.close();
        }
    });
    // Send requests to the broker.
    for (var idx = 0; idx < fileList.length; ++idx) {
        var filename = fileList[idx][0],
            filter = fileList[idx][1],
            payload = {filename: filename, filter: filter};
        payload = JSON.stringify(payload);
        var lineno = idx + 1;
        log(lineno + '. client sending ' + payload);
        sock.send(payload);
    }
}

function main() {
    var args = process.argv;
    if (args.length != 3) {
        process.stderr.write('usage: test03_client.js <client_number>\n');
        process.exit(1);
    }
    var ident = args[2];
    clientProcess(ident);
}

main();
Notes:
Here is the broker written in Python:
#!/usr/bin/env python
"""
Load-balancing broker.

Clients implemented by test03_client.js and test03_client.py.
Workers implemented in test03_worker.py.
"""
import zmq

FrontAddress = 'tcp://127.0.0.1:12346'
BackAddress = 'tcp://127.0.0.1:12345'

def main():
    """Load balancer main loop."""
    # Prepare context and sockets.
    context = zmq.Context.instance()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(FrontAddress)
    backend = context.socket(zmq.ROUTER)
    backend.bind(BackAddress)
    # Initialize main loop state.
    workers = []
    poller = zmq.Poller()
    # Only poll for requests from the backend until workers are available.
    poller.register(backend, zmq.POLLIN)
    print 'broker waiting'
    while True:
        sockets = dict(poller.poll())
        if backend in sockets:
            # Handle worker activity on the backend.
            request = backend.recv_multipart()
            worker, empty, client = request[:3]
            print 'broker received request: {}'.format(request)
            if not workers:
                # Poll for clients now that a worker is available.
                poller.register(frontend, zmq.POLLIN)
            workers.append(worker)
            if client != b"READY" and len(request) > 3:
                # If this is a client reply, send the rest back to the
                # frontend.
                empty, reply = request[3:]
                frontend.send_multipart([client, b"", reply])
        if frontend in sockets:
            # Get the next client request; route it to the last-used worker.
            data = frontend.recv_multipart()
            print 'broker received request: {}'.format(data)
            client, empty, request = data
            worker = workers.pop(0)
            backend.send_multipart([worker, b"", client, b"", request])
            if not workers:
                # Don't poll clients if no workers are available.
                poller.unregister(frontend)
    # Clean up.
    backend.close()
    frontend.close()
    context.term()

if __name__ == "__main__":
    main()
And, here is the worker, also written in Python:
#!/usr/bin/env python
"""
Worker process for the load-balancing broker.

A worker process implemented in Python for the load-balancing broker.

- Receives a request message.
- Counts the elements in an XML document, possibly filtering elements
  by tag.
- Sends a reply message with the results.
"""
import sys
import re
import json
import zmq
from lxml import etree

BackAddress = 'tcp://127.0.0.1:12345'

def count_elements(root, tagfilter_pat):
    if (root.tag is not etree.Comment and
            (tagfilter_pat is None or
             tagfilter_pat.search(root.tag) is not None)):
        count = 1
    else:
        count = 0
    for node in root.iterdescendants():
        if (node.tag is not etree.Comment and
                (tagfilter_pat is None or
                 tagfilter_pat.search(node.tag) is not None)):
            count += 1
    return count

def worker_task(ident):
    """Worker task, using a REQ socket to do load-balancing."""
    socket = zmq.Context().socket(zmq.REQ)
    socket.identity = u"Worker-{}".format(ident).encode("ascii")
    socket.connect(BackAddress)
    # Tell the broker we're ready for work.
    socket.send(b"READY")
    print 'Worker {} sent READY'.format(ident)
    while True:
        address, empty, request = socket.recv_multipart()
        payload = request.decode('ascii')
        payload = json.loads(payload)
        filename = payload['filename']
        tagfilter = payload.get('filter')
        if tagfilter is not None:
            tagfilter_pat = re.compile(tagfilter)
        else:
            tagfilter_pat = None
        try:
            doc = etree.parse(filename)
        except IOError:
            doc = None
        if doc is None:
            count = -1
        else:
            root = doc.getroot()
            count = count_elements(root, tagfilter_pat)
        payload = {
            'filename': filename,
            'filter': tagfilter,
            'count': count,
        }
        payload = json.dumps(payload)
        print 'worker {} sending payload: {}'.format(ident, payload)
        socket.send_multipart([address, b'', payload])

def main():
    args = sys.argv[1:]
    if len(args) != 1:
        sys.exit('usage: test03_worker.py <task_number>')
    ident = args[0]
    worker_task(ident)

if __name__ == "__main__":
    main()
Notes:
This section discusses several strategies for running parallel python processes behind a Web server.
When we implement a Web site with Nodejs, Nodejs gives us parallel processing with almost no extra effort. Although a Nodejs Web server handles all requests in a single thread, we can use the Nodejs Cluster module to distribute the handling of requests across multiple processes, so that each incoming HTTP request can be handled by a separate process (whether Web-socket and AJAX requests are distributed the same way is a separate question). Thus, if we use the Nodejs cluster add-on, we get separate, parallel processes and load balancing.
Web application development is not a goal of this document, but there is plenty of help and lots of docs at http://nodejs.org and at sites that it links to.
What's left to do is to call Python. Since Nodejs is written in JavaScript, this requires some kind of foreign function call. One solution would be to use a message based system, for example ZeroMQ (http://zeromq.org/). zerorpc, which is a package built on top of ZeroMQ, looks hopeful (see: http://zerorpc.dotcloud.com/).
Here is an example of JavaScript (running under Nodejs, say), calling a method in a class written in Python:
var zerorpc = require("zerorpc");

var client = new zerorpc.Client();
client.connect("tcp://127.0.0.1:4242");

client.invoke("hello", "RPC", function(error, res, more) {
    console.log(res);
});
And, here is Python code that could be called by the above:
import zerorpc

class HelloRPC(object):
    def hello(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())
s.bind("tcp://0.0.0.0:4242")
s.run()
What's left to do is to make sure (1) that each Nodejs process has its own Python process, so that compute intensive, long-running Python calls (for example, complex calls into Numpy/SciPy) do not wait on each other or become slowed down by contention over the same Python GIL (global interpreter lock), and (2) that the Python processes, once started, stay alive, because starting a process is slow.
Erlang does multiprocessing; Erlang enables us to communicate between processes; Erlang with Erlport enables us to create and communicate with Python processes. So, why not try multiprocessing in Python with an Erlang controller of some kind?
A few clarifications:
We'll look at several examples in this document:
All of our examples will use the same Python code. Here it is:
#!/usr/bin/env python
"""
Synopsis:
    Sample math functions for use with Erlang and Erlport.
Details:
    test_01 -- Solve the continuous algebraic Riccati equation, or CARE,
    defined as (A'X + XA - XBR^-1B'X + Q = 0), directly using a Schur
    decomposition method.
"""
import numpy as np
from scipy import linalg
from erlport.erlterms import Atom

def test_01(m, n):
    a = np.random.random((m, m))
    b = np.random.random((m, n))
    q = np.random.random((m, m))
    r = np.random.random((n, n))
    print '(test_01) m: {} n: {}'.format(m, n, )
    result = linalg.solve_continuous_are(a, b, q, r)
    return result

def run(m=4, n=3):
    result = test_01(m, n)
    return (Atom('ok'), result.tolist())

def main():
    # Return the value so that callers via Erlport receive the result.
    return run()

if __name__ == '__main__':
    main()
Notes:
And, here is a simple Erlang program that uses that Python sample with the help of Erlport:
-module(erlport_01).
-export([main/0, show_list/2]).

main() ->
    {ok, Pid} = python:start(),
    {ok, Result} = python:call(Pid, 'py_math_01', main, []),
    show_list(Result, 1),
    ok.

show_list([], _) ->
    ok;
show_list([Item|Items], Count) ->
    io:format("~p. Item: ~p~n", [Count, Item]),
    show_list(Items, Count + 1).
Notes:
In the Erlang interactive shell erl we can compile and then run this as follows:
11> c(erlport_01).
{ok,erlport_01}
12> erlport_01:main().
1. Item: [12.74527763335136,-4.514001033364517,-7.4452420386061835,
    5.7441252569184345]
2. Item: [-5.795658295009697,3.897769387542307,4.148522353989249,
    -3.1221815191228965]
3. Item: [-7.157830191325373,4.088737828859971,8.493144407323305,
    -6.348281687731655]
4. Item: [3.996836318360595,-2.353597255054639,-3.098202007414951,
    3.5956798233304914]
ok
In this example, we implement a pool of Erlang+Python processes so that we can request a process from the pool (waiting until one is available, if necessary), use it, and then return it to the pool. The processes in the pool are actually Erlang processes; however, each of those Erlang processes holds (remembers) the PID (process identifier) of a Python process. We create each Python process with Erlport.
Here is our Erlang code that implements the pool of processes:
-module(erlport_04).
-export([
    init/0,
    start/3,
    stop/0,
    rpc/1
]).

init() ->
    ets:new(pipelinetable01, [named_table]),
    ok.

%
% Args:
%     NumProcesses -- (int) number of processes to put in the pool.
%     PythonModule -- (atom) the name of the Python module.
%     ProcessWaitTime -- (int) number of milliseconds to wait if all
%         processes are busy.
%
start(NumProcesses, PythonModule, ProcessWaitTime) ->
    PyProcPids = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(fun() -> pool_loop(PyProcPids, ProcessWaitTime) end),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    ok.

stop() ->
    rpc(stop_python),
    ok.

rpc(Request) ->
    case Request of
        {call_python, Function, Args} ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {pop, self()},
            receive
                {ok, PyProcPid} ->
                    PyProcPid ! {call_python, self(), {Function, Args}},
                    receive
                        {ok, Result} ->
                            PoolPid ! {push, self(), PyProcPid},
                            case Result of
                                {ok, Result1} ->
                                    {ok, Result1};
                                _ ->
                                    unknown_result
                            end;
                        Msg ->
                            {unknown_response, Msg}
                    end;
                _ ->
                    error
            end;
        get_pypid ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {pop, self()},
            receive
                {ok, PyProcPid} ->
                    {ok, PyProcPid}
            end;
        {put_pypid, PyProcPid} ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {push, self(), PyProcPid},
            receive
                ok ->
                    ok
            end;
        stop_python ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {stop, self()},
            receive
                ok ->
                    ok
            end
    end.

pool_loop(PyProcPids, ProcessWaitTime) ->
    receive
        {push, _From, Proc} ->
            PyProcPids1 = [Proc | PyProcPids],
            pool_loop(PyProcPids1, ProcessWaitTime);
        {pop, From} ->
            case PyProcPids of
                [] ->
                    % Give a client a chance to return a process to the pool.
                    timer:sleep(ProcessWaitTime),
                    self() ! {pop, From},
                    pool_loop(PyProcPids, ProcessWaitTime);
                [PyProcPid | PyProcPids1] ->
                    From ! {ok, PyProcPid},
                    pool_loop(PyProcPids1, ProcessWaitTime)
            end;
        {stop, From} ->
            stop_python_processes(PyProcPids),
            From ! ok,
            ok
    end.

python_loop(PyPid, PythonModule) ->
    receive
        {call_python, From, {Function, Args}} ->
            Result = python:call(PyPid, PythonModule, Function, Args),
            From ! {ok, Result},
            python_loop(PyPid, PythonModule);
        {stop, From} ->
            python:stop(PyPid),
            From ! ok
    end.

start_python_processes(0, _, PyProcPids) ->
    PyProcPids;
start_python_processes(N, PythonModule, PyProcPids) ->
    {ok, PyPid} = python:start(),
    PyProcPid = spawn(fun() -> python_loop(PyPid, PythonModule) end),
    io:format("Started Erlang/Python process -- PyProcPid: ~p~n", [PyProcPid]),
    start_python_processes(N - 1, PythonModule, [PyProcPid | PyProcPids]).

stop_python_processes([]) ->
    ok;
stop_python_processes([PyProcPid|PyProcPids]) ->
    io:format("Stopping Erlang/Python process -- PyProcPid: ~p~n", [PyProcPid]),
    PyProcPid ! {stop, self()},
    stop_python_processes(PyProcPids).
Notes:
And, here is an Erlang script that can be run from the command line and can be used to drive and test the above Erlang code:
#!/usr/bin/env escript
%% vim:ft=erlang:
%%! -sname magpie1 -setcookie dp01

main(["-h"]) ->
    usage();
main(["--help"]) ->
    usage();
main(Args) ->
    ArgsSpec = [
        {"p", "processes", yes},
        {"o", "outfile", yes}
    ],
    Args1 = erlopt:getopt(ArgsSpec, Args),
    Opts = proplists:get_all_values(opt, Args1),
    Args2 = proplists:get_all_values(arg, Args1),
    NumProcs1 = proplists:get_value("p", Opts),
    NumProcs2 = proplists:get_value("processes", Opts),
    NumProcs = case NumProcs1 of
        undefined ->
            case NumProcs2 of
                undefined -> 2;
                _ -> list_to_integer(NumProcs2)
            end;
        _ ->
            list_to_integer(NumProcs1)
    end,
    OutFile1 = proplists:get_value("o", Opts),
    OutFile2 = proplists:get_value("outfile", Opts),
    OutFile = case OutFile1 of
        undefined ->
            case OutFile2 of
                undefined ->
                    standard_io;
                _ ->
                    {ok, OutFile3} = file:open(OutFile2, [write]),
                    OutFile3
            end;
        _ ->
            {ok, OutFile3} = file:open(OutFile1, [write]),
            OutFile3
    end,
    {NumReps1, M1, N1} = case Args2 of
        [] ->
            {2, 4, 3};
        [NumReps] ->
            {list_to_integer(NumReps), 4, 3};
        [NumReps, M, N] ->
            {list_to_integer(NumReps),
             list_to_integer(M),
             list_to_integer(N)}
    end,
    run(NumProcs, NumReps1, M1, N1, OutFile),
    case OutFile of
        standard_io ->
            ok;
        _ ->
            file:close(OutFile),
            ok
    end.

run(NumProcs, Count, M, N, IoDevice) ->
    io:format("NumProcs: ~p Count: ~p M: ~p N: ~p~n",
        [NumProcs, Count, M, N]),
    erlport_04:init(),
    erlport_04:start(NumProcs, py_math_01, 100),
    run_n(1, Count, M, N, IoDevice),
    erlport_04:stop(),
    ok.

run_n(Count, Max, _, _, _) when Count > Max ->
    ok;
run_n(Count, Max, M, N, IoDevice) ->
    Result = erlport_04:rpc({call_python, run, [M, N]}),
    io:format(IoDevice, "Result ~p:~n~p~n", [Count, Result]),
    run_n(Count + 1, Max, M, N, IoDevice).

usage() ->
    io:format(standard_error, "usage:~n", []),
    io:format(standard_error, "    $ erlport_04.escript [options] iters [m n]~n", []),
    io:format(standard_error, "options:~n", []),
    io:format(standard_error, "    -p -- number of processes~n", []),
    io:format(standard_error, "    -o filename -- output file name~n", []),
    io:format(standard_error, "arguments:~n", []),
    io:format(standard_error, "    iters -- number of iterations to run~n", []),
    io:format(standard_error, "    m n -- size of array to create~n", []),
    ok.
Notes:
You can test the above code by running the following:
$ ./erlport_04.escript 3 4 3
NumProcs: 2 Count: 3 M: 4 N: 3
Started Erlang/Python process -- PyProcPid: <0.39.0>
Started Erlang/Python process -- PyProcPid: <0.41.0>
Result 1:
{ok,[[0.2597350443603386,0.8903581238544376,0.5228550551729187,
      -2.3417305007787257],
     [0.1943395864795484,0.3445498796542211,1.232814979418004,
      1.1994281436306256],
     [-0.03154488685636464,0.33179939314319556,1.829732033028535,
      0.9854826930442282],
     [1.0718123745676555,0.25710117274099364,1.7896961147779082,
      6.970965264066136]]}
(test_01) m: 4 n: 3
Result 2:
{ok,[[21.039769292519303,20.393141829871603,-37.29447963582768,
      0.2965148465091619],
     [1.9668685825947811,0.10150396421323271,-1.6867007920529111,
      0.10472863629222694],
     [2.1560394759605814,0.5010587053323622,-1.7840165638277685,
      -0.17305258786094993],
     [24.58501266962403,22.892461157404806,-43.847443385224864,
      1.8052772934572985]]}
(test_01) m: 4 n: 3
Result 3:
{ok,[[-0.20902511715668637,0.7778417615117266,0.9960684337538017,
      1.1824488386010166],
     [0.08584635712529537,-0.9819482057272886,1.2448114851957999,
      0.993406879690676],
     [1.1897115059332493,0.5189873231754997,-0.5711746123118333,
      -0.966994204829159],
     [1.5173355312750667,-0.3145814955274761,-0.6455456477102114,
      -1.5082534601988669]]}
Stopping Erlang/Python process -- PyProcPid: <0.41.0>
Stopping Erlang/Python process -- PyProcPid: <0.39.0>
What we try to gain in this example, over and above the previous example, is the ability to recover from the failure of one of the Erlang/Python processes. In this code, we ask that we be notified when one of the Erlang/Python processes fails so that we can (1) remove the old (dead) process from the pool and (2) create a new process and insert it into the pool.
Here is the code that does this:
-module(erlport_06).
-export([
    start/3,
    start_link/3,
    init/0,
    stop/1,
    restarter/1,
    rpc/1,
    pool_loop/3,
    python_loop/2
]).

%
% Args:
%     NumProcesses -- (int) number of processes to put in the pool.
%     PythonModule -- (atom) the name of the Python module.
%     ProcessWaitTime -- (int) number of milliseconds to wait if all
%         processes are busy.
%
start(NumProcesses, PythonModule, ProcessWaitTime) ->
    init(),
    PyProcs = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(?MODULE, pool_loop, [
        PyProcs, ProcessWaitTime, PythonModule]),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    RestarterPid = spawn(?MODULE, restarter, [PoolPid]),
    RestarterPid.

start_link(NumProcesses, PythonModule, ProcessWaitTime) ->
    init(),
    PyProcs = start_python_processes(NumProcesses, PythonModule, []),
    PoolPid = spawn(?MODULE, pool_loop, [
        PyProcs, ProcessWaitTime, PythonModule]),
    ets:insert(pipelinetable01, {poolpid, PoolPid}),
    RestarterPid = spawn(?MODULE, restarter, [PoolPid]),
    RestarterPid.

init() ->
    io:format("creating ETS table~n"),
    ets:new(pipelinetable01, [named_table]),
    ok.

stop(RestarterPid) ->
    rpc(stop_python),
    RestarterPid ! shutdown,
    ok.

rpc(Request) ->
    case Request of
        {call_python, Function, Args} ->
            io:format("call_python. F: ~p A: ~p~n", [Function, Args]),
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {pop, self()},
            receive
                {ok, PyProcPid} ->
                    PyProcPid ! {call_python, self(), {Function, Args}},
                    receive
                        {ok, Result} ->
                            PoolPid ! {push, self(), PyProcPid},
                            case Result of
                                {ok, Result1} ->
                                    {ok, Result1};
                                _ ->
                                    unknown_result
                            end;
                        Msg ->
                            {unknown_response, Msg}
                    end;
                _ ->
                    error1
            end;
        get_pypid ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {pop, self()},
            receive
                {ok, PyProcsPid} ->
                    {ok, PyProcsPid}
            end;
        {put_pypid, PyProcsPid} ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {push, self(), PyProcsPid},
            receive
                ok ->
                    ok
            end;
        exit ->
            % Deliberately kill one of the pooled processes, to exercise
            % the restarter.
            io:format("(rpc) 1. testing exit~n", []),
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {pop, self()},
            receive
                {ok, PyProcPid} ->
                    io:format("(rpc) 2. testing exit. P: ~p~n", [PyProcPid]),
                    exit(PyProcPid, test_failure),
                    ok;
                _ ->
                    error2
            end;
        stop_python ->
            [{poolpid, PoolPid} | _] = ets:lookup(pipelinetable01, poolpid),
            PoolPid ! {stop, self()},
            receive
                ok ->
                    ok
            end
    end.

restarter(PoolPid) ->
    receive
        {'EXIT', _Pid, normal} ->           % not a crash
            ok;
        {'EXIT', _From, shutdown} ->
            exit(shutdown);                 % manual termination, not a crash
        {'EXIT', PyProcPid, Reason} ->
            io:format("Restarting Py process ~p/~p~n", [PyProcPid, Reason]),
            %
            % Remove the old process that died from the pool.
            % Restart a new erlang/python process to replace the one that
            % died.  Insert the new one in the pool.
            %
            PoolPid ! {restart, PyProcPid},
            restarter(PoolPid);
        shutdown ->
            ok
    end.

pool_loop(PyProcs, ProcessWaitTime, PythonModule) ->
    receive
        {push, _From, Proc} ->
            PyProcs1 = [Proc | PyProcs],
            pool_loop(PyProcs1, ProcessWaitTime, PythonModule);
        {pop, From} ->
            case PyProcs of
                [] ->
                    % Give a client a chance to return a process to the pool.
                    timer:sleep(ProcessWaitTime),
                    self() ! {pop, From},
                    pool_loop(PyProcs, ProcessWaitTime, PythonModule);
                [PyProc | PyProcs1] ->
                    From ! {ok, PyProc},
                    pool_loop(PyProcs1, ProcessWaitTime, PythonModule)
            end;
        {restart, PyProcPid} ->
            case lists:member(PyProcPid, PyProcs) of
                true ->
                    % Remove the dead python process from the pool.
                    % (Note: lists:delete, not proplists:delete -- the pool
                    % is a plain list of pids.)
                    PyProcs1 = lists:delete(PyProcPid, PyProcs),
                    % Create a new python process.
                    {ok, PyPid} = python:start(),
                    PyProcPid1 = spawn_link(
                        ?MODULE, python_loop, [PyPid, PythonModule]),
                    % Add the new python process to the pool.
                    PyProcs2 = [PyProcPid1 | PyProcs1],
                    pool_loop(PyProcs2, ProcessWaitTime, PythonModule);
                false ->
                    pool_loop(PyProcs, ProcessWaitTime, PythonModule)
            end;
        {stop, From} ->
            stop_python_processes(PyProcs),
            From ! ok,
            ok
    end.

python_loop(PyPid, PythonModule) ->
    receive
        {call_python, From, {Function, Args}} ->
            Result = python:call(PyPid, PythonModule, Function, Args),
            From ! {ok, Result},
            python_loop(PyPid, PythonModule);
        {stop, From} ->
            python:stop(PyPid),
            From ! ok
    end.

start_python_processes(0, _, PyProcs) ->
    PyProcs;
start_python_processes(N, PythonModule, PyProcs) ->
    {ok, PyPid} = python:start(),
    PyProcPid = spawn_link(?MODULE, python_loop, [PyPid, PythonModule]),
    io:format("Started Erlang/Python process -- PyProcPid: ~p PyPid: ~p~n",
        [PyProcPid, PyPid]),
    start_python_processes(N - 1, PythonModule, [PyProcPid | PyProcs]).

stop_python_processes([]) ->
    ok;
stop_python_processes([PyProcPid | PyProcs]) ->
    io:format("Stopping Erlang/Python process -- PyProcPid: ~p~n",
        [PyProcPid]),
    PyProcPid ! {stop, self()},
    stop_python_processes(PyProcs).
Notes:
And, here is the driver, an Erlang script that can be used to run the above code:
#!/usr/bin/env escript
%% vim:ft=erlang:
%%! -sname crow1 -setcookie dp01

main(["-h"]) ->
    usage();
main(["--help"]) ->
    usage();
main(Args) ->
    ArgsSpec = [
        {"p", "processes", yes},
        {"o", "outfile", yes}
    ],
    Args1 = erlopt:getopt(ArgsSpec, Args),
    Opts = proplists:get_all_values(opt, Args1),
    Args2 = proplists:get_all_values(arg, Args1),
    NumProcs1 = proplists:get_value("p", Opts),
    NumProcs2 = proplists:get_value("processes", Opts),
    NumProcs = case NumProcs1 of
        undefined ->
            case NumProcs2 of
                undefined -> 2;
                _ -> list_to_integer(NumProcs2)
            end;
        _ ->
            list_to_integer(NumProcs1)
    end,
    OutFile1 = proplists:get_value("o", Opts),
    OutFile2 = proplists:get_value("outfile", Opts),
    OutFile = case OutFile1 of
        undefined ->
            case OutFile2 of
                undefined ->
                    standard_io;
                _ ->
                    {ok, OutFile3} = file:open(OutFile2, [write]),
                    OutFile3
            end;
        _ ->
            {ok, OutFile3} = file:open(OutFile1, [write]),
            OutFile3
    end,
    {NumReps1, M1, N1} = case Args2 of
        [] ->
            {2, 4, 3};
        [NumReps] ->
            {list_to_integer(NumReps), 4, 3};
        [NumReps, M, N] ->
            {list_to_integer(NumReps),
             list_to_integer(M),
             list_to_integer(N)}
    end,
    run(NumProcs, NumReps1, M1, N1, OutFile),
    case OutFile of
        standard_io ->
            ok;
        _ ->
            file:close(OutFile),
            ok
    end.

run(NumProcs, Count, M, N, IoDevice) ->
    io:format("NumProcs: ~p Count: ~p M: ~p N: ~p~n",
        [NumProcs, Count, M, N]),
    RestarterPid = erlport_06:start(NumProcs, py_math_01, 100),
    run_n(1, Count, M, N, IoDevice),
    erlport_06:stop(RestarterPid),
    ok.

run_n(Count, Max, _, _, _) when Count > Max ->
    ok;
run_n(Count, Max, M, N, IoDevice) ->
    Result = erlport_06:rpc({call_python, run, [M, N]}),
    io:format(IoDevice, "Result ~p:~n~p~n", [Count, Result]),
    run_n(Count + 1, Max, M, N, IoDevice).

usage() ->
    io:format(standard_error, "usage:~n", []),
    io:format(standard_error, "    $ erlport_06.escript -h|--help -- show this help~n", []),
    io:format(standard_error, "    $ erlport_06.escript [options] iters [m n]~n", []),
    io:format(standard_error, "options:~n", []),
    io:format(standard_error, "    -p -- number of processes~n", []),
    io:format(standard_error, "    -o filename -- output file name~n", []),
    io:format(standard_error, "arguments:~n", []),
    io:format(standard_error, "    iters -- number of iterations to run~n", []),
    io:format(standard_error, "    m n -- size of array to create~n", []),
    ok.
Notes:
Our purpose for adding the use of Erlang behaviors to the previous example is to gain resiliency. Erlang behaviors are like templates or frameworks: the behavior provides the structure, boilerplate, and common functionality; we provide (only) the code that is specific to our needs.
Our "pool server" creates and manages the pool of Erlang/Python processes. When a client needs a process in which to run a Python function, it can request a process (waiting for one to become available, if necessary), and return the process to the pool when it (the client) finishes with it. There are several features that we are trying to gain over and above those provided by the previous example: (1) if (when) a Python process from the pool dies, it will be replaced; (2) if the pool itself (i.e. the process that provides the pool services) dies, it will be restarted.
In a general sense, we will attempt to implement a server in Erlang that responds to requests (1) to start a number of Erlang/Python processes and the pool to hold them; (2) to get an Erlang/Python process from the pool; (3) to return an Erlang/Python process to the pool; and (4) to stop all Erlang/Python processes in the pool and stop the pool server itself.
Here is the code for our process server, implemented using the following: (1) the Erlang supervisor behavior; (2) the poolboy pool library; and (3) the Erlang gen_server behavior (for the worker).
This is the supervisor module that sets up the pool of worker processes:
%%%-------------------------------------------------------------------
%%% @author Dave Kuhlman
%%% @copyright (C) 2015, Dave Kuhlman
%%% @doc
%%%
%%% @end
%%% Created : 2015-04-15 14:40:41.165827
%%%-------------------------------------------------------------------
-module(process_server_02).

-behaviour(supervisor).

%% API
-export([start_link/1]).

%% Supervisor callbacks
-export([init/1]).

-define(SERVER, ?MODULE).

%%%===================================================================
%%% API functions
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%%
%% @spec start_link(Args) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link({PyModule, NumProcesses}) ->
    supervisor:start_link({local, ?SERVER}, ?MODULE,
        {PyModule, NumProcesses}).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
%%
%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} |
%%                     ignore |
%%                     {error, Reason}
%% @end
%%--------------------------------------------------------------------
init({PyModule, NumProcesses}) ->
    RestartStrategy = one_for_one,
    MaxRestarts = 1000,
    MaxSecondsBetweenRestarts = 3600,
    SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
    PoolId = pool1,
    PoolArgs = [
        {name, {local, PoolId}},
        {worker_module, process_server_worker_02},
        {size, NumProcesses},
        {max_overflow, NumProcesses * 2}
    ],
    WorkerArgs = PyModule,
    PoolSpec = poolboy:child_spec(PoolId, PoolArgs, WorkerArgs),
    PoolSpecs = [PoolSpec],
    {ok, {SupFlags, PoolSpecs}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
Notes:
And, here is the worker module. It actually does the call into the Python process:
%%%-------------------------------------------------------------------
%%% @author Dave Kuhlman
%%% @copyright (C) 2015, Dave Kuhlman
%%% @doc
%%%
%%% @end
%%% Created : 2015-04-13 13:27:42.569292
%%%-------------------------------------------------------------------
-module(process_server_worker_02).

-behaviour(gen_server).
-behaviour(poolboy_worker).

%% API
-export([
    start_link/1,
    call_python/3
]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

-define(SERVER, ?MODULE).

-record(state, {pymodule, pypid}).

%%%===================================================================
%%% API
%%%===================================================================

%%--------------------------------------------------------------------
%% @doc
%% Starts the server
%%
%% @spec start_link(Args) -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Args) ->
    gen_server:start_link(?MODULE, Args, []).

%%--------------------------------------------------------------------
%% @doc
%% Call a function in the Python module.  The Worker argument is the
%% pid returned by poolboy:checkout/1.
%%
%% @spec call_python(Worker, Function, Args) ->
%%           {ok, Results} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
call_python(Worker, Function, Args) ->
    gen_server:call(Worker, {call_python, Function, Args}).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Initializes the server -- starts the attached Python process.
%%
%% @spec init(Args) -> {ok, State} |
%%                     {ok, State, Timeout} |
%%                     ignore |
%%                     {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init(PyModule) ->
    {ok, PyPid} = python:start(),
    State = #state{pymodule=PyModule, pypid=PyPid},
    io:format("(worker:init) S: ~p~n", [State]),
    {ok, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling call messages
%% @end
%%--------------------------------------------------------------------
handle_call({call_python, Function, Args}, _From, State) ->
    #state{pymodule=PyModule, pypid=PyPid} = State,
    Result = python:call(PyPid, PyModule, Function, Args),
    Reply = {ok, Result},
    {reply, Reply, State};
handle_call(Request, From, State) ->
    io:format("(handle_call) error. R: ~p F: ~p S: ~p~n",
        [Request, From, State]),
    {stop, error, bad_call, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling cast messages
%% @end
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called by a gen_server when it is about to
%% terminate.  It should be the opposite of Module:init/1 and do any
%% necessary cleaning up; here it stops the attached Python process.
%% When it returns, the gen_server terminates with Reason.  The return
%% value is ignored.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
    #state{pypid=PyPid} = State,
    python:stop(PyPid),
    ok.

%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================
Notes:
And, here is an example that shows the use of the above modules:
1> {ok, Pid} = process_server_02:start_link({py_math_01, 2}).
{ok,<0.35.0>}
2> W1 = poolboy:checkout(pool1).
<0.40.0>
3> W2 = poolboy:checkout(pool1).
<0.38.0>
4> gen_server:call(W1, {call_python, run, [4,3]}).
{ok,{ok,[[0.8197041170446678,0.09025189282763758,
          1.297117120418452,0.5408889885926561],
         [-0.4671695867031289,1.5157486946812908,-1.7388360091792565,
          1.7359808600661375],
         [3.428398463532158,-1.3317568854693462,7.344062185451977,
          -1.9552152696080947],
         [-0.07497209018122969,1.2709604952843914,
          -0.4159480431957444,0.6385010743573563]]}}
5> gen_server:call(W2, {call_python, run, [5, 4]}).
{ok,{ok,[[2.676101292294086,-2.0670354571555447,
          7.14541356732066,6.716035238616977,2.20345132459271],
         [0.5899636802900904,1.8999305431749123,-1.6873788851117488,
          -0.2819546820511976,1.534930313444429],
         [1.955602939047537,1.075252226950586,-1.0791942668780166,
          1.2763496937826153,2.751323139911827],
         [2.765771008967425,-0.17404670613065715,4.0681028323280355,
          4.316723365262619,2.6611127296250077],
         [2.206410524329622,-1.290731926151737,3.777152935079321,
          4.242544083797171,2.4039164883740134]]}}
6> poolboy:checkin(pool1, W1).
ok
7> W3 = poolboy:checkout(pool1).
<0.40.0>
8> gen_server:call(W3, {call_python, run, [5, 4]}).
o
o
o
The following shows the commands only, without their output:
1> {ok, Pid} = process_server_02:start_link({py_math_01, 2}).
2> W1 = poolboy:checkout(pool1).
3> W2 = poolboy:checkout(pool1).
4> gen_server:call(W1, {call_python, run, [4,3]}).
5> gen_server:call(W2, {call_python, run, [5, 4]}).
6> poolboy:checkin(pool1, W1).
7> W3 = poolboy:checkout(pool1).
8> gen_server:call(W3, {call_python, run, [5, 4]}).
o o o
You can also do parallel processing in JavaScript using Node.js. However, there is a reservation -- Node.js is single-threaded. It achieves its concurrency through asynchronous callbacks. Therefore, you can get tasks to run concurrently (their I/O overlaps), but a single Node.js process will not utilize multiple cores or multiple CPUs.
However, there is support for creating multiple Node.js processes that run on multiple cores. To learn about that support, go to https://www.npmjs.com/ and search for "multi core".
You can learn more about Node.js here: https://nodejs.org/
I use the Async module for flow control. One of the many flow-control patterns that Async supports is parallel execution. You can install Async with npm:
$ npm install async
And, you can find it here: https://www.npmjs.com/package/async
Here is a JavaScript script that can be run with Node.js:
#!/usr/bin/env node

var fs = require('fs');
var async = require('async');

var log = console.log;

function reader(name, cb) {
    var content = fs.readFileSync(name);
    cb(null, content);
}

function test1() {
    // Read several files in parallel and report their lengths.
    async.map(['tmp1.txt', 'tmp2.txt'], reader, function (err, results) {
        log('map -----------------');
        log('map results:');
        for (var idx = 0; idx < results.length; idx++) {
            log('length: ' + results[idx].length);
        }
        log('count: ' + results.length);
    });
    // Keep only the files that actually exist.
    async.filter(['tmp1.txt', 'tmp3.txt', 'tmp2.txt'], fs.exists,
        function (results) {
            log('filter -----------------');
            log('filter results: %s', results);
        });
    // Run two independent reads in parallel and collect both results.
    async.parallel([
        function (cb) {
            fs.readFile('tmp1.txt', {encoding: 'utf-8'}, function (err, content) {
                content = content.toUpperCase();
                content = 'content (tmp1.txt): [[' + content + ']]';
                cb(null, content);
            });
        },
        function (cb) {
            fs.readFile('tmp2.txt', {encoding: 'utf-8'}, function (err, content) {
                content = content.toUpperCase();
                content = 'content (tmp2.txt): [[' + content + ']]';
                cb(null, content);
            });
        },
    ], function (err, results) {
        log('parallel -----------------');
        log('parallel results: [[%s]]', results);
    });
}

function main() {
    test1();
}

main();
Here are a few comments that may help you decide which of the above technologies to choose for the work you are planning:
The Python multiprocessing module (in the Python standard library) provides a base on which you can build the parallel processing model that you want; you will, however, have to build much of it yourself. The module enables you to write Python code that starts additional "child" processes, as opposed to having to start those processes in some more manual way (although you could always write scripts, in whatever language suits your needs, that start additional processes for you). The module provides the Pipe and Queue classes for communication between child processes, and between child processes and the process that starts them; but remember that you could, instead, use ZeroMQ to implement that inter-process communication. The multiprocessing module also enables you to share memory between processes, in a way that is not automatically safe (see "Sharing state between processes" in the Python documentation).
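To make that concrete, here is a minimal sketch that uses the Process and Queue classes; the squaring task and the 'STOP' sentinel are just illustrative choices of mine, not anything prescribed by the module:

import multiprocessing

def worker(task_queue, result_queue):
    """Take numbers off the task queue and square them, until told to stop."""
    for n in iter(task_queue.get, 'STOP'):
        result_queue.put((n, n * n))

def main():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    # Start two child processes that share the two queues.
    workers = [
        multiprocessing.Process(
            target=worker, args=(task_queue, result_queue))
        for _ in range(2)]
    for w in workers:
        w.start()
    for n in range(10):
        task_queue.put(n)
    # Results arrive in completion order, not submission order.
    for _ in range(10):
        print result_queue.get()
    # Tell each worker to exit, then wait for it.
    for w in workers:
        task_queue.put('STOP')
    for w in workers:
        w.join()

if __name__ == '__main__':
    main()

A Pipe would serve as well for simple two-way, point-to-point communication; a Queue is the more natural fit when several workers share one stream of tasks.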
Parallel Python seems to work well, and it provides lots of power. It's relatively quick to learn and simple enough to use. It gives us both multiprocessing/parallelism and distributed processing: we can make use of multiple cores and CPUs on the same machine, and we can also run tasks across multiple machines (nodes on a network). There does not seem to be much active work on the code base itself; however, that might be because it works and there is no reason to change it.
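As a taste of the style of use, here is a small sketch based on Parallel Python's submit interface; the sum_primes task and the inputs are invented for illustration:

import pp

def sum_primes(n):
    """Sum the primes below n (a deliberately CPU-bound task)."""
    def is_prime(k):
        if k < 2:
            return False
        for d in range(2, int(k ** 0.5) + 1):
            if k % d == 0:
                return False
        return True
    return sum(k for k in range(2, n) if is_prime(k))

# Create a job server that uses all local cores.  Passing
# ppservers=("somehost:port",) here would also distribute jobs
# to other machines that are running ppserver.py.
job_server = pp.Server()
jobs = [job_server.submit(sum_primes, (n,)) for n in (10000, 20000, 30000)]
# Each job object is a callable; calling it waits for and returns
# that job's result.
for job in jobs:
    print job()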
IPython parallel processing has enough capabilities that you can spend lots of time learning how to use them, if you choose to do so. However, IPython parallel processing makes it easy, even simple, to do many of the common parallel tasks that most of us will want to do. Importantly, you can run the same task in parallel across different processes, and you can distribute different tasks across processes. You can also share values between processes in synchronous and asynchronous ways that, I believe (but am not sure), are safe (e.g., from race conditions).
The capabilities of IPython are so many and varied that it is hard to wrap your mind around them all. Read the IPython docs for an overview, and then look for the capabilities that you need. And, of course, if you need to, you can use ZeroMQ to communicate between processes; ZeroMQ is, in fact, used in the implementation of IPython itself. For certain purposes it perhaps makes sense to think of IPython parallel processing as building on ZeroMQ: it takes away some of ZeroMQ's flexibility, but enables us to do some of the things that could be done with ZeroMQ (given enough effort and smarts) in ways that are much easier and simpler.
The work on the IPython code base seems quite active, so active that the project has gotten large enough and powerful enough that it is being split into separate projects. There will be support for scripting in Ruby and Perl and other languages as well. Both Python 2 and Python 3 are supported.
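Here is a small sketch of the style of use, assuming that a cluster of engines has already been started (for example, with "ipcluster start -n 4"); the tasks themselves are invented for illustration:

from IPython.parallel import Client

# Connect to the running cluster and get a view on all engines.
rc = Client()
dview = rc[:]

# Run the same task in parallel across the engines.
results = dview.map_sync(lambda x: x ** 2, range(16))
print results

# Or run a function on every engine asynchronously and collect
# the results later.
ar = dview.apply_async(lambda: sum(range(1000000)))
print ar.get()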
ZeroMQ provides a good deal of power for communicating (sending messages) between processes. Perhaps you can think of it as providing the low-level plumbing that enables you to write the logic for inter-process communication and data transfer. You will need to take care of starting up the processes yourself; and you will need to bring them down, too, although it's easy to imagine using ZeroMQ to send a message to a process asking it to exit.
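As an illustration of that plumbing, here is a minimal sketch using pyzmq's request-reply sockets, including the kind of "please exit" message just mentioned; the port number and the message strings are my own choices:

import zmq

def worker():
    """Reply to requests until asked to exit."""
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind('tcp://*:5555')
    while True:
        message = socket.recv()
        if message == 'exit':
            socket.send('bye')
            break
        socket.send('echo: ' + message)

def client():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:5555')
    socket.send('hello')
    print socket.recv()
    # Ask the worker process to shut itself down.
    socket.send('exit')
    print socket.recv()

Run worker() in one process and client() in another; the REQ/REP socket pair enforces the strict request-then-reply alternation for you.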
One huge benefit of ZeroMQ is that there are language bindings for a variety of programming languages, so you can create your mega-application out of processes each of which is implemented in a different language, for example, Python, Ruby, JavaScript/Node.js, and more. Our example above has a "main" or client process written in Node.js requesting services from a worker process implemented in Python. That enabled us to, effectively but not terribly conveniently, request the capabilities of Lxml from Node.js, even though Lxml is available for Python but not Node.js. We can imagine something analogous for the use of Numpy and SciPy, which are available for Python.
Erlang, Erlport, process pools, etc. -- Although there is plenty to learn in order to use IPython, Erlang (for non-Erlang programmers) means learning a new and different language; and, if you intend to take advantage of some of Erlang's benefits, it entails learning a good deal of Erlang infrastructure, too. That has several implications: (1) Erlang must be installed on each machine where it will be used. (2) You (or someone) will need to learn Erlang and will have to write the code that controls the Python processes that do the parallel and distributed work. Erlport helps with that, but the burden of learning Erlang is still there. My sample code (above) may help get you started, but for any serious use, you will want an experienced Erlang programmer on your team.
Elixir runs on top of Erlang, and, for some of us at least, will seem like a friendlier style of language. It still, for some of us, means learning a new language and installing both Erlang and Elixir on all your target machines. I have not tried using Erlport with Elixir.