Friday, June 20, 2008

Async Batching with Twisted: A Walkthrough

While drafting a Divmod announcement last week, I had a quick chat with a dot-bomb-era colleague of mine. Turns out, his team wants to do some cool asynchronous batching jobs, so he's taking a look at Twisted. Because he's a good guy and I like Twisted, I drew up some examples for him that should get him jump-started. Each example covers something in more depth than its predecessor, so the set is probably generally useful. Thus this blog post :-)

I didn't get a chance to show him a DeferredSemaphore example or one for the Cooperator, so I will take this opportunity to do so. For each of the examples below, you can save the code as a text file and call it with "python filename.py", and the output will be displayed.

These examples don't attempt to give any sort of introduction to the complexities of asynchronous programming nor the problem domain of highly concurrent applications. Deferreds are covered in more depth here and here. However, hopefully this mini-howto will inspire curiosity about those :-)


Example 1: Just a DeferredList

This is one of the simplest examples you'll ever see for a deferred list in action. Get two deferreds (the getPage function returns a deferred) and use them to create a deferred list. Add callbacks to the list, garnish with a lemon.
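
The original example code isn't reproduced inline here, but a minimal sketch of that shape, using the old twisted.web.client.getPage API and a couple of placeholder URLs, might look something like this:

from twisted.internet import defer, reactor
from twisted.web.client import getPage

def listCallback(results):
    # results is a list of (success, result) tuples, one per deferred.
    print results

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d2 = getPage('http://yahoo.com')
    dl = defer.DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()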


Example 2: Simple Result Manipulation

We make things a little more interesting in this example by doing some processing on the results. For this to make sense, just remember that a callback gets passed the result when the deferred action completes. If we look up the API documentation for DeferredList, we see that it returns a list of (success, result) tuples, where success is a Boolean and result is the result of a deferred that was put in the list (remember, we've got two layers of deferreds here!).
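
A hedged sketch of that kind of result manipulation, building on the previous sketch (the processing here just reports each success flag and content length):

from twisted.internet import defer, reactor
from twisted.web.client import getPage

def listCallback(results):
    # Each entry from the DeferredList is a (success, result) tuple.
    for success, content in results:
        print "success: %s, length: %d" % (success, len(content))

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d2 = getPage('http://yahoo.com')
    dl = defer.DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()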


Example 3: Page Callbacks Too

Here, we mix things up a little bit. Instead of doing processing on all the results at once (in the deferred list callback), we're processing them when the page callbacks fire. Our processing here is just a simple example of getting the length of the getPage deferred result: the HTML content of the page at the given URL.
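
Again, the original code isn't shown inline, but the structure described is roughly this (URLs are placeholders):

from twisted.internet import defer, reactor
from twisted.web.client import getPage

def pageCallback(result):
    # result is the HTML content of the page; return its length so the
    # deferred list sees the processed value rather than the raw page.
    return len(result)

def listCallback(result):
    print result

def finish(ign):
    reactor.stop()

def test():
    d1 = getPage('http://www.google.com')
    d1.addCallback(pageCallback)
    d2 = getPage('http://yahoo.com')
    d2.addCallback(pageCallback)
    dl = defer.DeferredList([d1, d2])
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()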


Example 4: Results with More Structure

A follow-up to the last example, here we put the data in which we are interested into a dictionary. We don't end up pulling any of the data out of the dictionary; we just stringify it and print it to stdout.
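
Only the page callback really changes from the previous sketch; something along these lines (the dict keys are illustrative):

def pageCallback(result):
    # Package the interesting bits into a dict; the deferred list will
    # then see these dicts as its results.
    data = {
        'length': len(result),
        'content': result[:10],
        }
    return data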


Example 5: Passing Values to Callbacks

After all this playing, we start asking ourselves more serious questions, like: "How do I decide which values show up in my callbacks?" or "Some information that is available here isn't available there. How do I get it there?" This is how :-) Just pass the parameters you want to your callback. They'll be tacked on after the result (as you can see from the function signatures).

In this example, we needed to create our own deferred-returning function, one that wraps the getPage function so that we can also pass the URL on to the callback.
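
A sketch of that wrapper and its callback (the name getPageData is illustrative, not necessarily what the original code used):

from twisted.web.client import getPage

def pageCallback(result, url):
    # Extra arguments passed to addCallback show up after the result.
    data = {
        'length': len(result),
        'content': result[:10],
        'url': url,
        }
    return data

def getPageData(url):
    # Wrap getPage so the URL can be handed on to the callback as well.
    d = getPage(url)
    d.addCallback(pageCallback, url)
    return d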


Example 6: Adding Some Error Checking

As we get closer to building real applications, we start getting concerned about things like catching/anticipating errors. We haven't added any errbacks to the deferred list, but we have added one to our page callback. We've added more URLs and put them in a list to ease the pains of duplicate code. As you can see, two of the URLs should return errors: one a 404, and the other should be a domain not resolving (we'll see this as a timeout).
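
A sketch of what that might look like (the bad URLs and the 5-second timeout are stand-ins, not the originals):

from twisted.web.client import getPage

urls = [
    'http://yahoo.com',
    'http://www.google.com',
    'http://www.google.com/no-such-page.html',       # should give us a 404
    'http://this-domain-does-not-resolve.invalid',   # should fail to resolve
    ]

def pageCallback(result, url):
    return {'length': len(result), 'url': url}

def pageErrback(error, url):
    # error is a Failure; return something printable so the deferred
    # list still gets a result for this URL.
    return {'msg': error.getErrorMessage(), 'url': url}

def getPageData(url):
    d = getPage(url, timeout=5)
    d.addCallback(pageCallback, url)
    d.addErrback(pageErrback, url)
    return d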


Example 7: Batching with DeferredSemaphore

These last two examples are for more advanced use cases. As soon as the reactor starts, deferreds that are ready start "firing" -- their "jobs" start running. What if we've got 500 deferreds in a list? Well, they all start processing. As you can imagine, this is an easy way to run an accidental DoS against a friendly service. Not cool.

For situations like this, what we want is a way to run only so many deferreds at a time. This is a great use for the deferred semaphore. On repeated runs of this example, the content lengths of the four pages were returned after about 2.5 seconds. With the example rewritten to use just the deferred list (no deferred semaphore), the content lengths were returned after about 1.2 seconds. The extra time is due to the fact that I (for the sake of the example) forced only one deferred to run at a time, obviously not what you're going to want to do for a highly concurrent task ;-)

Note that without changing the code and only setting maxRun to 4, the timings for getting the content lengths are about the same, averaging 1.3 seconds for me (there's a little more overhead involved when using the deferred semaphore).

One last subtle note (in anticipation of the next example): the for loop creates all the deferreds at once; the deferred semaphore simply limits how many get run at a time.
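
A sketch of the structure being described (maxRun, the URL list, and the callbacks follow the description above, but the code itself is a reconstruction, not the original):

from twisted.internet import defer, reactor
from twisted.web.client import getPage

maxRun = 1

urls = [
    'http://oubiwann.blogspot.com',
    'http://cnn.com',
    'http://yahoo.com',
    'http://www.google.com',
    ]

def pageCallback(result):
    return len(result)

def listCallback(results):
    print results

def finish(ign):
    reactor.stop()

def test():
    # The semaphore lets at most maxRun of the wrapped getPage calls be
    # outstanding at once; the rest wait for a token to free up.
    sem = defer.DeferredSemaphore(maxRun)
    deferreds = []
    for url in urls:
        d = sem.run(getPage, url)
        d.addCallback(pageCallback)
        deferreds.append(d)
    dl = defer.DeferredList(deferreds)
    dl.addCallback(listCallback)
    dl.addCallback(finish)

test()
reactor.run()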


Example 8: Throttling with Cooperator

This is the last example for this post, and it's probably the most arcane :-) This example is taken from JP's blog post from a couple years ago. Our observation in the previous example about the way that the deferreds were created in the for loop and how they were run is now our counter example. What if we want to limit when the deferreds are created? What if we're using the deferred semaphore to create 1000 deferreds (but only running them 50 at a time), and we're running out of file descriptors? Cooperator to the rescue.

This one is going to require a little more explanation :-) Let's see if we can move through the justifications for the strangeness clearly:
  1. We need the deferreds to be yielded so that each deferred is not created until it's actually needed (as opposed to the situation in the deferred semaphore example where all the deferreds were created at once).
  2. We need to call doWork before the for loop so that the generator is created outside the loop, thus making our way through the URLs (calling it inside the loop would create a new generator each time, giving us all four URLs on every iteration).
  3. We removed the result-processing callback on the deferred list because coop.coiterate swallows our results; if we need to process, we have to do it with pageCallback.
  4. We still use a deferred list as the means to determine when all the batches have finished.
This example could have been written much more concisely: the doWork function could have been left in test as a generator expression and test's for loop could have been a list comprehension. However, the point is to show very clearly what is going on.
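
For reference, here is a reconstruction of the structure those four points describe (again a sketch, not the original code):

from twisted.internet import defer, reactor, task
from twisted.web.client import getPage

maxRun = 2

urls = [
    'http://oubiwann.blogspot.com',
    'http://cnn.com',
    'http://yahoo.com',
    'http://www.google.com',
    ]

def pageCallback(result):
    # With coiterate swallowing the results, per-page processing has to
    # happen here.
    print len(result)
    return result

def doWork():
    # A generator: no getPage call (and thus no deferred) happens until
    # the Cooperator actually asks for the next item.
    for url in urls:
        d = getPage(url)
        d.addCallback(pageCallback)
        yield d

def finish(ign):
    reactor.stop()

def test():
    deferreds = []
    coop = task.Cooperator()
    work = doWork()  # create the generator once, outside the loop
    for i in xrange(maxRun):
        # Each coiterate call pulls from the same generator, so at most
        # maxRun page fetches are in flight at any given moment.
        d = coop.coiterate(work)
        deferreds.append(d)
    dl = defer.DeferredList(deferreds)
    dl.addCallback(finish)

test()
reactor.run()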

I hope these examples were informative and provide some practical insight on working with deferreds in your Twisted projects :-)

15 comments:

lucas said...

Great article! Thanks for this I was looking how to implement a semaphore.

Duncan McGreggor said...

lagenar,

Glad you liked it and are going to be able to use it!

Unknown said...

Duncan, how did you make your code look nice on blogger? I just created my blog, and I don't see a "mark this as code" button...did you go down to raw html?

Unknown said...

I understood it all up until example 8. And I almost understand 8. That is, I understand the problem, and I understand the description of the solution, I just don't understand how that code actually implements that solution...

I'm getting closer to twisted mastery...

Duncan McGreggor said...

Hey Nathan,

The pre formatting on blogger is horrible :-) Glyph recently wrote a blog post about it here. Basically, every time you switch between "compose" and "html" edit modes, blogger efs with the whitespace. This means that once you get it the way you want it, don't switch between edit modes. Also, if you edit it again in the future, be sure that you do so in "html" mode.

As for Example 8, it's a bit tricky. I'll give another shot at describing what it's doing; feel free to ask more specific questions if this still isn't clear.

The thing to keep in mind is that you're working with a generator. This means that each iteration that is defined in doWork doesn't actually happen until the moment it is iterated. Calling doWork returns the generator; iterating it once creates a deferred. Knowing this, we can actually look at the iteration in the test function.

At first glance, it looks like we're only iterating twice, since it's over xrange(2). However, the maxRun variable is used to determine how many iterations of the doWork generator to process at once, not in total. The work iterator is processed in maxRun chunks at a time until there are no more items. The code that makes this possible is fairly difficult to read (it is, after all, a fairly specialized pattern), so don't feel bad :-) The code is viewable in twisted/internet/task.py of a Twisted svn checkout. Be aware that the constructor takes three parameters, for all of which we are accepting the defaults. The two key ones are a callable that gets invoked at the beginning of each step, and a callable that schedules each step of the Cooperator.

Hope that helps!

Unknown said...

My python-fu grows, thanks in part to your explanation!

1) I didn't understand generators. I've done some reading and (more importantly) some testing, and now I understand generators. (Hooray!)

2) I had no idea what a Cooperator was or what Cooperator.coiterate() did. After reading your explanation(s) and looking at the source, I think I understand: Cooperators manage running through some iterable asynchronously (like an asynchronous for loop), right? And then .coiterate() gives the Cooperator an iterator to start (asynchronously with regards to main execution) iterating through. What you've done is simply give the cooperator the _same_ iterable twice. Backing up, each time coiterate() was called, it returned a deferred that would fire when the cooperator reached the end of the iterable. You put those two deferreds into a deferredList that fires when all the processing on the iterator has finished.

Am I getting it so far? If so, there are two remaining things I don't quite understand:

1) Does the cooperator internally process each of the iterators that was passed into coiterate() synchronously? It seems that it would need to, otherwise lots of deferreds could get created, which is the whole problem you were avoiding.

2) What does the cooperator do with the deferreds created by the generator?

Unknown said...

Duncan...

Thank you so much for this article, it was incredibly helpful to me.

I was wondering if you could add an example showing how to get the response headers from the server along with the page.

I've been wracking my brain on this for the longest time and just can't figure it out.

Thanks

Klaus said...

Thank you, these examples were very helpful!

I ended up using a simplified version with a single coiterated iterator, which just runs in a loop doing chunks of work (including scheduling things to run in the background). There are probably easier ways to do that, but I think this had the advantage that it's clearer to see where the scheduling decisions are happening.

from twisted.internet import reactor
from twisted.web.client import getPage
from twisted.internet import defer, task

maxRun = 2

urls = [
  'http://oubiwann.blogspot.com',
  'http://cnn.com',
  'http://yahoo.com',
  'http://www.google.com',
  ]

tasks_running = 0

def pageCallback(result):
  global tasks_running
  tasks_running -= 1
  print len(result)
  return result

def doWork():
  global tasks_running
  while True:
    print "doWork: tasks_running=%d urls=%s" % (tasks_running, urls)
    if urls and tasks_running < maxRun:
      # Just download the next page in the background, the Cooperator
      # doesn't need to know about this.
      getPage(urls.pop()).addCallback(pageCallback)
      tasks_running += 1
      yield True # Run again immediately
    elif not urls and not tasks_running:
      # We're done, quit iterating.
      break
    else:
      # Wait a bit, giving background downloads a chance to finish.
      print "doWork: waiting, tasks_running=%d" % tasks_running
      d = defer.Deferred()
      reactor.callLater(0.1, d.callback, None)
      yield d
  print "doWork: done."

def finish(ign):
  reactor.stop()

def test():
  coop = task.Cooperator()
  deferreds = [coop.coiterate(doWork())]
  defer.DeferredList(deferreds).addCallback(finish)

test()
reactor.run()

Unknown said...

This post was immensely useful to me.

Thanks!

erik said...

Passing extra parameters to callbacks is better done with closures, so instead of:

d.addCallback(pageCallback, url)

one might instead go with:

def getPageData(url):
    return getPage(url).addCallback(lambda result: {
        'url': url,
        'length': len(result),
        'content': result,
    })

Also, addCallback returns the Deferred instance, so there is no need to use a temporary variable (with either callback approach).

erik said...

The following code causes the program to run with only 1 parallel request.

deferreds = [coop.coiterate(doWork())]

What you probably meant was:

worker = doWork()
deferreds = [coop.coiterate(worker) for _ in range(maxRun)]

Tom Prince said...

Why is a closure better than passing extra arguments? In any case, one should be aware of the fact that they have different semantics. Using a closure binds the *name*, whereas passing additional arguments to addCallback passes the *value* (which might be mutable). If calling addCallback in a loop, you almost certainly want the latter.

Anonymous said...

If we use getPage(url, timeout=30), that means no URL should take more than 30 seconds to run, so I figured the timeout error wouldn't occur for most URLs. But I got this error for the same URL that previously took only 3 seconds.
In any case, thanks very much for this post!

Unknown said...

This is exactly what I was looking for. Much appreciated. Thank you.

Anonymous said...

@Tammo, you're very welcome :-) I'm glad that, after these several years, it's still useful for folks!