Monday, October 29, 2012

Async in Clojure: Playing with Agents, Part I

Clojure has a very interesting async primitive: the agent. There is some good documentation on agents, but for those that come from a background such as mine (Python at Twisted), I thought it might be nice to present one way of using agents to mimic the familiar async + callback reactive-style programming.

Do note, however, that Clojure agents run in one of two threadpools (one intended for CPU-intensive tasks, and the other for I/O-intensive tasks). As such, this is quite different than the event-loop approach that Twisted uses (or async frameworks that utilize libraries such as libevent or libev). Twisted has the deferToThread functionality, which is ... well, not exactly close, really. Regardless, let's get started.

In the following examples, we're going to pretend we have huge files we'll be reading off a local disk.

What to Call
Clojure's agent function is very, very simple: you pass it a value (it's initial state) and some options, if needed. That's it.


To update its state, you use either the send or send-off functions. If you've got CPU-bound tasks whose state you want to manage with agents, then you should use the send function. If your tasks will be I/O-bound, then you should use the send-off function for updating agent state. (The threadpool dedicated for use by send has a fixed size, based on the number of processors on your system. The threadpool for send-off is exapandable with thread caching and keep-alives.) Since our examples are focused on disk I/O, we'll be using send-off. (they have the same signature, though, so the following usage information applies to both).

When you send-off something to an agent, you pass if a few things:
  • an agent
  • the action or update function
  • any number of additional parameters you want the action function to consume
Here's what that looks like:


What to Write
So, we know what an agent looks like when bound and we know how we're going to send an update to the agent, but how might we construct the update itself? Perhaps like this:


As you can see, the first value that an action function takes is the "old" value of the agent -- the value that the agent has prior to the action that will take place. Once this function returns, the agent's value will be set to the return value of the action function. (What's more, if we needed to access the agent itself inside the action function for any reason, we could do so using the *agent* variable -- accessible within the scope of the action function).

Before we go on, let's take a look at this in action from the REPL:


The first thing we do is switch from the default namespace to one dedicated to our examples (this makes managing scope in the REPL much cleaner). Then we load a file that has the agent and action function defined. Then we tell it to run our fake "big read" function, asking it to run for about 10 seconds. As you can see, send-off returns the agent immediately. We then get the current value of the agent by dereferencing it. Finally our big read finishes, and we see it print how long it took. We then look at the agent directly, and then dereference it again -- both showing us what we'd expect: that the value of the agent has been updated to the return value of our big read function. Finally, we shutdown the agent threads and exit the REPL.

(The start-clojure script is wrapped with rlwrap so that I have access to a command line history, persistent over different sessions. The script boils down to this: rlwrap java -cp /usr/local/clojure-1.4.0/clojure-1.4.0.jar clojure.main.)

We've seen the agent in action now, but there's a bit more we can do. We'll take a look at that in the next post.

No comments: