Archive for January, 2012
Rolling your own coroutines
This post marks the conclusion of the async programming series I originally started as a lead-in for my Monospace 2011 talk. The remaining subject, as promised in my last article, is the creation of traditional coroutines using .NET iterators. The example code and the Monospace slides that discuss it can be found on GitHub. I had hoped to time it with a definite date for async/await arriving in .NET, but WinRT seems to have derailed future releases.
But before I get into it, let's roll up the posts that got us here:
- Tasked To Get Results – Introducing our asynchronous completion handles, DReAM's Result and the Task Parallel Library's Task
- What's Asynchrony Good For? – When should we use these constructs?
- Exit … Screen Right – CPS (Continuation Passing Style) asynchronous chaining using Result and Task
- Asynchrony and Sequential Workflows – Using DReAM coroutines and TPL's async/await to bring back sequential flow in asynchronous operations
The canonical example of coroutines is the producer/consumer pipeline, in which each stage does some amount of work and yields execution to the next stage once it cannot continue. The benefits of this type of processing pipeline are the avoidance of locks and full utilization of the processing power given to it, in addition to a loose coupling that lets you easily add or remove steps in the chain.
For this processing to work, the coroutines must all have the same "shape", i.e. signature. For the iterator-based coroutines I will cover first, I chose the following shape: IEnumerator Coroutine<T>(Coordinator<T> coordinator).
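As a rough sketch of what that shape could look like in code: the Coordinator<T> below is a hypothetical stand-in that only carries the shared state (its State property is an assumption), and Counter is an illustrative coroutine rather than one from the example project.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for Coordinator<T>: it only carries the shared state.
public class Coordinator<T> {
    public T State { get; set; }
}

// The shape every coroutine must have: accept the coordinator, return an iterator.
public delegate IEnumerator Coroutine<T>(Coordinator<T> coordinator);

public static class ShapeDemo {
    // Illustrative coroutine with the required shape: it increments the shared
    // counter and yields after each unit of work.
    public static IEnumerator Counter(Coordinator<int> coordinator) {
        for (var i = 0; i < 3; i++) {
            coordinator.State++;
            yield return null;
        }
    }

    // Drives the coroutine by hand and returns how many times it yielded.
    public static int Run(Coordinator<int> coordinator) {
        Coroutine<int> coroutine = Counter;
        var iterator = coroutine(coordinator);
        var yields = 0;
        while (iterator.MoveNext()) yields++;
        return yields;
    }
}
```

Calling ShapeDemo.Run(new Coordinator<int>()) steps Counter through its three yield points and leaves the shared state at 3.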
The coroutine accepts the Coordinator and yields once it cannot continue with any more work. The Coordinator contains the shared state T that all coroutines have access to. Since only one coroutine is ever executed at a time, the state can be mutated without locks.
The example we'll use is a producer of a 3x4 matrix and a consumer that outputs that matrix reshaped to 6x2. To show the ability to add arbitrary stages to this chain, we will have another coroutine that takes every value and squares it.
We create a Coordinator<int[]>, i.e. our state is a block of ints, which requires a coroutine signature of IEnumerator Coroutine<T>(Coordinator<T> coordinator). But since the Coordinator accepts the coroutines as an array of Funcs, we can curry signatures of different shapes into the required shape. In our example we capture a source and a destination for the Producer and Consumer coroutines, respectively, while the Exponentiator already has the required shape.
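A sketch of that wiring might look as follows. The constructor signature, the Coroutines property, the order of the stages and the block size of four are all assumptions made for illustration, and the coroutine bodies are stubbed out because only their signatures matter here.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for Coordinator<T>; only what the wiring needs.
public class Coordinator<T> {
    public T State { get; set; }
    public Func<Coordinator<T>, IEnumerator>[] Coroutines { get; private set; }

    public Coordinator(T state, params Func<Coordinator<T>, IEnumerator>[] coroutines) {
        State = state;
        Coroutines = coroutines;
    }
}

public static class Wiring {
    // Stubbed bodies: these two take an extra argument, so they do not fit the shape.
    static IEnumerator Producer(Coordinator<int[]> c, int[,] source) { yield break; }
    static IEnumerator Consumer(Coordinator<int[]> c, int[,] destination) { yield break; }

    // This one already has the required shape.
    static IEnumerator Exponentiator(Coordinator<int[]> c) { yield break; }

    public static Coordinator<int[]> Build(int[,] source, int[,] destination) {
        return new Coordinator<int[]>(
            new int[4],
            c => Producer(c, source),        // lambda captures the source matrix
            Exponentiator,                   // method group, no adapter needed
            c => Consumer(c, destination));  // lambda captures the destination matrix
    }
}
```

The lambdas close over source and destination, so every entry in the array ends up as a Func<Coordinator<int[]>, IEnumerator> regardless of how many parameters the underlying method takes.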
Let's look at the Producer:
The Producer takes the input matrix and writes it one row at a time into the state object. After each row, the producer yields execution to the next coroutine. We yield null, since the iterator is just used for its side effect of letting us suspend a method mid-execution and continue later. And that's the magic of it. Yes, it looks like a regular method, but it really does exit and re-enter multiple times. Every time the method yields, its state is suspended while another coroutine gets to run, and once resumed, the method continues with all its local state from the point right after the yield. We are stopping in the middle of a loop, letting someone else go and then continuing from that exact same place in the loop, all without blocking a thread.
The Consumer is provided the output matrix and starts a loop that continues until its output matrix is filled: it reads the block of integers from the state, writes it to the new matrix and yields its execution to the coordinator, so that a fresh set of integers can be provided.
The last coroutine is the Exponentiator, which squares every value in the state integer block each time it is resumed and then yields execution until it is provided a fresh set of integers to square.
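To make the suspend-and-resume behavior concrete, here is a sketch of a producer along the lines described above, stepped by hand with MoveNext(). The Coordinator<T> stub, its State property and the RowHeads helper are assumptions for illustration.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical stand-in for Coordinator<T>: it only carries the shared state.
public class Coordinator<T> {
    public T State { get; set; }
}

public static class ProducerDemo {
    // Copies one row of the source matrix into the shared state, then yields.
    public static IEnumerator Producer(Coordinator<int[]> coordinator, int[,] source) {
        var columns = source.GetLength(1);
        for (var row = 0; row < source.GetLength(0); row++) {
            var block = new int[columns];
            for (var col = 0; col < columns; col++) block[col] = source[row, col];
            coordinator.State = block;
            // Suspend mid-loop; 'row' and 'columns' survive until the next MoveNext().
            yield return null;
        }
    }

    // Steps the producer by hand and records the first value of each published row.
    public static List<int> RowHeads(int[,] source) {
        var coordinator = new Coordinator<int[]>();
        var producer = Producer(coordinator, source);
        var heads = new List<int>();
        while (producer.MoveNext()) heads.Add(coordinator.State[0]);
        return heads;
    }
}
```

For the matrix { {1, 2, 3, 4}, {5, 6, 7, 8}, {9, 10, 11, 12} }, RowHeads returns 1, 5 and 9: each MoveNext() call re-enters the loop exactly where the previous yield left it.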
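A consumer in this style could be sketched as below. The Coordinator<T> stub and the Feed helper are assumptions, and the sketch assumes the blocks add up to exactly the size of the destination matrix.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for Coordinator<T>: it only carries the shared state.
public class Coordinator<T> {
    public T State { get; set; }
}

public static class ConsumerDemo {
    // Appends each incoming block to the destination in row-major order,
    // yielding after every block until the destination is full.
    public static IEnumerator Consumer(Coordinator<int[]> coordinator, int[,] destination) {
        var columns = destination.GetLength(1);
        var written = 0;
        while (written < destination.Length) {
            foreach (var value in coordinator.State) {
                destination[written / columns, written % columns] = value;
                written++;
            }
            yield return null; // wait for a fresh block
        }
    }

    // Plays the role of the upstream stages: sets a block, then resumes the consumer.
    public static int[,] Feed(int rows, int columns, params int[][] blocks) {
        var coordinator = new Coordinator<int[]>();
        var destination = new int[rows, columns];
        var consumer = Consumer(coordinator, destination);
        foreach (var block in blocks) {
            coordinator.State = block;
            consumer.MoveNext();
        }
        return destination;
    }
}
```

Feeding three blocks of four values into a 6x2 destination fills two rows per resume, so the block { 1, 2, 3, 4 } ends up as the rows { 1, 2 } and { 3, 4 }.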
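The squaring stage might be sketched like this. The Coordinator<T> stub is an assumption, and so is the termination rule: this sketch treats a null state as an end-of-stream marker so the loop can finish, which the text above does not specify.

```csharp
using System;
using System.Collections;

// Hypothetical stand-in for Coordinator<T>: it only carries the shared state.
public class Coordinator<T> {
    public T State { get; set; }
}

public static class ExponentiatorDemo {
    // Squares the current block in place on every resume.
    // Assumption: a null state means there is nothing left to process.
    public static IEnumerator Exponentiator(Coordinator<int[]> coordinator) {
        while (coordinator.State != null) {
            var block = coordinator.State;
            for (var i = 0; i < block.Length; i++) block[i] *= block[i];
            yield return null;
        }
    }

    // Resumes the exponentiator once per block and returns the squared blocks.
    public static int[][] SquareAll(params int[][] blocks) {
        var coordinator = new Coordinator<int[]>();
        var exponentiator = Exponentiator(coordinator);
        foreach (var block in blocks) {
            coordinator.State = block;
            exponentiator.MoveNext();
        }
        return blocks;
    }
}
```

SquareAll(new[] { 1, 2, 3 }, new[] { 4, 5 }) turns the blocks into { 1, 4, 9 } and { 16, 25 }, one block per resume.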
Finally, Coordinator<T> allows the construction of these arbitrary cooperative processing chains. It is simply an inversion of a regular iterator. Instead of receiving new values one at a time from an enumerator, we have one enumerator per coroutine that we can command to run to its next yield point with MoveNext(). MoveNext() returns false if the coroutine hit a yield break, signaling that it is done, or true if it hit a yield return, signaling that it is willing to continue. In the latter case we put the enumerator back into our queue, pick the next coroutine from the queue and compel it to resume.
In the GitHub project I also implemented the same coroutines using async/await. The one benefit that implementation has over iterator-based coroutines is that once inside an async method, you can await any other async method, which allows these types of coroutines to suspend the execution chain to let some other async task (such as a web request) execute.
While interesting and useful in a limited set of circumstances, the coroutines introduced in this article are not likely to find their way into your next project. Far more useful and applicable in any application liable to block on I/O are the asynchronous workflows introduced in Asynchrony and Sequential Workflows, since these types of coroutines allow us to use traditional sequential coding styles while never blocking on asynchronous operations.
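The queue-driven loop described above can be sketched as follows, together with the three stages so the whole chain runs end to end. This is not the code from the example project: the constructor, the Execute method, the stage order and the use of a null state as an end-of-stream marker are all assumptions.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

public class Coordinator<T> {
    private readonly Queue<IEnumerator> _queue = new Queue<IEnumerator>();
    public T State { get; set; }

    public Coordinator(params Func<Coordinator<T>, IEnumerator>[] coroutines) {
        // Calling an iterator method only creates the enumerator; no coroutine code runs yet.
        foreach (var coroutine in coroutines) _queue.Enqueue(coroutine(this));
    }

    public void Execute() {
        while (_queue.Count > 0) {
            var current = _queue.Dequeue();
            // true: the coroutine yielded and wants another turn; false: it is done.
            if (current.MoveNext()) _queue.Enqueue(current);
        }
    }
}

public static class Pipeline {
    static IEnumerator Producer(Coordinator<int[]> c, int[,] source) {
        var columns = source.GetLength(1);
        for (var row = 0; row < source.GetLength(0); row++) {
            var block = new int[columns];
            for (var col = 0; col < columns; col++) block[col] = source[row, col];
            c.State = block;
            yield return null;
        }
        c.State = null; // assumed end-of-stream marker
    }

    static IEnumerator Exponentiator(Coordinator<int[]> c) {
        while (c.State != null) {
            for (var i = 0; i < c.State.Length; i++) c.State[i] *= c.State[i];
            yield return null;
        }
    }

    static IEnumerator Consumer(Coordinator<int[]> c, int[,] destination) {
        var columns = destination.GetLength(1);
        var written = 0;
        while (written < destination.Length) {
            foreach (var value in c.State) {
                destination[written / columns, written % columns] = value;
                written++;
            }
            yield return null;
        }
    }

    // Runs producer, exponentiator and consumer round-robin over the source matrix.
    public static int[,] Run(int[,] source, int rows, int columns) {
        var destination = new int[rows, columns];
        var coordinator = new Coordinator<int[]>(
            c => Producer(c, source), Exponentiator, c => Consumer(c, destination));
        coordinator.Execute();
        return destination;
    }
}
```

Running the 3x4 matrix holding 1 through 12 into a 6x2 destination produces the rows { 1, 4 }, { 9, 16 }, { 25, 36 }, { 49, 64 }, { 81, 100 } and { 121, 144 }. Because the queue is strictly round-robin, each block is squared exactly once before the consumer reads it, and no locks are involved.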
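One way such an async/await variant could be built is with a custom awaitable that parks the continuation in the coordinator's queue. The sketch below is not the implementation from the example project; AsyncCoordinator<T>, YieldAwaitable and Worker are hypothetical names, and only awaits on Yield() are handled.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

// Awaitable returned by Yield(). It never reports completion, so awaiting it always
// suspends the async method and parks its continuation in the coordinator's queue.
public struct YieldAwaitable : INotifyCompletion {
    private readonly Queue<Action> _ready;
    public YieldAwaitable(Queue<Action> ready) { _ready = ready; }
    public YieldAwaitable GetAwaiter() { return this; }
    public bool IsCompleted { get { return false; } }
    public void OnCompleted(Action continuation) { _ready.Enqueue(continuation); }
    public void GetResult() { }
}

public class AsyncCoordinator<T> {
    private readonly Queue<Action> _ready = new Queue<Action>();
    public T State { get; set; }

    public YieldAwaitable Yield() { return new YieldAwaitable(_ready); }

    public void Execute(params Func<AsyncCoordinator<T>, Task>[] coroutines) {
        var tasks = new List<Task>();
        // Each async method runs synchronously until its first await.
        foreach (var coroutine in coroutines) tasks.Add(coroutine(this));
        // Resume parked continuations round-robin until none are left.
        while (_ready.Count > 0) _ready.Dequeue()();
        // Surface any exception thrown inside a coroutine.
        foreach (var task in tasks) task.GetAwaiter().GetResult();
    }
}

public static class AsyncDemo {
    // Illustrative coroutine: logs two steps, yielding after each one.
    static async Task Worker(AsyncCoordinator<List<string>> c, string name) {
        for (var step = 1; step <= 2; step++) {
            c.State.Add(name + step);
            await c.Yield();
        }
    }

    public static List<string> Run() {
        var coordinator = new AsyncCoordinator<List<string>> { State = new List<string>() };
        coordinator.Execute(c => Worker(c, "A"), c => Worker(c, "B"));
        return coordinator.State;
    }
}
```

AsyncDemo.Run() returns A1, B1, A2, B2, the same interleaving the iterator version would give. Awaiting a real asynchronous operation inside Worker would resume on whatever context that operation completes on, so a fuller version would need a thread-safe queue and a loop that waits for outstanding work; this sketch leaves that out.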