Composing Dependent Futures
This blog post is adapted from a lightning talk I gave at NetflixOSS, Season 2, Episode 2.
I've noticed that when the word "reactive" comes up, it tends not to be associated with any actual code. One of the things "reactive" means is "non-blocking" code. "Non-blocking" means that you can make a call, then go on and do something else in your program until you get a notification that something happened.
There are a number of frameworks that handle the notification (the idea that a response may not happen immediately) in different ways. Scala offers a couple of different non-blocking mechanisms, and I'm going to go over how they're used and some interesting wrinkles when they are composed together.
Futures
Scala uses scala.concurrent.Future as the basic unit of non-blocking access.
The best way I've found to think of a Future is as a box that will, at some point, contain the thing that you want. The key thing with a Future is that you never open the box. Trying to force the box open will lead you to blocking and grief. Instead, you put the Future in another, larger box, typically using the map method.
Here's an example of a Future that contains a String. When the Future completes, Console.println is called:
object Main {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  def main(args: Array[String]): Unit = {
    val stringFuture: Future[String] = Future.successful("hello world!")
    stringFuture.map { someString =>
      // if you use .foreach you avoid creating an extra Future, but we are proving
      // the concept here...
      Console.println(someString)
    }
  }
}
Note that in this case, we're calling the main method and then… finishing. The global ExecutionContext runs the callback we passed to map, and that callback does the work of calling Console.println. This is great, because when we give up control over when someString is going to be there and when Console.println is going to be called, we let the system manage itself. In contrast, look what happens when we try to force the box open:
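import scala.concurrent.Await, scala.concurrent.duration._
val stringFuture: Future[String] = Future.successful("hello world!")
val someString: String = Await.result(stringFuture, 5.seconds) // blocks this thread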
In this case, we have to wait (keeping a thread twiddling its thumbs) until we get someString back. We've opened the box, but we've had to commandeer the system's resources to get at it.
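A small sketch of the "larger box" idea using only the standard library: map hands back a new Future rather than a value, and the only blocking call sits at the very edge of a standalone program. The names BoxDemo and shout are made up for illustration. One hedge about the main example above: as far as I know, the global ExecutionContext runs on daemon threads, so a bare main can exit before its callback fires, which is why this sketch waits once, at the end.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BoxDemo {
  // map never opens the box: it returns a new, larger box (another Future)
  def shout(stringFuture: Future[String]): Future[String] =
    stringFuture.map(s => s.toUpperCase)

  def main(args: Array[String]): Unit = {
    val printed: Future[Unit] =
      shout(Future.successful("hello world!")).map(s => Console.println(s))
    // Block once, at the outermost edge, so the JVM does not exit before
    // the callback has had a chance to run.
    Await.result(printed, 5.seconds)
  }
}
```

Everything between creating the Future and that final Await stays non-blocking; the Await exists only because a command-line program has nothing else keeping it alive.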
Event Based Systems with Akka
When we talk about reactive systems in Scala, we're talking about event-driven systems, which typically means Akka. When we want to get a result out of Akka, there are two ways we can do it. We can tell, that is, fire off a message to an actor:
fooActor ! GetFoo(1)
and then rely on fooActor to send us back a message:
def receive = {
  case GetFoo(id) =>
    sender() ! Foo(id)
}
This has the advantage of being very simple and straightforward.
You also have the option of using ask, which will generate a Future:
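implicit val timeout: akka.util.Timeout = akka.util.Timeout(5.seconds) // needs import akka.pattern.ask
val fooFuture: Future[Foo] = (fooActor ? GetFoo(1)).mapTo[Foo] // ask yields Future[Any]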
When the actor's receive method sends back Foo(id), the Future will complete. If you want to go the other way, from a Future to an actor, you can use pipeTo:
import akka.pattern.pipe // pipeTo also needs an implicit ExecutionContext in scope

Future.successful {
  Foo(1)
} pipeTo actorRef
In general, tell is preferable to ask, but there are nuances to Akka message processing. I recommend "Three flavours of request-response pattern in Akka" and "Ask, Tell and Per Request Actors" for a more detailed analysis of messaging, as well as the Akka documentation.
The important caveat is that not all systems are Akka-based. If you're talking to a NoSQL store like Redis or Cassandra, odds are that you are using a non-blocking driver that uses Future directly. Or you may be using Play, which allows you to pass along a Future (and thereby avoid "opening the box") using Action.async:
def index(): Action[AnyContent] = Action.async { // the block returns Future[Result]
  Future.successful {
    Ok("hello world!") // 200 HTTP Result
  }
}
What this means, in practice, is that if you're using a system that is not based around Akka actors, and you're not using a stream-based API such as Iteratees / Reactive Streams, then most of the time you hear about "reactive" and "non-blocking", you're going to be looking at Future. And you're going to pass that Future as far along the stack as you can, because you want to avoid opening that box. Which brings us to dependent futures.
Dependent Futures
Assume a service (in Scala, trait means roughly the same as interface in Java) that goes and gets data:
trait FooService {
  def find(fooId: FooID): Option[Foo]
}
This service returns an Option[T]. Option is another "wrapper" type, which only has two possible values: for Option[Foo], you'll get back Some(Foo(1)) if Foo(1) exists, or None if Foo(1) wasn't found. Using Option means that we don't have to have null checks all over the place:
val foo: Foo = fooService.find(fooId) // imagine a find that returns null when missing
if (foo != null) { // WITHOUT OPTION
  Console.println("Foo is " + foo)
}
Using the Option.map method, we can safely get at the value only if it exists:
val maybeFoo: Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
  Console.println("Foo is " + foo)
}
You can see that both Future and Option work on the same principle: you have a type that contains another type, which you can only get at under certain conditions.
But FooService isn't non-blocking. If we assume that there's a non-blocking source of data under the hood, we can do this:
def find(fooId:FooID): Future[Option[Foo]]
And now we can do the following:
fooService.find(fooId).map { maybeFoo =>
  maybeFoo.map { foo =>
    Console.println("Foo is " + foo)
  }
}
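To make the two layers of map concrete, here is a self-contained sketch with a stand-in FooService backed by an in-memory Map. FooID's and Foo's fields, InMemoryFooService and NestedMapDemo are hypothetical; only the shape of find follows the post.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical domain types; the post does not define their fields.
final case class FooID(value: Int)
final case class Foo(id: FooID, name: String)

trait FooService {
  def find(fooId: FooID): Future[Option[Foo]]
}

// A stand-in "non-blocking" service: the lookup runs on the ExecutionContext.
final class InMemoryFooService(data: Map[FooID, Foo]) extends FooService {
  def find(fooId: FooID): Future[Option[Foo]] = Future(data.get(fooId))
}

object NestedMapDemo {
  val fooService: FooService =
    new InMemoryFooService(Map(FooID(1) -> Foo(FooID(1), "first")))

  // One map for the Future layer, one for the Option layer.
  def describe(fooId: FooID): Future[Option[String]] =
    fooService.find(fooId).map { maybeFoo =>
      maybeFoo.map(foo => "Foo is " + foo)
    }

  def main(args: Array[String]): Unit =
    Console.println(Await.result(describe(FooID(1)), 5.seconds))
}
```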
Now, the interesting thing is that we can't compose a Future with an Option. If we had a Future in a Future, we could flatten it and get back a single Future, and if we had an Option in an Option, we could flatten it and get back a single Option, but we can't flatten a Future[Option[T]]. That means we can't do this:
fooService.find(fooId).flatMap { foo => // foo is an Option[Foo], not a Foo...
  Console.println("Foo is " + foo) // ...and flatMap wants a Future back: won't compile
}
This turns out to be a problem.
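The asymmetry is easy to see with the standard library (assuming Scala 2.12 or later, where Future#flatten exists): nesting the same wrapper twice flattens cleanly, while a Future of an Option has to be mapped through layer by layer. FlattenDemo is an illustrative name.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlattenDemo {
  // Same wrapper twice: flatten collapses the layers.
  val nestedOption: Option[Option[Int]] = Some(Some(1))
  val flatOption: Option[Int] = nestedOption.flatten

  val nestedFuture: Future[Future[Int]] = Future(Future(2))
  val flatFuture: Future[Int] = nestedFuture.flatten

  // Mixed wrappers: Future#flatMap expects a function returning a Future,
  // so handing it an Option-returning function is a type error:
  // val broken = Future(Option(3)).flatMap(maybeInt => maybeInt)
  // Instead, map through both layers explicitly.
  val mixed: Future[Option[Int]] = Future(Option(3)).map(_.map(_ + 1))
}
```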
So, let's add a couple more services to the mix, following the model of FooService:
trait BarService {
  def find(barId: BarID): Future[Option[Bar]]
}
trait QuuxService {
  def find(quuxId: QuuxID): Future[Option[Quux]]
}
Assuming that all these services return independent futures, you can do the following:
val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))
At the end of the code block, there will be three different futures, potentially running in parallel on three different threads. This is the model that you'll see in a number of Future tutorials.
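A sketch of the independent case, assuming hypothetical findFoo, findBar and findQuux functions in place of the three services: the futures are created first, so their work can overlap, and a for comprehension then combines the results.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object IndependentDemo {
  // Hypothetical lookups standing in for the three services.
  def findFoo(id: Int): Future[Option[String]] = Future(Some(s"foo-$id"))
  def findBar(id: Int): Future[Option[String]] = Future(Some(s"bar-$id"))
  def findQuux(id: Int): Future[Option[String]] = Future(None)

  def all(): Future[(Option[String], Option[String], Option[String])] = {
    // Start all three before combining them, so they can run concurrently.
    val fooFuture = findFoo(1)
    val barFuture = findBar(2)
    val quuxFuture = findQuux(3)
    // The for comprehension only sequences the waiting, not the work.
    for {
      foo <- fooFuture
      bar <- barFuture
      quux <- quuxFuture
    } yield (foo, bar, quux)
  }
}
```

If the calls were made inside the for comprehension instead, each lookup would only start after the previous one had finished.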
Unfortunately, it doesn't always work out like that. Often, you'll want something that looks roughly like:
val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?
In this example, each service needs the resolution of the previous Future in order to run. This is less ideal, since the calls can no longer run in parallel, but it's still better than blocking.
So, how do you do this?
The Obvious Solution
Well, given the constraints, the most immediate way is:
fooService.find(1).map { maybeFoo =>
  maybeFoo.map { foo =>
    barService.find(foo.barId).map { maybeBar =>
      maybeBar.map { bar =>
        quuxService.find(bar.quuxId).map { maybeQuux =>
          maybeQuux.map { quux =>
            Console.println("Quux = " + quux)
          }
        }
      }
    }
  }
}
I think we can all agree this is not very fun.
For Comprehensions
There are various things we can try to make this better. First, we can try "for comprehensions", a piece of useful syntactic sugar that can do wonderful things with map and flatMap.
In this case, though, because Future and Option don't compose, the fallback is nested for comprehensions:
for (maybeFoo <- fooService.find(1)) yield {
  for (foo <- maybeFoo) yield ...
}
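However, Christopher Hunt points out that you can do this:
def : Future[Option[Bar]] = {
  for {
    Some(foo) <- fooService.find(1)
    maybeBar <- barService.find(foo.barId)
  } yield maybeBar
}
Which is much neater. Be aware, though, that the Some(foo) pattern acts as a filter: if fooService.find returns None, the resulting Future fails with a NoSuchElementException instead of completing with None.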
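A sketch of that failure mode, assuming Scala 2.13 and hypothetical lookups (findFoo returns the bar id as an Int): when the first lookup yields None, the filter implied by the pattern fails the whole Future, and recover is one way to turn that back into None.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PatternForDemo {
  // Hypothetical lookups: only foo 1 exists, and its bar id is 10.
  def findFoo(id: Int): Future[Option[Int]] = Future(if (id == 1) Some(10) else None)
  def findBar(barId: Int): Future[Option[String]] = Future(Some(s"bar-$barId"))

  def barFor(fooId: Int): Future[Option[String]] =
    for {
      Some(barId) <- findFoo(fooId) // None here fails the Future via withFilter
      maybeBar <- findBar(barId)
    } yield maybeBar

  // One way to map the filter failure back to None.
  def barForOrNone(fooId: Int): Future[Option[String]] =
    barFor(fooId).recover { case _: NoSuchElementException => None }
}
```

Note that this recover also swallows any unrelated NoSuchElementException raised further down the chain, so a real version may want to be more targeted.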
Scala Async
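Let's try something else. Here's Scala Async. Instead of using future.map, you use an async block and call await inside it, which lets you write code that reads as if the result came back immediately: await suspends the block until the Future completes, without blocking a thread.
async {
  val maybeFoo: Option[Foo] = await(fooService.find(1))
  maybeFoo.flatMap { foo =>
    val bar = await(barService.find(foo.barId))
  }
}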
It turns out this won't compile! The reason is the flatMap: the async documentation has a note saying “await must not be used inside a closure nested within an async block”, and the await inside the flatMap closure is exactly that.
However, there are some simple things you can do that do work.
Small Methods
The simplest thing you can do is to break your code into very small methods, each handling a single step:
def foo2bar(futureOptionFoo:Future[Option[Foo]]) : Future[Option[Bar]]
This helps you avoid nesting, and lets you be very explicit about what types you are working with.
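As an illustration of the small-method approach, here is one possible body for foo2bar. The Foo, Bar and BarID types and the in-memory BarService are hypothetical stand-ins, not from the original post.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical domain types in the shape used by the post.
final case class BarID(value: Int)
final case class Foo(id: Int, barId: BarID)
final case class Bar(id: BarID, name: String)

object BarService {
  private val bars = Map(BarID(7) -> Bar(BarID(7), "seven"))
  def find(barId: BarID): Future[Option[Bar]] = Future(bars.get(barId))
}

object SmallMethods {
  // One small step: resolve a Foo (if any) into its Bar (if any).
  def foo2bar(futureOptionFoo: Future[Option[Foo]]): Future[Option[Bar]] =
    futureOptionFoo.flatMap {
      case Some(foo) => BarService.find(foo.barId)
      case None      => Future.successful(None) // nothing to look up
    }
}
```

Each step then has an explicit Future[Option[_]] in and out, and a bar2quux in the same style would chain directly onto it.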
Flatten
Another option is to go ahead and nest everything, then use this neat trick:
// Needs an implicit ExecutionContext in scope for flatMap
implicit def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] = {
  fofoa.flatMap(_.getOrElse(Future.successful(None)))
}
While you can't flatten Future and Option together, you can take a Future[Option[T]] where T is itself a Future[Option[A]] and flatten those together. Credit goes to Jason Zaugg for this one.
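A sketch of the trick in use, written as an ordinary method rather than an implicit conversion so that the call site is visible. The lookup functions and the FlattenTrick name are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlattenTrick {
  // Collapse Future[Option[Future[Option[A]]]] into Future[Option[A]].
  def flatten[A](fofoa: Future[Option[Future[Option[A]]]])(implicit ec: ExecutionContext): Future[Option[A]] =
    fofoa.flatMap(_.getOrElse(Future.successful(None)))

  // Hypothetical lookups: foo 1 points at bar 2; every bar exists.
  def findFoo(id: Int): Future[Option[Int]] = Future(if (id == 1) Some(2) else None)
  def findBar(id: Int): Future[Option[String]] = Future(Some(s"bar-$id"))

  // "Nest everything" produces the doubly wrapped type...
  def nested(id: Int): Future[Option[Future[Option[String]]]] =
    findFoo(id).map(_.map(barId => findBar(barId)))

  // ...and flatten brings it back to a single Future[Option[_]].
  def bar(id: Int): Future[Option[String]] = flatten(nested(id))
}
```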
Loan Pattern
We can also use the loan pattern to pass around blocks, and return immediately in the case where we see None. In this case, we're returning a Play Result object, which is NotFound (404) if anything returns None:
// Needs an implicit ExecutionContext in scope for flatMap
def getFuture[T](futureOptionBlock: Future[Option[T]])(foundBlock: (T => Future[Result])): Future[Result] = {
  futureOptionBlock.flatMap {
    case Some(found) =>
      foundBlock(found)
    case None =>
      Future.successful(NotFound)
  }
}
And this gives us:
def index = Action.async { // Action.async turns a Future[Result] into an Action
  getFuture(fooService.find(1)) { foo =>
    getFuture(barService.find(foo.barId)) { bar =>
      getFuture(quuxService.find(bar.quuxId)) { quux =>
        Future.successful {
          Ok("Quux = " + quux)
        }
      }
    }
  }
}
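The loan pattern itself does not depend on Play. This sketch swaps in a tiny hypothetical Result type (the Ok and NotFound here are defined locally, not Play's) and made-up lookups, so it runs with just the standard library.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for Play's Result, Ok and NotFound.
sealed trait Result
final case class Ok(body: String) extends Result
case object NotFound extends Result

object LoanPattern {
  def getFuture[T](futureOptionBlock: Future[Option[T]])(foundBlock: T => Future[Result])(
      implicit ec: ExecutionContext): Future[Result] =
    futureOptionBlock.flatMap {
      case Some(found) => foundBlock(found)          // keep going
      case None        => Future.successful(NotFound) // short-circuit
    }

  // Hypothetical lookups: each step yields the id for the next one.
  def findFoo(id: Int): Future[Option[Int]] = Future(Some(id + 1))
  def findBar(id: Int): Future[Option[Int]] = Future(if (id < 10) Some(id * 2) else None)
  def findQuux(id: Int): Future[Option[String]] = Future(Some(s"quux-$id"))

  def index(fooId: Int): Future[Result] =
    getFuture(findFoo(fooId)) { barId =>
      getFuture(findBar(barId)) { quuxId =>
        getFuture(findQuux(quuxId)) { quux =>
          Future.successful(Ok("Quux = " + quux))
        }
      }
    }
}
```

For example, index(1) walks 1 to 2 to 4 and ends in Ok("Quux = quux-4"), while index(10) stops at the bar lookup and yields NotFound.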
If you cannot short-circuit on the Option, then you can call option.map(win).getOrElse(fail), or option.fold(fail)(win), to process both the failure and success branches.
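Both variants on a plain Option, with hypothetical win and fail values standing in for real success and failure handling:

```scala
object BothBranches {
  // Hypothetical success and failure handlers.
  def win(foo: Int): String = s"found $foo"
  val fail: String = "not found"

  def viaMap(option: Option[Int]): String = option.map(win).getOrElse(fail)

  // fold takes the empty case first, then the function for the Some case.
  def viaFold(option: Option[Int]): String = option.fold(fail)(win)
}
```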
OptionT
The ultimate answer is to implement a monad transformer called OptionT, which composes Option with another monad (here, Future). Francois Garillot has written up a step-by-step blog post addressing how to implement OptionT in Scala.
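To give a feel for what OptionT buys you, here is a deliberately minimal, Future-only wrapper with map and flatMap, which is enough for a for comprehension. FutureOption and the lookups are hypothetical; a real OptionT is generic over the outer monad and offers much more.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A minimal, Future-only stand-in for OptionT (hypothetical name).
final case class FutureOption[A](value: Future[Option[A]]) {
  def map[B](f: A => B)(implicit ec: ExecutionContext): FutureOption[B] =
    FutureOption(value.map(_.map(f)))

  // Short-circuits to None without running f when the inner Option is empty.
  def flatMap[B](f: A => FutureOption[B])(implicit ec: ExecutionContext): FutureOption[B] =
    FutureOption(value.flatMap {
      case Some(a) => f(a).value
      case None    => Future.successful(None)
    })
}

object OptionTDemo {
  // Hypothetical lookups chaining foo -> bar -> quux.
  def findFoo(id: Int): Future[Option[Int]] = Future(Some(id + 1))
  def findBar(id: Int): Future[Option[Int]] = Future(if (id < 10) Some(id * 2) else None)
  def findQuux(id: Int): Future[Option[String]] = Future(Some(s"quux-$id"))

  def quux(fooId: Int): Future[Option[String]] =
    (for {
      barId  <- FutureOption(findFoo(fooId))
      quuxId <- FutureOption(findBar(barId))
      quux   <- FutureOption(findQuux(quuxId))
    } yield quux).value
}
```

Compared with the loan pattern, the None case is handled once, inside flatMap, and the call site reads as a flat sequence of dependent lookups.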
More Reading
I recommend "Future goodies and helpers: SafeFuture, TimeoutFuture, CancelableFuture" if you want to do more with how Future exceptions are logged, how a Future may time out, and even cancelling(!) a Future.
Conclusion
We've shown how Akka and Scala provide non-blocking, asynchronous code using Future and Akka message passing, and how to process Future-based code without calling await or getting into deeply nested code. Hope this helps!