Wednesday, April 18, 2012

Clojure-Py and Distributed Concurrency (Part 2)

In the last part of this series on Clojure-Py and distributed concurrency we discussed how tasklets are created. In this post, we will discuss the scheduler that manages message passing between tasklets. Throughout this series we use "process" and "tasklet" interchangeably: by "process" we do not mean an OS process, but a share-nothing Python tasklet built on a generator.

Using the Python yield bytecode we are able to get data both into and out of tasklets. Whenever a tasklet yields, it returns a Python tuple in the following format:

(state-kw & args)

There are several states that each tasklet can take:

::spawn - creates a new tasklet
::send - sends a message to a tasklet (via a pid)
::recv - receives a message from a tasklet
::become - keeps the same pid, but swaps out the function being executed by the process
::next-msg - the message just delivered via ::recv was unhandled; re-queue it and try the next message
::die - kills the process
::yield - simply reschedules this tasklet for a new time-slice, useful inside loops that would otherwise block
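The yield-tuple protocol above can be sketched with a plain Python generator. This is a minimal illustration, not the actual Clojure-Py implementation: Clojure keywords like ::send are stood in for by strings such as "send", and the pid names are hypothetical.

```python
def echo_tasklet():
    """A toy tasklet that echoes each message back to its sender."""
    while True:
        # Ask the scheduler for a message; the scheduler delivers it by
        # resuming this generator with .send((sender_pid, payload)).
        sender, msg = yield ("recv",)
        if msg == "stop":
            yield ("die",)
        else:
            # Reply to the sender with the same payload.
            yield ("send", sender, msg)

# Driving the generator by hand, exactly as a scheduler would:
t = echo_tasklet()
cmd = next(t)                        # advance to the first yield
reply = t.send(("pid-1", "hello"))   # deliver a message, get the next command
```

Here `cmd` is `("recv",)` and `reply` is `("send", "pid-1", "hello")`: the tasklet communicates purely by yielding command tuples and being resumed with data.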

The scheduler, then, is simply a loop that iterates over all the tasklets. If a tasklet's last command was ::recv, the scheduler looks for a message to hand to it. If the tasklet responds with ::next-msg, the scheduler immediately passes in the next message, and the next, until a different state is returned; this allows tasklets to search their message queues. If no message can be found, the tasklet is put into a sleep state until another process sends it a message to awaken it.
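A toy round-robin scheduler in the spirit of that loop might look like the following. All names here are hypothetical sketches, not the Clojure-Py internals, and for brevity blocked tasklets busy-wait instead of entering a true sleep state; ::next-msg handling is also omitted.

```python
from collections import deque

def run(tasklets):
    """Step generator-based tasklets round-robin until all have died.

    tasklets is a list of (pid, generator) pairs; generators yield command
    tuples like ("send", pid, msg), ("recv",), ("yield",), or ("die",).
    """
    mailboxes = {pid: deque() for pid, _ in tasklets}
    # Each ready entry: (pid, generator, waiting-for-a-message?)
    ready = deque((pid, g, False) for pid, g in tasklets)
    while ready:
        pid, gen, waiting = ready.popleft()
        if waiting and not mailboxes[pid]:
            ready.append((pid, gen, True))  # still blocked (toy busy-wait)
            continue
        try:
            if waiting:
                cmd = gen.send(mailboxes[pid].popleft())  # deliver a message
            else:
                cmd = next(gen)                           # plain time-slice
        except StopIteration:
            continue                                      # tasklet finished
        op = cmd[0]
        if op == "send":
            mailboxes[cmd[1]].append(cmd[2])
            ready.append((pid, gen, False))
        elif op == "recv":
            ready.append((pid, gen, True))
        elif op == "yield":
            ready.append((pid, gen, False))
        # op == "die" (or anything unknown): drop the tasklet

# A two-tasklet demonstration:
log = []

def pinger():
    yield ("send", "pong", "ping")
    yield ("die",)

def ponger():
    msg = yield ("recv",)
    log.append(msg)
    yield ("die",)

run([("ping", pinger()), ("pong", ponger())])
```

After `run` returns, `log` holds `["ping"]`: the pinger's message was queued in the ponger's mailbox and handed over on the ponger's next time-slice.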

The ::become state is useful for creating what the Erlang people call "universal servers". For example:

(defn universal-server []
  (recv [fn args]
    (become fn args)))

A universal server simply listens for the first message it receives, retrieves a fn and args from that message, then transforms itself into a server that runs that fn. The astute reader will recognize this as a form of tail-call optimization. If Clojure-Py had TCO, we could simply call (fn args) instead of (become fn args), but since Clojure-Py (like Clojure on the JVM) is limited by the VM it runs on, we must implement this using yield bytecodes.
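In generator terms, ::become amounts to the scheduler replacing a pid's generator with a fresh one. A minimal sketch, with hypothetical names, driving the generators by hand:

```python
def universal_server():
    """Wait for (fn, args), then ask the scheduler to become fn."""
    fn, args = yield ("recv",)
    yield ("become", fn, args)

def adder_server(a, b):
    """A toy replacement server that reports a + b to a fixed pid."""
    yield ("send", "caller", a + b)

u = universal_server()
next(u)                                # first command: ("recv",)
cmd = u.send((adder_server, (1, 2)))   # deliver the (fn, args) message

# The scheduler reacts to ("become", fn, args) by building a fresh
# generator under the same pid and stepping it from now on:
op, fn, args = cmd
replacement = fn(*args)
first = next(replacement)
```

Here `first` is `("send", "caller", 3)`: the pid is unchanged, but the code behind it has been swapped out, which is why no VM-level tail call is needed.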

From here, the other tasklet states should be rather clear. The scheduler is really nothing more than a loop that manages the mini state machines we know as tasklets. Putting it all together:

(defn crazy-server []
  (receive [:ping] (! spid :pong)                  ; spid is implicitly supplied by receive, yields (::send spid :pong)
           [:clone] (! spid (spawn crazy-server))  ; yields (::spawn crazy-server)
           [:become fn args] (become fn args)      ; yields (::become fn args)
           [:die] (die)))                          ; yields (::die)

Normally there will be one scheduler for each running instance of the Python VM. However, if PyPy gets full STM support in the future, multiple VMs could be started.

In the next part, we will discuss fault tolerance, network communication, and OS process management.
