
21 Jun 11

Exit… Screen Right

This article continues a series about asynchrony, covering DReAM coroutines and the C# 5.0 async/await constructs, in preparation for my talk on the same subject at monospace in July in Boston. In the last post, "What is asynchrony good for", I introduced Result.WhenDone and Task.ContinueWith, two constructs that facilitate continuation passing style programming in C#. This time I want to tackle a workflow that is a little more complicated: it includes some parallelization of work and a number of I/O operations. Each of the steps below is an async barrier that would traditionally result in a blocking call of indeterminate duration:

  1. A request to fetch unread aggregated articles comes in
  2. The list of feeds the user is subscribed to is fetched
  3. All feeds are fetched in parallel
  4. All feeds are filtered via the DB to remove the already seen articles
  5. Combine all unseen articles, limit the number of articles to return and mark them as seen in the DB
  6. The limited, unseen entries are returned as the response

Each step in the above workflow is separated from the previous one by a potentially asynchronous operation. If we think of each step as a callback for the completion of the previous one, the continuation arguments fall out naturally.
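Stripped of any async library, this is plain continuation passing: each step receives the callback to invoke with its result instead of returning it. A minimal sketch, assuming hypothetical GetFeeds and FetchFeed stubs that work on strings and invoke their callbacks immediately. Real implementations would call back from an I/O completion, at which point the counter and list below would need synchronization.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CpsSketch
{
    // Hypothetical step 2: look up the user's feeds, then pass them to the continuation.
    public static void GetFeeds(int userId, Action<List<string>> next)
    {
        next(new List<string> { "feed-a/" + userId, "feed-b/" + userId });
    }

    // Hypothetical step 3: "fetch" one feed, then pass its articles to the continuation.
    public static void FetchFeed(string uri, Action<List<string>> next)
    {
        next(new List<string> { uri + "#1", uri + "#2" });
    }

    // The workflow: each step's result arrives as the argument of the next callback.
    public static void GetUserRecentArticles(int userId, int limit, Action<List<string>> respond)
    {
        GetFeeds(userId, uris =>
        {
            if (uris.Count == 0)
            {
                respond(new List<string>());
                return;
            }
            var collected = new List<string>();
            var remaining = uris.Count;
            foreach (var uri in uris)
            {
                FetchFeed(uri, articles =>
                {
                    collected.AddRange(articles);
                    // The last feed to report back continues the workflow.
                    // Not thread-safe: fine only because these stubs call back synchronously.
                    if (--remaining == 0)
                    {
                        respond(collected.Take(limit).ToList());
                    }
                });
            }
        });
    }
}
```

Note that GetUserRecentArticles itself returns nothing; the response only ever reaches the caller through the `respond` callback.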

The Players in our Async Workflow Production

For clarity and brevity, I'm going to treat each step as an existing async method with its body left as an exercise. Below I've defined the asynchronous method signatures we will need:

1. GetUserRecentArticles

This method implements the asynchronous workflow.

// Result
Result<DreamMessage> GetUserRecentArticles(int userId, int limit, Result<DreamMessage> response)
// Task
Task<DreamMessage> GetUserRecentArticles(int userId, int limit)

DreamMessage is DReAM's symmetric HTTP request and response data container, and serves as the value of the synchronization handle seen by the outside caller. The HTTP server would call GetUserRecentArticles to start the asynchronous chain and use the synchronization handle to retrieve the final result.
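From the caller's side, the synchronization handle offers two choices: block until it is signaled, or attach a continuation and move on. A sketch with Task, where the GetUserRecentArticles stub and the use of string in place of DreamMessage are assumptions for illustration:

```csharp
using System;
using System.Threading.Tasks;

public static class CallerSketch
{
    // Hypothetical stand-in for the workflow; string plays the role of DreamMessage.
    public static Task<string> GetUserRecentArticles(int userId, int limit)
    {
        return Task.Run(() => "<articles user='" + userId + "' limit='" + limit + "'/>");
    }

    // Option 1: block the calling thread until the handle is signaled.
    public static string Blocking(int userId, int limit)
    {
        return GetUserRecentArticles(userId, limit).Result;
    }

    // Option 2: attach a continuation and return at once; the returned Task
    // completes after the continuation has processed the response.
    public static Task<string> NonBlocking(int userId, int limit)
    {
        return GetUserRecentArticles(userId, limit)
            .ContinueWith(t => "200 OK " + t.Result);
    }
}
```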

2. GetFeedsForUserFromDb

This method uses asynchronous database queries to get the list of URIs the user is subscribed to:

// Result
Result<IEnumerable<XUri>> GetFeedsForUserFromDb(int userId, Result<IEnumerable<XUri>> result)
// Task
Task<IEnumerable<XUri>> GetFeedsForUserFromDb(int userId)

3. FetchFeed

This method uses an asynchronous web request to fetch the feed for a given Uri. We will be multiplexing this call to fetch multiple feeds at once.

// Result
Result<XDoc> FetchFeed(XUri uri, Result<XDoc> result)
// Task
Task<XDoc> FetchFeed(XUri uri)
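Multiplexing here simply means starting every fetch before waiting on any of them. A sketch with Task, assuming a hypothetical FetchFeed stub that takes a System.Uri instead of DReAM's XUri and simulates network latency with Task.Delay. It uses Task.WhenAll from .NET 4.5 and later, which took over the role of the Async CTP's TaskEx.WhenAll:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical stub: simulates a web request with a short delay.
    public static Task<string> FetchFeed(Uri uri)
    {
        return Task.Delay(50).ContinueWith(_ => "<feed src='" + uri + "'/>");
    }

    public static Task<string[]> FetchAll(IEnumerable<Uri> uris)
    {
        // ToList forces the lazy Select, so every request is in flight before we join.
        List<Task<string>> pending = uris.Select(uri => FetchFeed(uri)).ToList();
        // WhenAll completes once all fetches have, with results in input order.
        return Task.WhenAll(pending);
    }
}
```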

4. FilterViaDb

This method is called once for each feed we've fetched and reduces it to a list of only the articles the user has not yet seen:

// Result
Result<IEnumerable<XDoc>> FilterViaDb(XDoc doc, Result<IEnumerable<XDoc>> result)
// Task
Task<IEnumerable<XDoc>> FilterViaDb(XDoc doc)

5. MarkAsRead

Step 5 combines the articles, applies the limit and marks the articles to be returned as read. Only the last part is an asynchronous operation; the rest can be performed inline. The method MarkAsRead makes asynchronous calls to the database, inserting the canonical URIs of the articles to be returned in order to mark them as read.

// Result
Result MarkAsRead(int userId, IEnumerable<XDoc> aggregatedArticles, Result result)
// Task
Task MarkAsRead(int userId, IEnumerable<XDoc> aggregatedArticles)
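Because only the marking is asynchronous, step 5 reduces to an inline LINQ combine-and-limit followed by a single continuation. A sketch under stated assumptions: articles are plain strings, and MarkAsRead is a hypothetical stub that records them in an in-memory dictionary rather than the database.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class StepFiveSketch
{
    // Hypothetical stand-in for the "seen articles" table, keyed "userId:article".
    public static readonly ConcurrentDictionary<string, bool> Seen =
        new ConcurrentDictionary<string, bool>();

    // Hypothetical async MarkAsRead: the only asynchronous part of step 5.
    public static Task MarkAsRead(int userId, IEnumerable<string> articles)
    {
        return Task.Run(() =>
        {
            foreach (var article in articles)
                Seen[userId + ":" + article] = true;
        });
    }

    // Combining and limiting run inline; only the marking needs a continuation.
    public static Task<List<string>> CombineLimitAndMark(
        int userId, int limit, IEnumerable<IEnumerable<string>> unreadPerFeed)
    {
        // Materialize once so exactly the same articles are marked and returned.
        var combined = unreadPerFeed.SelectMany(articles => articles).Take(limit).ToList();
        return MarkAsRead(userId, combined).ContinueWith(t =>
        {
            t.Wait(); // rethrows (as AggregateException) if marking failed
            return combined;
        });
    }
}
```

The `ToList()` matters: without it, the lazy query would be re-evaluated for marking and again for the response.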

Act I: Result oriented

Below is the implementation of GetUserRecentArticles, which creates the asynchronous workflow by calling the remaining methods in turn as continuations. This method returns immediately, providing the Result instance to block on or to attach a continuation to:

public Result<DreamMessage> GetUserRecentArticles(int userId, int limit, Result<DreamMessage> response) {

    // Start to fetch the user's feed URIs from the DB and set up continuation
    GetFeedsForUserFromDb(userId, new Result<IEnumerable<XUri>>()).WhenDone(r1 => {
        var uris = r1.Value;

        // start fetching all feeds
        var feeds = uris.Select(uri => FetchFeed(uri, new Result<XDoc>())).ToList();

        // set up continuation for when all feeds are fetched
        feeds.Join(new Result()).WhenDone(r2 => {

            // start to filter all feeds to only unread
            var allUnreadArticles = feeds
                .Select(feedResult => FilterViaDb(feedResult.Value, new Result<IEnumerable<XDoc>>()))
                .ToList();

            // set up continuation for when feeds are filtered
            allUnreadArticles.Join(new Result()).WhenDone(r3 => {

                // combine all filtered articles
                var combinedArticles = (from unreadResult in allUnreadArticles
                                        let unreadArticles = unreadResult.Value
                                        from article in unreadArticles
                                        select article).Take(limit).ToList();

                // start to mark all remaining articles as read and set up continuation
                MarkAsRead(userId, combinedArticles, new Result()).WhenDone(r4 => {
                    var aggregatedDoc = new XDoc("articles").AddAll(combinedArticles);

                    // signal response with aggregated article document
                    response.Return(DreamMessage.Ok(aggregatedDoc));
                });
            });
        });
    });

    // immediately return the response Result object to the caller
    return response;
}

As promised in the last article, the parallel fan-outs for FetchFeed and FilterViaDb show the benefit of Result.Join(). Since Join returns another Result, we can attach the continuation that should run once all parallel operations complete directly to it, rather than requiring blocking behavior to join our workers back together.
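Result.Join is DReAM's own; the following is not its implementation, just a minimal sketch of how a join can work without blocking: each worker's continuation decrements a shared counter, and whichever finishes last signals the joined handle. JoinAll is a hypothetical name, and in .NET 4.5 and later Task.WhenAll covers this case.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class JoinSketch
{
    // Returns a task that completes once every input task has completed; nothing blocks.
    public static Task JoinAll(IReadOnlyCollection<Task> tasks)
    {
        var joined = new TaskCompletionSource<bool>();
        int remaining = tasks.Count;
        if (remaining == 0)
        {
            joined.SetResult(true);
            return joined.Task;
        }
        foreach (var task in tasks)
        {
            task.ContinueWith(_ =>
            {
                // Interlocked keeps the countdown correct when workers finish concurrently;
                // exactly one continuation sees zero and signals the join.
                if (Interlocked.Decrement(ref remaining) == 0)
                    joined.SetResult(true);
            });
        }
        return joined.Task;
    }
}
```

This sketch treats faulted inputs as merely completed; it does not collect their exceptions.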

Also note that, for simplicity, I set up the parallel loops for fetching and filtering the feeds as two separate blocks. This could easily be refined to fan out only once, by attaching the FilterViaDb continuation to each FetchFeed call and joining back after each chain has finished, instead of after each fan-out.
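A sketch of that refinement with Task, assuming hypothetical string-based FetchFeed and FilterViaDb stubs. Each fetch gets its filter attached directly, Unwrap flattens the resulting Task<Task<T>>, and a single WhenAll joins the complete fetch-then-filter chains:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SingleFanOutSketch
{
    // Hypothetical stub for fetching a feed.
    public static Task<string> FetchFeed(string uri)
    {
        return Task.Run(() => uri + ":feed");
    }

    // Hypothetical stub for filtering a feed down to its unread articles.
    public static Task<IEnumerable<string>> FilterViaDb(string feed)
    {
        return Task.Run(() => (IEnumerable<string>)new[] { feed + ":unread" });
    }

    public static Task<IEnumerable<string>[]> FetchAndFilterAll(IEnumerable<string> uris)
    {
        var chains = uris
            // Filter each feed as soon as its own fetch completes...
            .Select(uri => FetchFeed(uri).ContinueWith(t => FilterViaDb(t.Result)).Unwrap())
            .ToList();
        // ...and join only once, after every fetch-then-filter chain has finished.
        return Task.WhenAll(chains);
    }
}
```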

Act II: A Task completed

The Task version of GetUserRecentArticles is very similar to the Result version and once again returns immediately with the Task to block on or attach a continuation to.

public Task<DreamMessage> GetUserRecentArticles(int userId, int limit) {
    var response = new TaskCompletionSource<DreamMessage>();

    // Start to fetch the user's feed URIs from the DB and set up continuation
    GetFeedsForUserFromDb(userId).ContinueWith(t1 => {
        var uris = t1.Result;

        // start fetching all feeds
        var feedTasks = uris.Select(FetchFeed).ToList();

        // set up continuation for when all feeds are fetched
        TaskEx.WhenAll(feedTasks).ContinueWith(t2 => {
            var feeds = t2.Result;

            // start to filter all feeds to only unread
            var unreadArticleTasks = feeds.Select(FilterViaDb).ToList();

            // set up continuation for when feeds are filtered
            TaskEx.WhenAll(unreadArticleTasks).ContinueWith(t3 => {
                var allUnreadArticles = t3.Result;

                // combine all filtered articles
                var combinedArticles = (from unreadArticles in allUnreadArticles
                                        from article in unreadArticles
                                        select article).Take(limit).ToList();
                MarkAsRead(userId, combinedArticles).ContinueWith(t4 => {
                    var aggregatedDoc = new XDoc("articles").AddAll(combinedArticles);

                    // signal response with aggregated article document
                    response.SetResult(DreamMessage.Ok(aggregatedDoc));
                });
            });
        });
    });

    // immediately return the response Task object to the caller
    return response.Task;
}

The Task version uses the same simpler double fan-out for the parallel operations as the Result version, but gains some additional compactness because Task continuations are chainable. In Result, the chaining pattern is meant as a fluent interface, i.e. each operation that attaches behavior to a Result returns that same Result. Task, on the other hand, does not provide a fluent interface, requiring each method to be called on the original instance in turn; this allows its members to return other values. Task.ContinueWith in particular takes an Action or Func as its continuation and returns a new Task instance that captures the continuation's execution and can in turn take a continuation.
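To illustrate the chaining: every ContinueWith call returns a fresh Task representing the continuation, typed by the continuation's return value, so the next continuation can be attached to that. The numbers are arbitrary:

```csharp
using System;
using System.Threading.Tasks;

public static class ChainSketch
{
    public static Task<string> Chain()
    {
        return Task.Run(() => 20)                            // Task<int> for the initial work
            .ContinueWith(t => t.Result * 2)                 // new Task<int> for this continuation
            .ContinueWith(t => "value=" + (t.Result + 2));   // new Task<string>, chained onto the previous one
    }
}
```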

Another difference is that Result is meant to be used as a signal, while Task by default wraps an operation. This means that Result.Value can be set manually, while Task.Result cannot. In order for GetUserRecentArticles to return a Task that can be signaled on completion (rather than scheduling a new thread to do it), we use TaskCompletionSource. This class has a Task property that we return, which allows us to signal completion in the final continuation and return the list of filtered, aggregated articles.
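The same TaskCompletionSource technique adapts any callback-based API to Task without parking a thread on the wait. A sketch, assuming a hypothetical LookupWithCallback API that reports its result from a thread-pool callback to simulate an I/O completion:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SignalSketch
{
    // Hypothetical callback API: reports its result later from a thread-pool thread.
    public static void LookupWithCallback(int key, Action<string> onDone)
    {
        ThreadPool.QueueUserWorkItem(_ => onDone("value-" + key));
    }

    // Adapts the callback API to Task; no thread blocks while waiting for the callback.
    public static Task<string> LookupAsync(int key)
    {
        var tcs = new TaskCompletionSource<string>();
        LookupWithCallback(key, value => tcs.SetResult(value)); // signal completion manually
        return tcs.Task; // handed out before the callback has necessarily run
    }
}
```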

Epilogue: It's Turtles All The Way Down

One drawback to asynchrony is that once you go async, you gotta go async all the way up and down the call chain. If we were to use GetUserRecentArticles inside an action method on an MVC controller, we'd have to block to get the result, which defeats the purpose of the whole exercise.

Fortunately, this can usually be accommodated, even if it means stepping outside the traditional frameworks. In most scenarios, we are performing the work in order to return a result to our caller, and commonly, this caller is either waiting at a GUI or calling across a network boundary. The former already has mechanisms for marshaling events that occur off the UI thread back to the UI thread, which allows our async workflow to report back without blocking, while the latter involves I/O, and the incoming API is likely to have a Begin*/End* variant for starting and ending the I/O request.
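For the network case, a Task-returning workflow can be exposed through a Begin*/End* pair without blocking, since Task implements IAsyncResult. A sketch of the common adapter, assuming a hypothetical GetArticlesAsync in place of GetUserRecentArticles:

```csharp
using System;
using System.Threading.Tasks;

public static class ApmSketch
{
    // Hypothetical async workflow standing in for GetUserRecentArticles.
    public static Task<string> GetArticlesAsync(int userId)
    {
        return Task.Run(() => "articles for " + userId);
    }

    // Begin*: start the workflow and return an IAsyncResult carrying the caller's state.
    public static IAsyncResult BeginGetArticles(int userId, AsyncCallback callback, object state)
    {
        var tcs = new TaskCompletionSource<string>(state);
        GetArticlesAsync(userId).ContinueWith(t =>
        {
            // Forward the workflow's outcome to the IAsyncResult handed out below.
            if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCanceled) tcs.TrySetCanceled();
            else tcs.TrySetResult(t.Result);
            // APM contract: invoke the callback once the operation has completed.
            callback?.Invoke(tcs.Task);
        });
        return tcs.Task;
    }

    // End*: retrieve the result (or rethrow the failure) of a completed Begin* call.
    public static string EndGetArticles(IAsyncResult asyncResult)
    {
        return ((Task<string>)asyncResult).GetAwaiter().GetResult();
    }
}
```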

Another drawback to this, and to all continuation passing styles, is the tendency of the code to wander off the right-hand side of the screen, as each continuation introduces another level of nesting. While this may appear to be a mere visual artifact that formatting could alleviate, it really is a warning sign of much more complicated issues that affect the debugging and maintenance of such code.

In order to do continuation passing right, there should be no further work in the context that attaches the continuation, but other than diligence there's nothing to stop us from putting code after one of those }); terminators. Any code that executes after the continuation has potentially started risks accessing captured local variables concurrently with it, which introduces the same problems that make multi-threaded code so hard to debug.

And the problem goes beyond that: while the workflow still looks inherently linear, traditional block-scoped constructs are not available, since we only ever enter chained scopes and never exit them. That means using statements as well as try/catch/finally do not work across continuations. While each continuation can inspect the result of its antecedent, we now have to manually check for exceptions from the async methods, and catch exceptions in our own continuation scope, to trap, handle or forward errors correctly.
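A sketch of that manual error plumbing with Task, assuming a hypothetical FailingStep that faults. The surrounding try/catch has already exited by the time the failure surfaces, so the continuation must inspect its antecedent and forward the exception to the TaskCompletionSource itself:

```csharp
using System;
using System.Threading.Tasks;

public static class ErrorForwardingSketch
{
    // Set if the synchronous catch block ever runs (it won't for asynchronous failures).
    public static bool OuterCatchRan;

    // Hypothetical async step that fails, e.g. because the database is down.
    public static Task<int> FailingStep()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetException(new InvalidOperationException("db unavailable"));
        return tcs.Task;
    }

    public static Task<string> Workflow()
    {
        var response = new TaskCompletionSource<string>();
        try
        {
            FailingStep().ContinueWith(t =>
            {
                // Check the antecedent explicitly and forward its failure.
                if (t.IsFaulted)
                {
                    response.TrySetException(t.Exception.InnerExceptions);
                    return;
                }
                try
                {
                    // The continuation's own work has to trap its own errors, too.
                    response.TrySetResult("count=" + t.Result);
                }
                catch (Exception e)
                {
                    response.TrySetException(e);
                }
            });
        }
        catch (InvalidOperationException)
        {
            // Only exceptions thrown while attaching the continuation land here.
            OuterCatchRan = true;
        }
        return response.Task;
    }
}
```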

Next time, I'll introduce constructs that let us write the workflow in a linear fashion, like a regular blocking workflow, while still taking advantage of asynchronous behavior like the above.

Copyright © 2011 MindTouch, Inc.