Terse Systems

Composing Dependent Futures


This blog post is adapted from a lightning talk I gave at NetflixOSS, Season 2, Episode 2.

I’ve noticed that when the word “reactive” is mentioned, it tends not to be associated with any code. One of the things that “reactive” means is “non-blocking” code. “Non-blocking” refers to the idea that you can make a call, then go on and do something else in your program until you get a notification that something happened.

There are a number of frameworks which handle the notification — the idea that a response may not happen immediately — in different ways. Scala has a couple of different non-blocking mechanisms, and I’m going to go over how they’re used and some interesting wrinkles that appear when they are composed together.

Futures

Scala uses scala.concurrent.Future as the basic unit of non-blocking access.

The best way I’ve found to think of a Future is a box that will, at some point, contain the thing that you want. The key thing with a Future is that you never open the box. Trying to force open the box will lead you to blocking and grief. Instead, you put the Future in another, larger box, typically using the map method.

Here’s an example of a Future that contains a String. When the Future completes, then Console.println is called:

object Main {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  def main(args:Array[String]) : Unit = {
    val stringFuture: Future[String] = Future.successful("hello world!")
    stringFuture.map { someString =>
      // if you use .foreach you avoid creating an extra Future, but we are proving
      // the concept here...
      Console.println(someString)
    }
  }
}

Note that in this case, we’re calling the main method and then… finishing. The Future, running on the global ExecutionContext, does the work of calling Console.println. This is great: by giving up control over when someString is going to be there and when Console.println is going to be called, we let the system manage itself. In contrast, look what happens when we try to force the box open:

val stringFuture: Future[String] = Future.successful("hello world!")
// Await.result (from scala.concurrent) blocks until the Future completes
val someString = Await.result(stringFuture, 5.seconds)

In this case, we have to wait — keep a thread twiddling its thumbs — until we get someString back. We’ve opened the box, but we’ve had to commandeer the system’s resources to get at it.

Event Based Systems with Akka

When we talk about reactive systems in Scala, we’re talking about event driven systems, which typically means Akka. When we want to get a result out of Akka, there are two ways we can do it. We can tell — fire off a message to an actor:

fooActor ! GetFoo(1)

and then rely on fooActor to send us back a message:

def receive = {
  case GetFoo(id) =>
    sender() ! Foo(id)
}

This has the advantage of being very simple and straightforward.

You also have the option of using ask, which will generate a Future:

val fooFuture: Future[Foo] = fooActor ? GetFoo(1)

When the actor’s receive method sends back Foo(id) then the Future will complete. If you want to go the other way, from a Future to an Actor, then you can use pipeTo:

Future.successful {
  Foo(1)
} pipeTo actorRef

tell is usually better than ask, but there are nuances to Akka message processing. I recommend Three flavours of request-response pattern in Akka and Ask, Tell and Per Request Actors for a more detailed analysis of messages, and see the Akka documentation.

The important caveat is that not all systems are Akka-based. If you’re talking to a NoSQL store like Redis or Cassandra, odds are that you are using a non-blocking driver that uses Future directly. Or you may be using Play, which will allow you to pass along a Future (and thereby avoid “opening the box”) using Action.async:

def index(): Action[AnyContent] = Action.async {
  Future.successful {
    Ok("hello world!") // 200 HTTP Result
  }
}

What this means, in practice, is that if you’re using a system which is not based around Akka Actors, and you’re not using a stream based API such as Iteratees / Reactive Streams, then most of the time you hear about “reactive” and “non-blocking”, you’re going to be looking at Future. And you’re going to pass that Future as far along the stack as you can, because you want to avoid opening that box. Which brings us to dependent futures.

Dependent Futures

Assume a service (in Scala, trait means roughly the same as interface in Java) that goes and gets data:

trait FooService {
  def find(fooId:FooID): Option[Foo]
}

This service returns an Option[T]. Option is another “wrapper” type with only two possible cases — for Option[Foo], you’ll get back Some(Foo(1)) if Foo(1) exists, or None if it wasn’t found. Using Option means that we don’t have to have null checks all over the place:

val foo:Foo = fooService.find(fooId)
if (foo != null) { // WITHOUT OPTION
  Console.println("Foo is " + foo)
}

Using the Option.map method, we can safely get at the value only if it exists:

val maybeFoo:Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
  Console.println("Foo is " + foo)
}

You can see that both Future and Option work on the same principle: you have a type which contains another type, which you can only get at under certain conditions.

But FooService isn’t non-blocking. If we assume that there’s a non-blocking source of data under the hood, we can do this:

def find(fooId:FooID): Future[Option[Foo]]

And now we can do the following:

fooService.find(fooId).map { maybeFoo =>
  maybeFoo.map { foo =>
    Console.println("Foo is " + foo)
  }
}

Now, the interesting thing is that we can’t compose a Future with an Option. If we had a Future in a Future, we could flatten it and get back a single Future; if we had an Option in an Option, we could flatten it and get back a single Option; but we can’t flatten a Future[Option[T]]. That means we can’t do this:

fooService.find(fooId).flatMap { foo =>
  Console.println("Foo is " + foo)
}

This turns out to be a problem.
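To make the asymmetry concrete, here is a small sketch (illustrative values only, not from the services above). Nested Futures flatten, and nested Options flatten, but the mixed nesting does not:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Future[Future[Int]] flattens down to Future[Int]
// (flatMap(identity) works on any Scala version; 2.12+ also has .flatten)
val ff: Future[Future[Int]] = Future.successful(Future.successful(1))
val flatF: Future[Int] = ff.flatMap(identity)

// Option[Option[Int]] flattens down to Option[Int]
val oo: Option[Option[Int]] = Some(Some(1))
val flatO: Option[Int] = oo.flatten

// but Future[Option[Int]] has no flatten: the two types don't compose
val fo: Future[Option[Int]] = Future.successful(Some(1))
```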

So, let’s add a couple more services to the mix, following the model of FooService:

trait BarService {
  def find(barId:BarID) : Future[Option[Bar]]
}
trait QuuxService {
  def find(quuxId:QuuxID) : Future[Option[Quux]]
}

Assuming that all these services return independent futures, you can do the following:

val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))

At the end of the code block, there will be three different futures, with potentially three different threads running on them in parallel. This is the model that you’ll see in a number of Future tutorials.
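Because the three futures are independent, they can also be combined without any chaining — for example with zip. Here is a sketch using hypothetical stub values standing in for the real service calls:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case class Foo(id: Int)
case class Bar(id: Int)
case class Quux(id: Int)

// stub lookups standing in for fooService.find and friends
val fooFuture: Future[Option[Foo]] = Future.successful(Some(Foo(1)))
val barFuture: Future[Option[Bar]] = Future.successful(Some(Bar(2)))
val quuxFuture: Future[Option[Quux]] = Future.successful(Some(Quux(3)))

// all three calls are already in flight; zip just pairs up the results
val combined: Future[(Option[Foo], Option[Bar], Option[Quux])] =
  fooFuture.zip(barFuture).zip(quuxFuture).map { case ((f, b), q) => (f, b, q) }
```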

Unfortunately, it doesn’t always work out like that. Often, you’ll want something that looks roughly like:

val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?

In this example, each service needs the resolution of the previous Future in order to run. This is not quite as ideal, but it’s still better than blocking.

So, how do you do this?

The Obvious Solution

Well, given the constraints, the most immediate way is:

fooService.find(1).map { maybeFoo =>
  maybeFoo.map { foo =>
    barService.find(foo.barId).map { maybeBar =>
      maybeBar.map { bar =>
        quuxService.find(bar.quuxId).map { maybeQuux =>
          maybeQuux.map { quux =>
            Console.println("Quux = " + quux)
          }
        }
      }
    }
  }
}

I think we can all agree this is not very fun.

For Comprehensions

There are various different things we can try to make this better. First, we can try “for comprehensions”, a piece of useful syntactic sugar that can do wonderful things with map and flatMap.

In this case though, because Future and Option don’t compose, the fallback is nested for comprehensions:

for (maybeFoo <- fooService.find(1)) yield {
  for (foo <- maybeFoo) yield ...
}

Although, Christopher Hunt points out that you can do this:

def findBar: Future[Option[Bar]] = {
  for {
    Some(foo) <- fooService.find(1)
    maybeBar <- barService.find(foo.barId)
  } yield maybeBar
}

Which is much neater. (One caveat: if fooService.find completes with None, the Some(foo) pattern fails to match, and the resulting Future fails with a NoSuchElementException instead of completing with None.)

Scala Async

Let’s try something else: Scala Async. Instead of using future.map, you use an async block with await, and write the code as if the result were available immediately:

async {
   val maybeFoo:Option[Foo] = await(fooService.find(1))
   maybeFoo.flatMap { foo =>
     val bar = await(barService.find(foo.barId))
   }
}

It turns out this won’t compile! The reason is the flatMap — the async documentation notes that “await must not be used inside a closure nested within an async block”, and the nested await call sits inside the closure passed to flatMap rather than directly in the async block.

However, there are some simple things you can do, that do work.

Small Methods

The simplest thing you can do is break your code into very small methods, each converting one type to the next:

def foo2bar(futureOptionFoo:Future[Option[Foo]]) : Future[Option[Bar]]

This helps you avoid nesting, and lets you be very explicit about what types you are working with.
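A sketch of what the body of such a method might look like, using hypothetical stand-ins for the types and services in the post — flatMap unwraps the Future, and the match handles the Option:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// hypothetical stand-ins for the types and services above
case class BarID(id: Int)
case class Foo(barId: BarID)
case class Bar(id: Int)
def barFind(barId: BarID): Future[Option[Bar]] = Future.successful(Some(Bar(barId.id)))

def foo2bar(futureOptionFoo: Future[Option[Foo]]): Future[Option[Bar]] =
  futureOptionFoo.flatMap {
    case Some(foo) => barFind(foo.barId)      // chase the next lookup
    case None      => Future.successful(None) // short-circuit on None
  }
```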

Flatten

Another option is to go ahead and nest everything, then use this neat trick:

implicit def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] = {
  fofoa.flatMap(_.getOrElse(Future.successful(None)))
}

While you can’t flatten Future and Option together, you can take a Future[Option[T]] where T is also Future[Option[T]] and flatten those together. Credit goes to Jason Zaugg for this one.
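At the call site, the nested map produces a Future[Option[Future[Option[Bar]]]], which the helper collapses back down. A sketch with hypothetical stub services (and the helper written as a plain method rather than an implicit, so the call is visible):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

case class Foo(barId: Int)
case class Bar(id: Int)
def fooFind(id: Int): Future[Option[Foo]] = Future.successful(Some(Foo(barId = id + 1)))
def barFind(id: Int): Future[Option[Bar]] = Future.successful(Some(Bar(id)))

def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] =
  fofoa.flatMap(_.getOrElse(Future.successful(None)))

// the outer map leaves us doubly nested; flatten undoes the damage
val futureBar: Future[Option[Bar]] = flatten {
  fooFind(1).map { maybeFoo =>
    maybeFoo.map(foo => barFind(foo.barId))
  }
}
```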

Loaner Pattern

We can also use the loaner pattern to pass around blocks, and immediately return in the case where we see None. In this case, we’re returning a Play Result object, which is NotFound (404) if anything returns None:

def getFuture[T](futureOptionBlock: Future[Option[T]])(foundBlock: (T => Future[Result])): Future[Result] = {
  futureOptionBlock.flatMap {
    case Some(found) =>
      foundBlock(found)
    case None =>
      Future.successful(NotFound)
  }
}

And this gives us:

def index: Action[AnyContent] = Action.async {
  getFuture(fooService.find(1)) { foo =>
    getFuture(barService.find(foo.barId)) { bar =>
      getFuture(quuxService.find(bar.quuxId)) { quux =>
        Future.successful {
          Ok("Quux = " + quux)
        }
      }
    }
  }
}

If you cannot short-circuit on the Option, then you can call option.map(win).getOrElse(fail), or option.fold(fail)(win), to process both the failure and success branches.
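For example, with toy values (nothing from the post, just an illustration of the two branches):

```scala
// win turns a found value into a message; fail is the not-found branch
val win: String => String = name => "Found " + name
val fail: String = "NotFound"

val someFoo: Option[String] = Some("foo-1")
val noFoo: Option[String] = None

val hit  = someFoo.map(win).getOrElse(fail) // success branch taken
val miss = noFoo.fold(fail)(win)            // fold(ifEmpty)(f) handles both
```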

OptionT

The ultimate answer is to use a monad transformer called OptionT, built specifically to compose Future with Option. Francois Garillot has written a step-by-step blog post on how to implement OptionT in Scala.
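A minimal sketch of the shape such a transformer takes — not Garillot’s implementation, just the core idea of a wrapper whose map and flatMap deal with both layers at once:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// a stripped-down OptionT: wraps Future[Option[A]] and gives it
// map/flatMap so for comprehensions work on the combined type
case class FutOpt[A](value: Future[Option[A]]) {
  def map[B](f: A => B)(implicit ec: ExecutionContext): FutOpt[B] =
    FutOpt(value.map(_.map(f)))
  def flatMap[B](f: A => FutOpt[B])(implicit ec: ExecutionContext): FutOpt[B] =
    FutOpt(value.flatMap {
      case Some(a) => f(a).value
      case None    => Future.successful(None)
    })
}

// hypothetical dependent lookups, as in the post
def fooFind(id: Int): Future[Option[Int]] = Future.successful(Some(id + 1))
def barFind(id: Int): Future[Option[String]] = Future.successful(Some("bar-" + id))

// the dependent calls now chain in a single for comprehension
val result: Future[Option[String]] = (for {
  barId <- FutOpt(fooFind(1))
  bar   <- FutOpt(barFind(barId))
} yield bar).value
```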

More Reading

I recommend Future goodies and helpers: SafeFuture, TimeoutFuture, CancelableFuture if you want to do more with how Future exceptions are logged, how Future may timeout, and even cancelling(!) a Future.

Conclusion

We’ve shown how Scala and Akka provide non-blocking, asynchronous code using Future and Akka message passing, and how to process Future-based code without blocking on Await or descending into deeply nested code. Hope this helps!

Play TLS Example With Client Authentication


This is part of a series of posts about setting up Play WS as a TLS client for a “secure by default” setup.

Previous posts are:

This post is where the rubber meets the road — an actual, demonstrable activator template that shows off WS SSL, provides the scripts for certificate generation, and gives people an out-of-the-box TLS 1.2 setup using ECDSA certificates.

Want to download it? Go to https://github.com/typesafehub/activator-play-tls-example or clone it directly:

git clone https://github.com/typesafehub/activator-play-tls-example.git

It’s an activator template, so you can also install it from inside Typesafe Activator by searching for “TLS”.

Be sure to read the README. This project is as lightweight as possible, but takes a little configuration to get started.

Certificate Generation

The biggest part of any demo application is setting up the scripts. I didn’t find anything that was really hands-free, so I wrote my own. They are exactly the same as the ones described in the Certificate Generation section of the manual.

There are various shortcuts that you can use for defining X.509 certificates, but I found it a lot more useful to go through the work of setting up the root CA certificate, defining the server certificate as having an EKU of “serverAuth”, and so on.

Play Script

The actual script to run Play with all the required JVM options is… large. Part of this is the documentation on every possible feature, but sadly, there are far too many lines which are “best practices” that are very rarely practiced.

Also, the note about rh.secure is a reference to the RequestHeader class in Play itself. Ironically, even when we set HTTPS up on the server, Play itself can’t tell the protocol it’s running on without help.

I will admit to being gleefully happy at setting disabledAlgorithms.properties on startup, so that at last AlgorithmConstraints is enabled on the server:

jdk.tls.disabledAlgorithms=RSA keySize < 2048, DSA keySize < 2048, EC keySize < 224
jdk.certpath.disabledAlgorithms=MD2, MD4, MD5, RSA keySize < 2048, DSA keySize < 2048, EC keySize < 224

CustomSSLEngineProvider

The CustomSSLEngineProvider is responsible for Play’s HTTPS server. More details can be found in Configuring HTTPS.

Setting up an SSLEngineProvider with client authentication is pretty easy, once you know the magic incantations needed to get the trust managers and the key managers set up. After that, it’s a question of ensuring that the SSLEngine knows how trusting it should be.

override def createSSLEngine(): SSLEngine = {
  val sslContext = createSSLContext(appProvider)

  // Start off with a clone of the default SSL parameters...
  val sslParameters = sslContext.getDefaultSSLParameters

  // Tells the server to ignore client's cipher suite preference.
  // http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#cipher_suite_preference
  sslParameters.setUseCipherSuitesOrder(true)

  // http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLParameters
  val needClientAuth = java.lang.System.getProperty("play.ssl.needClientAuth")
  sslParameters.setNeedClientAuth(java.lang.Boolean.parseBoolean(needClientAuth))

  // Clone and modify the default SSL parameters.
  val engine = sslContext.createSSLEngine
  engine.setSSLParameters(sslParameters)

  engine
}

Connecting to the server with Play WS

Setting up the Play client was pretty easy, but it’s worth repeating that Play WS can be run outside of an application with the right setup:

def newClient(rawConfig: play.api.Configuration): WSClient = {
  val classLoader = Thread.currentThread().getContextClassLoader
  val parser = new DefaultWSConfigParser(rawConfig, classLoader)
  val clientConfig = parser.parse()
  clientConfig.ssl.map {
    _.debug.map(new DebugConfiguration().configure)
  }
  val builder = new NingAsyncHttpClientConfigBuilder(clientConfig)
  val client = new NingWSClient(builder.build())
  client
}

The configuration in the ws.conf file was also intentionally strict. The NSA recommends some nice cipher suites in its Suite B Profile for TLS — the WS client will refuse to talk to the server with anything less than full-on TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, and won’t accept an ECDSA signature with an EC key size under 384 bits.
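In ws.conf, strict settings along those lines look roughly like this — a sketch from memory of the Play WS SSL configuration keys, not copied from the project; the repository has the authoritative version:

```
ws.ssl {
  protocol = "TLSv1.2"
  enabledProtocols = [ "TLSv1.2" ]
  enabledCipherSuites = [ "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" ]
}
```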

Client authentication on the client side was a little trickier than expected, but eventually I remembered that the client should be using a key store containing a trust anchor, and it worked itself out.

Conclusion

I think I’m done with JSSE for now. I was interested in exploits at one point, but the fixes amount to “Upgrade to JDK 1.8, use ECDSA certificates, and TLS 1.2” — all of which are demonstrated in this application.

I may at some point go back and look at HSTS or public key pinning in Play WS, but it really comes down to utility. Many of the use cases of pinning involve browsers or unknown clients. I’ve not heard of any demand for the feature, and it’s unclear that anyone would find it all that useful.

With Play and with these blog posts, I’m very pleased to have written something that people find useful. Thanks.

Akka Clustering, Step by Step


This blog post shows how an Akka cluster works by walking through an example application in detail.

Introduction

Akka is an excellent toolkit for handling concurrency. The core concept behind Akka is the Actor model: loosely stated, instead of creating an instance of a class and invoking methods on it, i.e.

val foo = new Foo()
foo.doStuff(args)

You create an Actor, and send the actor immutable messages. Those messages get queued through a mailbox, and the actor processes the messages one by one, in the order they were received:

val fooActor: ActorRef = actorSystem.actorOf(FooActor.props, "fooActor")
fooActor ! DoStuffMessage(args)

Sending messages to an actor is much better than invoking a method for a number of reasons. First, you can pass around an ActorRef anywhere. You can pass them between threads. You can keep them in static methods. No matter what you do, you can’t get the actor into an inconsistent state or call methods in the “wrong” order because the Actor manages its own state in response to those messages. Second, the actor can send messages back to you:

FooActor.scala
def receive = {
  case DoStuffMessage(args) =>
    sender() ! Success("all good!")
  case _ =>
    sender() ! Failure("not so good.")
}

Message passing means that the usual binary of method calls — either return a value or throw an exception — gets opened up considerably. When you can send the actor any message you like, and the actor can send you any message back (and can send you that message when it gets around to processing it, instead of immediately), then you are not bound by locality any more. The actor that you’re sending a message to doesn’t have to live on the same JVM that you’re on. It doesn’t even have to live on the same physical machine. As long as there’s a transport capable of getting the message to the actor, it can live anywhere.

This brings us to Akka remoting.

Remoting

Akka remoting works by saying to the actor system either “I want you to create an actor on this remote host”:

val ref = system.actorOf(FooActor.props.withDeploy(Deploy(scope = RemoteScope(address))))

or “I want a reference to an existing actor on the remote host”:

val remoteFooActor = context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/fooActor")

After obtaining the reference, messages sent to the remote actor are serialized using Protocol Buffers and reconstituted on the other end. This is great for peer-to-peer communication (it already beats RMI), but remoting is too specific in some ways — it points to a unique IP address, and really we’d like actors to just live out there “in the cloud”. This is where Akka clustering comes in.

Clustering allows you to create an actor somewhere on a cluster consisting of nodes which all share the same actor system, without knowing exactly which node it is on. Other machines can join and leave the cluster at run time.

We’ll use the akka-sample-cluster-app Activator template as a reference, and walk through each step of the TransformationApp application, showing how to run it and how it communicates.

Installation

Ideally, you should download Typesafe Activator, start it up with “activator ui” and search for “Akka Cluster Samples with Scala” in the field marked “Filter Templates”. From there, Activator can download the template and provide you with a friendly UI and tutorial.

If not, all the code snippets have links back to the source code on Github, so you can clone or copy the files directly.

Clustering

The first step in Akka clustering is the library dependencies and the akka configuration. The build.sbt file used in akka-sample-cluster-app is complex, but the only bit you need to care about is the akka-cluster library:

build.sbt
libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-cluster" % "2.3.3")

The akka configuration is relatively simple:

application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

You can see there’s a custom cluster provider, and that there’s a remote section that enables Akka to set up its TCP/IP stack.

There’s also a cluster section, which says that there are other nodes on ports 2551 and 2552 that should be contacted when the cluster starts up.

Starting Frontend Node

There are two sides to the Transformation application — a frontend node and some backend nodes.

The examples in akka-sample-cluster-scala tend to have a single main method, and start all the nodes inside a single JVM. I think this confuses people, and so the examples are going to be using separate JVMs on the same box instead.

Here’s the entire main method for starting up a frontend node in the cluster.

TransformationFrontend.scala
object TransformationFrontend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      implicit val timeout = Timeout(5 seconds)
      (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
        case result => println(result)
      }
    }
  }
}

The TransformationFrontend object has an interesting construct for its configuration that is worth breaking down.

TransformationFrontend.scala
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
  withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
  withFallback(ConfigFactory.load())

This statement layers the setting “akka.remote.netty.tcp.port=$port” and the setting “akka.cluster.roles = [frontend]” on top of the configuration returned from ConfigFactory.load() — which reads application.conf. The end result is the config from application.conf, overridden by the settings defined before it. This is a quick and simple way to prepend configuration settings, but the name withFallback can be confusing at first glance.
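The precedence is easy to model with plain maps — each withFallback layer means “my keys win; the fallback only fills in what I don’t define.” This is just an analogy for the semantics, not the Typesafe Config API itself:

```scala
// primary's keys win; fallback only supplies keys primary lacks
def withFallback(primary: Map[String, String],
                 fallback: Map[String, String]): Map[String, String] =
  fallback ++ primary

val portOverride = Map("akka.remote.netty.tcp.port" -> "2551")
val roleOverride = Map("akka.cluster.roles" -> "[frontend]")
val loaded       = Map("akka.remote.netty.tcp.port" -> "0",
                       "akka.loglevel"              -> "INFO")

// mirrors parseString(port).withFallback(parseString(roles)).withFallback(load())
val config = withFallback(portOverride, withFallback(roleOverride, loaded))
```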

After that, the class starts the actor system with the “ClusterSystem” name (this is important — note that the seed nodes also have this name):

TransformationFrontend.scala
val system = ActorSystem("ClusterSystem", config)

And then starts up a frontend actor which does some heavy lifting. This is really all that’s required for the front end to get going.

TransformationFrontend.scala
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

Finally, the main method sets up a scheduler method that asks the frontend actor to transform a job containing “hello-?” every two seconds:

TransformationFrontend.scala
val counter = new AtomicInteger
import system.dispatcher
system.scheduler.schedule(2.seconds, 2.seconds) {
  implicit val timeout = Timeout(5 seconds)
  (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
    case result => println(result)
  }
}

For development purposes, it’s easiest to start the node in Activator:

$ activator
> runMain sample.cluster.transformation.TransformationFrontend

You’ll see that the frontend node starts up and complains it can’t reach the cluster:

[info] [WARN] [06/25/2014 12:43:27.962] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2552
[info] [WARN] [06/25/2014 12:43:27.966] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2551]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2551

This is fine for now. Once we bring the rest of the nodes up, the frontend node will be happier.

Starting Backend Node

The main method for starting the backend is similar to the frontend:

TransformationBackend.scala
object TransformationBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

The only thing that’s different here is that it has a cluster role of “akka.cluster.roles = [backend]” and starts up a “backend” actor.

Open up two new shells (if you’re using iTerm you can use “Command D” to split them easily), and run the following to get the backend nodes up on each of the seed nodes:

$ activator
> runMain sample.cluster.transformation.TransformationBackend 2551
$ activator
> runMain sample.cluster.transformation.TransformationBackend 2552

Once you have the two backend nodes up, you’ll see that the frontend actor connects to both of them, decides the cluster is up, and starts transforming strings:

[info] [INFO] [06/25/2014 12:46:21.503] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:52860] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] TransformationResult(HELLO-19)
[info] TransformationResult(HELLO-20)
[info] TransformationResult(HELLO-21)
[info] TransformationResult(HELLO-22)

TransformationFrontend Actor

The frontend actor looks like this:

TransformationFrontend.scala
class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

It’s clear what it does. When it receives TransformationJob, it forwards it to one of the backend actors, round-robin style. If there aren’t any backend actors, it sends back JobFailed. When it gets a BackendRegistration message, it tells the context to watch the sender’s ActorRef (this tells the context to notify the frontend actor if the sender has died, by sending the frontend actor a Terminated message) and then adds the sender to its internal list of backend actors. And then finally, if one of the backend actors has died, the front end actor removes it from the list.

So far, so good. The interesting bit here is that the frontend has no idea what backend actors are available — it has to receive a message from the backend explicitly to let it know. Let’s see how the backend actor does that.

TransformationBackend Actor

Here’s the full class of the TransformationBackend actor:

TransformationBackend.scala
class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

This is more interesting. The backend actor is cluster aware in a way that the front end actor is not:

TransformationBackend.scala
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit = cluster.unsubscribe(self)

When the actor first starts up, it subscribes itself to the cluster, telling the cluster to send it CurrentClusterState and MemberUp events:

TransformationBackend.scala
def receive = {
  case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
  case state: CurrentClusterState =>
    state.members.filter(_.status == MemberStatus.Up) foreach register
  case MemberUp(m) => register(m)
}

And when it gets those messages saying that the cluster is up, it calls register, which looks up the frontend actor and sends it the BackendRegistration message.

TransformationBackend.scala
def register(member: Member): Unit =
  if (member.hasRole("frontend"))
    context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
      BackendRegistration

After that, the front end can send it TransformationJob messages, and it can respond to the frontend’s requests with TransformationResult(text.toUpperCase).

Conclusion

That’s it! That’s clustering in a nutshell.

Writing an SBT Plugin


One of the things I like about SBT is that it’s interactive. SBT stays up as a long running process, and you interact with it many times, while it manages your project and compiles code for you.

Because SBT is interactive and runs on the JVM, you can use it for more than just builds. You can use it for communication. Specifically, you can use it to make HTTP requests out to things you’re interested in communicating with.

Unfortunately, I knew very little about SBT plugins. So, I talked to Christopher Hunt and Josh Suereth, downloaded eigengo’s sbt-mdrw project, read the activator blog post on markdown and then worked it out on the plane back from Germany.

I made a 0.13 SBT plugin that uses the ROME RSS library to display titles from a list of RSS feeds. It’s available from https://github.com/wsargent/sbt-rss and has lots of comments.

The SBT RSS plugin adds a single command to SBT. You type rss at the console, and it displays the feed:

> rss
[info] Showing http://typesafe.com/blog/rss.xml
[info]      Title = The Typesafe Blog
[info]      Published = null
[info]      Most recent entry = Scala Days Presentation Roundup
[info]      Entry updated = null
[info] Showing http://letitcrash.com/rss
[info]      Title = Let it crash
[info]      Published = null
[info]      Most recent entry = Reactive Queue with Akka Reactive Streams
[info]      Entry updated = null
[info] Showing https://github.com/akka/akka.github.com/commits/master/news/_posts.atom
[info]      Title = Recent Commits to akka.github.com:master
[info]      Published = Thu May 22 05:51:21 EDT 2014
[info]      Most recent entry = Fix fixed issue list.
[info]      Entry updated = Thu May 22 05:51:21 EDT 2014

Let’s show how it does that.

First, the build file. This looks like a normal build.sbt file, except that there’s a sbtPlugin setting in it:

build.sbt
// this bit is important
sbtPlugin := true

organization := "com.typesafe.sbt"

name := "sbt-rss"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation", "-feature")

resolvers += Resolver.sonatypeRepo("snapshots")

libraryDependencies ++= Seq(
  // RSS fetcher (note: the website is horribly outdated)
  "com.rometools" % "rome-fetcher" % "1.5.0"
)

publishMavenStyle := false

/** Console */
initialCommands in console := "import com.typesafe.sbt.rss._"

Next, there’s the plugin’s Scala code itself.

SbtRss.scala
object SbtRss extends AutoPlugin {
   // stuff
}

So, the first thing to note is the AutoPlugin class. The Plugins page talks about AutoPlugin in detail; all you really need to know is that if you define an autoImport object containing your setting keys inside an AutoPlugin, SBT makes those keys available to builds automatically.
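For context, a build consuming this plugin would look roughly like this. The coordinates come from the build.sbt above, and the feed URL is just an example; depending on the plugin’s trigger, you may also need an explicit enablePlugins(SbtRss):

```scala
// project/plugins.sbt in a consuming build (coordinates from the plugin's build.sbt)
addSbtPlugin("com.typesafe.sbt" % "sbt-rss" % "1.0.0-SNAPSHOT")

// build.sbt in the consuming build: rssList is in scope because it lives
// in the plugin's autoImport object
rssList := Seq("http://typesafe.com/blog/rss.xml")
```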

The next bit is the globalSettings entry:

SbtRss.scala
override def globalSettings: Seq[Setting[_]] = super.globalSettings ++ Seq(
  Keys.commands += rssCommand
)

Here, we’re saying we’re going to add a command to SBT’s global settings, by merging it with super.globalSettings.

The next two bits detail how to create the RSS command in SBT style.

SbtRss.scala
/** Allows the RSS command to take string arguments. */
private val args = (Space ~> StringBasic).*

/** The RSS command, mapped into sbt as "rss [args]" */
private lazy val rssCommand = Command("rss")(_ => args)(doRssCommand)
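To see what that parser accepts, here is a plain-Scala stand-in for (Space ~> StringBasic).*. It is only an illustration, not the sbt.complete machinery: zero or more space-separated tokens after the command name, so typing "rss url1 url2" hands Seq("url1", "url2") to doRssCommand:

```scala
// Illustrative stand-in for (Space ~> StringBasic).* : split the text after
// the command name on whitespace, dropping empty tokens.
def parseArgs(afterCommand: String): Seq[String] =
  afterCommand.trim.split("\\s+").toList.filter(_.nonEmpty)

println(parseArgs(" http://a/rss  http://b/rss "))
// prints List(http://a/rss, http://b/rss)
```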

Finally, there’s the command itself.

SbtRss.scala
def doRssCommand(state: State, args: Seq[String]): State = {
  // do stuff

  state
}

The first thing we need to do within a command is call Project.extract(state). This gives us a bunch of useful settings such as currentRef, which we can use to pull the value of the SettingKey out. The SBT documentation on Build State – Project related data shows some more examples:

SbtRss.scala
// Doing Project.extract(state) and then importing it gives us currentRef.
// Using currentRef allows us to get at the values of SettingKey.
// http://www.scala-sbt.org/release/docs/Build-State.html#Project-related+data
val extracted = Project.extract(state)
import extracted._

Once we have the extracted.currentRef object, we can pull out the list of URLs with this construct, as documented in Build State – Project data:

SbtRss.scala
val currentList = (rssList in currentRef get structure.data).get

And then we can put that together with the ROME library to print something out.

SbtRss.scala
package com.typesafe.sbt.rss

import sbt._
import Keys._
import sbt.complete.Parsers._

import java.net.URL
import com.rometools.fetcher._
import com.rometools.fetcher.impl._
import com.rometools.rome.feed.synd._

import scala.util.control.NonFatal

/**
 * An autoplugin that displays an RSS feed.
 */
object SbtRss extends AutoPlugin {

  /**
   * Sets up the autoimports of setting keys.
   */
  object autoImport {
    /**
     * Defines "rssList" as the setting key that we want the user to fill out.
     */
    val rssList = settingKey[Seq[String]]("The list of RSS urls to update.")
  }

  // Brings the autoImport members (rssList) into scope for the code below.
  import autoImport._

  /**
   * An internal cache to avoid hitting RSS feeds repeatedly.
   */
  private val feedInfoCache = HashMapFeedInfoCache.getInstance()

  /**
   * An RSS fetcher, backed by the cache.
   */
  private val fetcher = new HttpURLFeedFetcher(feedInfoCache)

  /** Allows the RSS command to take string arguments. */
  private val args = (Space ~> StringBasic).*

  /** The RSS command, mapped into sbt as "rss [args]" */
  private lazy val rssCommand = Command("rss")(_ => args)(doRssCommand)

  /**
   * Adds the rssCommand to the list of global commands in SBT.
   */
  override def globalSettings: Seq[Setting[_]] = super.globalSettings ++ Seq(
    Keys.commands += rssCommand
  )

  /**
   * The actual RSS command.
   *
   * @param state the state of the RSS application.
   * @param args the string arguments provided to "rss"
   * @return the unchanged state.
   */
  def doRssCommand(state: State, args: Seq[String]): State = {
    state.log.debug(s"args = $args")

    // Doing Project.extract(state) and then importing it gives us currentRef.
    // Using currentRef allows us to get at the values of SettingKey.
    // http://www.scala-sbt.org/release/docs/Build-State.html#Project-related+data
    val extracted = Project.extract(state)
    import extracted._

    // Create a new fetcher event listener attached to the state -- this gives
    // us a way to log the fetcher events.
    val listener = new FetcherEventListenerImpl(state)
    fetcher.addFetcherEventListener(listener)

    try {
      if (args.isEmpty) {
        // This is the way we get the setting from rssList := Seq("http://foo.com/rss")
        // http://www.scala-sbt.org/release/docs/Build-State.html#Project+data
        val currentList = (rssList in currentRef get structure.data).get
        for (currentUrl <- currentList) {
          val feedUrl = new URL(currentUrl)
          printFeed(feedUrl, state)
        }
      } else {
        for (currentUrl <- args) {
          val feedUrl = new URL(currentUrl)
          printFeed(feedUrl, state)
        }
      }
    } catch {
      case NonFatal(e) =>
        state.log.error(s"Error ${e.getMessage}")
    } finally {
      // Remove the listener so we don't have a memory leak.
      fetcher.removeFetcherEventListener(listener)
    }

    state
  }

  def printFeed(feedUrl:URL, state:State) = {
    // Allows us to do "asScala" conversion from java.util collections.
    import scala.collection.JavaConverters._

    // This is a blocking operation, but we're in SBT, so we don't care.
    val feed = fetcher.retrieveFeed(feedUrl)
    val title = feed.getTitle.trim()
    val publishDate = feed.getPublishedDate
    val entries = feed.getEntries.asScala
    val firstEntry = entries.head

    // The only way to provide the RSS feeds as a resource seems to be to
    // have another plugin extend this one.  The code's small enough that it
    // doesn't seem worth it.
    state.log.info(s"Showing $feedUrl")
    state.log.info(s"\t\tTitle = $title")
    state.log.info(s"\t\tPublished = $publishDate")
    state.log.info(s"\t\tMost recent entry = ${firstEntry.getTitle.trim()}")
    state.log.info(s"\t\tEntry updated = " + firstEntry.getUpdatedDate)
  }

  /**
   * Listens for RSS events.
   *
   * @param state
   */
  class FetcherEventListenerImpl(state:State) extends FetcherListener {
    def fetcherEvent(event:FetcherEvent) = {
      import FetcherEvent._
      event.getEventType match {
        case EVENT_TYPE_FEED_POLLED =>
          state.log.debug("\tEVENT: Feed Polled. URL = " + event.getUrlString)
        case EVENT_TYPE_FEED_RETRIEVED =>
          state.log.debug("\tEVENT: Feed Retrieved. URL = " + event.getUrlString)
        case EVENT_TYPE_FEED_UNCHANGED =>
          state.log.debug("\tEVENT: Feed Unchanged. URL = " + event.getUrlString)
      }
    }
  }
}

This is an intentionally trivial example, but it’s easy to show how you could use this to check if the build failed, for example. Have fun.

Testing Hostname Verification

| Comments

This is part of a series of posts about setting up Play WS as a TLS client for a “secure by default” setup and configuration through text files, along with the research and thinking behind the setup. I recommend The Most Dangerous Code in the World for more background. And thanks to Jon for the shoutout in Techcrunch.

Previous posts are:

The last post talked about implementing hostname verification, which was a particular concern in TMDCitW. This post shows how you can test that your TLS client implements hostname verification correctly, by staging an attack. We’re going to use dnschef, a DNS proxy server, to confuse the client into talking to the wrong server.

To keep things simple, I’m going to assume you’re on Mac OS X Mavericks at this point. (If you’re on Linux, this is old hat. If you’re on Windows, it’s probably easier to use a VM like Virtualbox to set up a Linux environment.)

The first step to installing dnschef is to install a decent Python. The Python Guide suggests Homebrew, and Homebrew requires XCode be installed, so let’s start there.

Install XCode

Install XCode from the App Store and also install the command line tools:

xcode-select --install

Install Homebrew itself:

ruby -e "$(curl -fsSL https://raw.github.com/Homebrew/homebrew/go/install)"

Homebrew has some notes about Python, so we set up the command line environment:

export ARCHFLAGS="-arch x86_64"
export PATH=/usr/local/bin:/usr/local/sbin:~/bin:$PATH

Now (if you already have homebrew installed):

brew update
brew install openssl
brew install python --with-brewed-openssl --framework

You should see:

$ python --version
Python 2.7.6
$ which python
/usr/local/bin/python

If you run into trouble, then brew doctor or brew link --overwrite python should sort things out.

Now upgrade the various package tools for Python:

pip install --upgrade setuptools
pip install --upgrade pip

Now that we’ve got Python installed, we can install dnschef:

wget https://thesprawl.org/media/projects/dnschef-0.2.1.tar.gz
tar xvzf dnschef-0.2.1.tar.gz
cd dnschef-0.2.1

Then, we need to use dnschef as a nameserver. An attacker would use rogue DHCP or ARP spoofing to fool your computer into accepting this, but we can just add it directly:

OS X – Open System Preferences and click on the Network icon.

Select the active interface and fill in the DNS Server field. If you are using AirPort, you will have to click the Advanced… button and edit the DNS servers from there.

Don’t forget to click “Apply” after making the changes!

Now, we’re going to use DNS to redirect https://www.howsmyssl.com to https://playframework.com.

$ host playframework.com
playframework.com has address 54.243.50.169

We need to specify the IP address 54.243.50.169 as the fakeip argument.

$ sudo /usr/local/bin/python ./dnschef.py --fakedomains www.howsmyssl.com --fakeip 54.243.50.169
          _                _          __
         | | version 0.2  | |        / _|
       __| |_ __  ___  ___| |__   ___| |_
      / _` | '_ \/ __|/ __| '_ \ / _ \  _|
     | (_| | | | \__ \ (__| | | |  __/ |
      \__,_|_| |_|___/\___|_| |_|\___|_|
                   iphelix@thesprawl.org

[*] DNSChef started on interface: 127.0.0.1
[*] Using the following nameservers: 8.8.8.8
[*] Cooking A replies to point to 54.243.50.169 matching: www.howsmyssl.com

Now that we’ve got dnschef working as a proxy, we can see whether various TLS clients notice that www.howsmyssl.com has started returning an X.509 certificate that says it came from “playframework.com”:

$ curl https://www.howsmyssl.com/
curl: (60) SSL certificate problem: Invalid certificate chain
More details here: http://curl.haxx.se/docs/sslcerts.html

Curl is not fooled. It sees that the certificate’s subjectAltName dNSName does not match the requested hostname.
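Under the hood, a verifier compares the requested hostname against the certificate’s dNSName entries. Here is a minimal sketch of that RFC 6125-style check, including the single-label wildcard rule. This is an illustration, not curl’s or WS’s actual implementation, and real verifiers handle many more edge cases:

```scala
// Minimal sketch of RFC 6125-style hostname matching against a certificate's
// subjectAltName dNSName entries. Matching is case-insensitive, and a
// wildcard covers exactly one left-most label.
def hostnameMatches(hostname: String, dnsNames: Seq[String]): Boolean = {
  val host = hostname.toLowerCase
  dnsNames.map(_.toLowerCase).exists { pattern =>
    if (pattern.startsWith("*.")) {
      // "*.example.com" matches "www.example.com" but not "example.com"
      // or "a.b.example.com".
      val suffix = pattern.drop(1) // ".example.com"
      host.length > suffix.length &&
        host.endsWith(suffix) &&
        !host.dropRight(suffix.length).contains(".")
    } else {
      host == pattern
    }
  }
}

// www.howsmyssl.com does not match a certificate issued for playframework.com,
// which is exactly what curl noticed above.
println(hostnameMatches("www.howsmyssl.com", Seq("playframework.com", "*.playframework.com")))
// prints false
```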

Let’s try Play WS:

[ssltest] $ testOnly HowsMySSLSpec
[info] Compiling 1 Scala source to /Users/wsargent/work/ssltest/target/scala-2.10/test-classes...
Mar 31, 2014 6:11:08 PM org.jboss.netty.channel.DefaultChannelFuture
WARNING: An exception was thrown by ChannelFutureListener.
java.net.ConnectException: HostnameVerifier exception.
  at com.ning.http.client.providers.netty.NettyConnectListener.operationComplete(NettyConnectListener.java:81)
  at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:427)
  at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:413)
  at org.jboss.netty.channel.DefaultChannelFuture.setSuccess(DefaultChannelFuture.java:362)
  at org.jboss.netty.handler.ssl.SslHandler.setHandshakeSuccess(SslHandler.java:1383)
  at org.jboss.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1252)
  at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:913)
  at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
  at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
  at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
  at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
  at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

[info] HowsMySSLSpec
[info]
[info] WS should
[info] + NOT be fooled by dnschef
[info]
[info] Total for specification HowsMySSLSpec
[info] Finished in 21 seconds, 162 ms
[info] 1 example, 0 failure, 0 error
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 25 s, completed Mar 31, 2014 6:11:26 PM

Yep, it throws an exception.

Now let’s try it with hostname verification off by setting the ‘loose’ option on the client:

class HowsMySSLSpec extends PlaySpecification with ClientMethods {
  val timeout: Timeout = 20.seconds
  "WS" should {
    "be fooled by dnschef" in {
      val rawConfig = play.api.Configuration(ConfigFactory.parseString(
        """
          |ws.ssl.loose.disableHostnameVerification=true
        """.stripMargin))

      val client = createClient(rawConfig)
      val response = await(client.url("https://www.howsmyssl.com").get())(timeout)
      response.status must be_==(200)
      response.body must contain("Play Framework")
    }
  }
}

Run the test:

[ssltest] $ testOnly HowsMySSLSpec
[info] HowsMySSLSpec
[info]
[info] WS should
[info] + be fooled by dnschef
[info]
[info] Total for specification HowsMySSLSpec
[info] Finished in 9 seconds, 675 ms
[info] 1 example, 0 failure, 0 error
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 12 s, completed Mar 31, 2014 6:08:50 PM

It works! We have fooled WS into setting up a TLS connection with a different host, one that we have control over. If we were evil, we could then proxy https://playframework.com to the intended URL, and save off all the content or inject fake data.

Let’s try Apache HttpClient 3.x:

name := "httpclienttest"

version := "1.0-SNAPSHOT"

libraryDependencies ++= Seq(
    "commons-httpclient" % "commons-httpclient" % "3.1",
    "org.specs2" %% "specs2" % "2.3.10" % "test"
)

scalacOptions in Test ++= Seq("-Yrangepos")

resolvers ++= Seq("snapshots", "releases").map(Resolver.sonatypeRepo)
13
14
15
16
17
18
19
20
import org.apache.commons.httpclient.HttpClient
import org.apache.commons.httpclient.methods.GetMethod
import org.specs2.mutable.Specification

class HttpClientSpec extends Specification {
  "HTTPClient" should {
    "do something" in {
      val httpclient = new HttpClient()
      val httpget = new GetMethod("https://www.howsmyssl.com/")
      try {
        httpclient.executeMethod(httpget)
        //val line = httpget.getResponseBodyAsString
        //line must not contain("Play Framework")
        httpget.getStatusCode must not be_==(200)
      } finally {
        httpget.releaseConnection()
      }
    }
  }
}

Running this gives:

[info] HttpClientSpec
[info]
[info] HTTPClient should
[info] x do something
[error]    '200' is equal to '200' (HttpClientSpec.scala:14)
[info]
[info]
[info] Total for specification HttpClientSpec
[info] Finished in 18 ms
[info] 1 example, 1 failure, 0 error
[error] Failed: Total 1, Failed 1, Errors 0, Passed 0
[error] Failed tests:
[error]     HttpClientSpec
[error] (test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 4 s, completed Mar 31, 2014 8:26:46 PM

Nope. HttpClient 3.x was retired in 2007, but any code that’s still using it under the hood is vulnerable to this attack.

Try this on your own code and see what it does. I’ll bet it’ll be interesting.

Next

Odds and ends I couldn’t cover elsewhere. And then best practices, and summing things up.