Composing Dependent Futures

This blog post is adapted from a lightning talk I gave at NetflixOSS, Season 2, Episode 2.

I've noticed that when the word "reactive" is mentioned, it tends not to be associated with any code. One of the things that "reactive" means is "non-blocking" code. "Non-blocking" means that you can make a call and then go on to do something else in your program until you are notified that something happened.

There are a number of frameworks which handle the notification – the idea that a response may not happen immediately – in different ways. Scala has the option of using a couple of different non-blocking mechanisms, and I'm going to go over how they're used and some interesting wrinkles when they are composed together.

Futures

Scala uses scala.concurrent.Future as the basic unit of non-blocking access.

The best way I've found to think of a Future is a box that will, at some point, contain the thing that you want. The key thing with a Future is that you never open the box. Trying to force open the box will lead you to blocking and grief. Instead, you put the Future in another, larger box, typically using the map method.

Here's an example of a Future that contains a String. When the Future completes, then Console.println is called:

object Main {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  def main(args:Array[String]) : Unit = {
    val stringFuture: Future[String] = Future.successful("hello world!")
    stringFuture.map { someString =>
      // if you use .foreach you avoid creating an extra Future, but we are proving
      // the concept here...
      Console.println(someString)
    }
  }
}

Note that in this case, we're calling the main method and then… finishing. The global ExecutionContext does the work of running the callback that calls Console.println. This is great, because when we give up control over when someString is going to be there and when Console.println is going to be called, we let the system manage itself. In contrast, look what happens when we try to force the box open:

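import scala.concurrent.Await
import scala.concurrent.duration._
val stringFuture: Future[String] = Future.successful("hello world!")
val someString: String = Await.result(stringFuture, 5.seconds) // blocks the calling thread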

In this case, we have to wait – keep a thread twiddling its thumbs – until we get someString back. We've opened the box, but we've had to commandeer the system's resources to get at it.

Event Based Systems with Akka

When we talk about reactive systems in Scala, we're talking about event driven systems, which typically means Akka. When we want to get a result out of Akka, there are two ways we can do it. We can tell – fire off a message to an actor:

fooActor ! GetFoo(1)

and then rely on fooActor to send us back a message:

def receive = {
  case GetFoo(id) =>
    sender() ! Foo(id)
}

This has the advantage of being very simple and straightforward.

You also have the option of using ask, which will generate a Future:

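// ask returns Future[Any] and needs an implicit Timeout in scope, so narrow it with mapTo
val fooFuture: Future[Foo] = (fooActor ? GetFoo(1)).mapTo[Foo]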

When the actor's receive method sends back Foo(id) then the Future will complete. If you want to go the other way, from a Future to an Actor, then you can use pipeTo:

Future.successful {
  Foo(1)
} pipeTo actorRef

tell is usually better than ask, but there are nuances to Akka message processing. I recommend Three flavours of request-response pattern in Akka and Ask, Tell and Per Request Actors for a more detailed analysis of messages, and see the Akka documentation.

The important caveat is that not all systems are Akka-based. If you're talking to a NoSQL store like Redis or Cassandra, odds are that you are using a non-blocking driver that uses Future directly. Or you may be using Play, which will allow you to pass along a Future (and thereby avoid "opening the box") using Action.async:

def index(): Action[AnyContent] = Action.async { // the block returns Future[Result]
  Future.successful {
    Ok("hello world!") // 200 HTTP Result
  }
}

What this means, in practice, is that if you're using a system which is not based around Akka Actors, and you're not using a stream based API such as Iteratees / Reactive Streams, then most of the time you hear about "reactive" and "non-blocking", you're going to be looking at Future. And you're going to pass that Future as far along the stack as you can, because you want to avoid opening that box. Which brings us to dependent futures.

Dependent Futures

Assume a service (in Scala, trait means roughly the same as interface in Java) that goes and gets data:

trait FooService {
  def find(fooId:FooID): Option[Foo]
}

This service returns an Option[T]. Option is another "wrapper" type, which only has two possible values – for Option[Foo], you'll get back a Some(Foo(1)) if Foo(1) exists, or None if Foo(1) wasn't found. Using Option means that we don't have to have null checks all over the place:

val foo:Foo = fooService.find(fooId)
if (foo != null) { // WITHOUT OPTION
  Console.println("Foo is " + foo)
}

Using the Option.map method, we can safely get at the value only if it exists:

val maybeFoo:Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
  Console.println("Foo is " + foo)
}

You can see that both Future and Option work on the same principle: you have a type which contains another type, which you can only get at under certain conditions.
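To make the parallel concrete, here is a minimal sketch using only the standard library. The Foo case class and the describe helper are stand-ins invented for this illustration; the point is only that the same function can be passed to map on either wrapper, and it runs when (and if) the value is available.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object BoxExample {
  // Hypothetical stand-in for the Foo type used in this post.
  final case class Foo(id: Int)

  // The same function is handed to both wrappers below.
  def describe(foo: Foo): String = "Foo is " + foo

  // Option: the function runs only if a value is present.
  val maybeDescription: Option[String] = Option(Foo(1)).map(describe)
  val noDescription: Option[String] = Option.empty[Foo].map(describe)

  // Future: the function runs only once the value has arrived.
  def futureDescription: Future[String] =
    Future.successful(Foo(1)).map(describe)
}
```

In neither case does the calling code take the value out of the wrapper; it only says what should happen to the value.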

But FooService isn't non-blocking. If we assume that there's a non-blocking source of data under the hood, we can do this:

def find(fooId:FooID): Future[Option[Foo]]

And now we can do the following:

fooService.find(fooId).map { maybeFoo =>
  maybeFoo.map { foo =>
    Console.println("Foo is " + foo)
  }
}

Now, the interesting thing is that we can't compose a Future with an Option. If we had a Future in a Future we could flatten it and get back a single Future, and if we had an Option in an Option we could flatten it and get back a single Option, but we can't flatten a Future[Option[T]]. That means we can't do this:

fooService.find(fooId).flatMap { foo =>
  Console.println("Foo is " + foo)  
}

This turns out to be a problem.
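For contrast, here is a small sketch of the flattening that does work, using only the standard library (Future.flatten assumes Scala 2.12 or later). The object and value names are made up for the illustration. With mixed nesting, the best we can do is map inside the map.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FlattenSameType {
  // Option inside Option collapses to a single Option.
  val nestedOption: Option[Option[Int]] = Some(Some(1))
  val flatOption: Option[Int] = nestedOption.flatten

  // Future inside Future collapses to a single Future.
  def nestedFuture: Future[Future[Int]] =
    Future.successful(Future.successful(1))
  def flatFuture: Future[Int] = nestedFuture.flatten

  // Future[Option[Int]] has no such flatten: the two layers stay,
  // and we reach the value by mapping inside the map.
  def mixed: Future[Option[Int]] = Future.successful(Some(1))
  def incremented: Future[Option[Int]] = mixed.map(_.map(_ + 1))
}
```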

So, let's add a couple more services to the mix, following the model of FooService:

trait BarService {
  def find(barId:BarID) : Future[Option[Bar]]
}
trait QuuxService {
  def find(quuxId:QuuxID) : Future[Option[Quux]]
}

Assuming that all these services return independent futures, you can do the following:

val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))

At the end of the code block, there will be three different futures, with potentially three different threads running on them in parallel. This is the model that you'll see in a number of Future tutorials.
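As a rough sketch of how the three independent results might be collected afterwards: the case classes and the findFoo / findBar / findQuux functions below are in-memory stand-ins invented for this example, not the real services. The important detail is that all three futures are created before the for comprehension, so they can run concurrently; the comprehension only gathers the results.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object IndependentFutures {
  // Hypothetical stand-ins for the domain types and services.
  final case class Foo(id: Int)
  final case class Bar(id: Int)
  final case class Quux(id: Int)

  def findFoo(id: Int): Future[Option[Foo]] = Future(Some(Foo(id)))
  def findBar(id: Int): Future[Option[Bar]] = Future(Some(Bar(id)))
  def findQuux(id: Int): Future[Option[Quux]] = Future(Some(Quux(id)))

  def findAll(): Future[(Option[Foo], Option[Bar], Option[Quux])] = {
    // Start all three lookups first; none depends on another.
    val fooFuture = findFoo(1)
    val barFuture = findBar(2)
    val quuxFuture = findQuux(3)

    // Only now combine the results into one Future.
    for {
      maybeFoo <- fooFuture
      maybeBar <- barFuture
      maybeQuux <- quuxFuture
    } yield (maybeFoo, maybeBar, maybeQuux)
  }
}
```

If the service calls were written inside the for comprehension instead, each one would start only after the previous one completed.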

Unfortunately, it doesn't always work out like that. Often, you'll want something that looks roughly like:

val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?

In this example, each service needs the resolution of the previous Future in order to run. This is not quite as ideal, but it's still better than blocking.

So, how do you do this?

The Obvious Solution

Well, given the constraints, the most immediate way is:

fooService.find(1).map { maybeFoo =>
  maybeFoo.map { foo =>
    barService.find(foo.barId).map { maybeBar =>
      maybeBar.map { bar =>
        quuxService.find(bar.quuxId).map { maybeQuux =>
          maybeQuux.map { quux =>
            Console.println("Quux = " + quux)
          }
        }
      }
    }
  }
}

I think we can all agree this is not very fun.

For Comprehensions

There are various different things we can try to make this better. First, we can try "for comprehensions", a piece of useful syntactic sugar that can do wonderful things with map and flatMap.

In this case though, because Future and Option don't compose, the fallback is nested for comprehensions:

for (maybeFoo <- fooService.find(1)) yield {
  for (foo <- maybeFoo) yield ...
}

Although, Christopher Hunt points out that you can do this:

def barForFoo: Future[Option[Bar]] = { // method name is illustrative; the original snippet omitted it
  for {
    Some(foo) <- fooService.find(1)
    maybeBar <- barService.find(foo.barId)
  } yield maybeBar
}

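Which is much neater. One wrinkle to be aware of: if fooService.find(1) completes with None, the Some(foo) pattern does not match, and the resulting Future fails with a NoSuchElementException rather than completing with None.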

Scala Async

Let's try something else. Here's Scala Async. Instead of using future.map, you use an async block, and get the result back immediately:

async {
   val maybeFoo:Option[Foo] = await(fooService.find(1))
   maybeFoo.flatMap { foo =>
     val bar = await(barService.find(foo.barId))
   }
}

It turns out this won't compile! The reason why is the flatMap – the async documentation has a note saying “await must not be used inside a closure nested within an async block”, and so the nested await call fails to fall within the same async block.

However, there are some simple things you can do, that do work.

Small Methods

The simplest thing you can do is to break your code into very small methods, each handling a single step:

def foo2bar(futureOptionFoo:Future[Option[Foo]]) : Future[Option[Bar]]

This helps you avoid nesting, and lets you be very explicit about what types you are working with.
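One possible body for such a method is sketched below. The case classes and the in-memory findBar function are stand-ins made up for this example (the real barService would sit behind it); only the foo2bar signature comes from the text above.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SmallMethods {
  // Hypothetical stand-ins for the domain types.
  final case class Bar(id: Int)
  final case class Foo(id: Int, barId: Int)

  // Hypothetical in-memory replacement for barService.find:
  // only the Bar with id 2 exists.
  def findBar(barId: Int): Future[Option[Bar]] =
    Future.successful(if (barId == 2) Some(Bar(barId)) else None)

  // One step of the chain: a missing Foo short-circuits to None,
  // a present Foo triggers the dependent Bar lookup.
  def foo2bar(futureOptionFoo: Future[Option[Foo]]): Future[Option[Bar]] =
    futureOptionFoo.flatMap {
      case Some(foo) => findBar(foo.barId)
      case None      => Future.successful(None)
    }
}
```

A bar2quux method would follow the same shape, and the whole chain then reads as bar2quux(foo2bar(fooService.find(1))).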

Flatten

Another option is to go ahead and nest everything, then use this neat trick:

implicit def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] = {
  fofoa.flatMap(_.getOrElse(Future.successful(None)))
}

While you can't flatten Future and Option together, you can take a Future[Option[T]] where T is itself a Future[Option[A]] and flatten those two pairs of layers together. Credit goes to Jason Zaugg for this one.
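Here is a sketch of how the trick might be used. The flatten helper is called explicitly rather than as an implicit conversion, to make the types visible; the case classes and the findFoo / findBar functions are in-memory stand-ins invented for the example.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FlattenNested {
  // Hypothetical stand-ins for the domain types and services.
  final case class Bar(id: Int)
  final case class Foo(id: Int, barId: Int)

  def findFoo(id: Int): Future[Option[Foo]] =
    Future.successful(if (id == 1) Some(Foo(1, 2)) else None)
  def findBar(id: Int): Future[Option[Bar]] =
    Future.successful(if (id == 2) Some(Bar(2)) else None)

  // Same body as the helper above, without the implicit modifier.
  def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] =
    fofoa.flatMap(_.getOrElse(Future.successful(None)))

  // Nest freely with map, then collapse the four layers into two.
  def barForFoo(fooId: Int): Future[Option[Bar]] =
    flatten(findFoo(fooId).map(maybeFoo => maybeFoo.map(foo => findBar(foo.barId))))
}
```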

Loan Pattern

We can also use the loan pattern to pass around blocks, and immediately return in the case where we see None. In this case, we're returning a Play Result object, which is NotFound (404) if anything returns None:

def getFuture[T](futureOptionBlock: Future[Option[T]])(foundBlock: (T => Future[Result])): Future[Result] = {
  futureOptionBlock.flatMap {
    case Some(found) =>
      foundBlock(found)
    case None =>
      Future.successful(NotFound)
  }
}

And this gives us:

def index: Action[AnyContent] = Action.async { // the block returns Future[Result]
  getFuture(fooService.find(1)) { foo =>
    getFuture(barService.find(foo.barId)) { bar =>
      getFuture(quuxService.find(bar.quuxId)) { quux =>
        Future.successful {
          Ok("Quux = " + quux)
        }
      }
    }
  }
}

If you cannot shortcut the Option, then you can call option.map(win).getOrElse(fail), or option.fold(fail)(win) to process both the failure and success branches.
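Handling both branches explicitly might look like the sketch below. The Result, Ok and NotFound definitions are simplified stand-ins for Play's types, and win, fail and Foo are placeholder names for this illustration, so that the example runs with the standard library alone.

```scala
object BothBranches {
  // Simplified stand-ins for Play's Result values (not the real API).
  sealed trait Result
  final case class Ok(body: String) extends Result
  case object NotFound extends Result

  // Hypothetical domain type.
  final case class Foo(id: Int)

  // Success and failure handlers.
  def win(foo: Foo): Result = Ok("Foo = " + foo)
  def fail: Result = NotFound

  // fold takes the empty case first, then the function for the value.
  def render(maybeFoo: Option[Foo]): Result =
    maybeFoo.fold(fail)(win)

  // Equivalent formulation with map and getOrElse.
  def renderAlt(maybeFoo: Option[Foo]): Result =
    maybeFoo.map(win).getOrElse(fail)
}
```

Both handlers are declared with the common Result type, which keeps type inference for fold straightforward.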

OptionT

The ultimate answer is to implement a monad transformer called OptionT to specifically compose Future with Option. Francois Garillot has written up a step by step blog post addressing how to implement OptionT in Scala.
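To give a feel for the idea, here is a deliberately minimal sketch specialised to Future. It is not the implementation from that post or from any library: the FutureOption name, the case classes and the in-memory find functions are all invented for this example. Defining map and flatMap on the wrapper is enough for a single flat for comprehension to express the dependent chain.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global

object OptionTSketch {
  // A tiny OptionT-like wrapper, fixed to Future (illustrative only).
  final case class FutureOption[A](value: Future[Option[A]]) {
    def map[B](f: A => B)(implicit ec: ExecutionContext): FutureOption[B] =
      FutureOption[B](value.map(_.map(f)))

    def flatMap[B](f: A => FutureOption[B])(implicit ec: ExecutionContext): FutureOption[B] =
      FutureOption[B](value.flatMap {
        case Some(a) => f(a).value                         // keep going
        case None    => Future.successful(Option.empty[B]) // short-circuit
      })
  }

  // Hypothetical stand-ins for the domain types and services.
  final case class Quux(id: Int)
  final case class Bar(id: Int, quuxId: Int)
  final case class Foo(id: Int, barId: Int)

  def findFoo(id: Int): Future[Option[Foo]] =
    Future.successful(if (id == 1) Some(Foo(1, 2)) else None)
  def findBar(id: Int): Future[Option[Bar]] =
    Future.successful(if (id == 2) Some(Bar(2, 3)) else None)
  def findQuux(id: Int): Future[Option[Quux]] =
    Future.successful(if (id == 3) Some(Quux(3)) else None)

  // The dependent chain, with no nesting.
  def quuxForFoo(fooId: Int): Future[Option[Quux]] =
    (for {
      foo  <- FutureOption(findFoo(fooId))
      bar  <- FutureOption(findBar(foo.barId))
      quux <- FutureOption(findQuux(bar.quuxId))
    } yield quux).value
}
```

A general OptionT abstracts over the outer type instead of fixing it to Future, which is what the linked post walks through.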

More Reading

I recommend Future goodies and helpers: SafeFuture, TimeoutFuture, CancelableFuture if you want to do more with how Future exceptions are logged, how Future may timeout, and even cancelling(!) a Future.

Conclusion

We've shown how Akka and Scala provide non-blocking, asynchronous code using Future and Akka message passing, and how to process Future-based code without calling await or getting into deeply nested code. Hope this helps!
