Terse Systems

Exposing Akka Actor State With JMX


I’ve published an activator template of how to integrate JMX into your Akka Actors. Using this method, you can look inside a running Akka application and see exactly what sort of state your actors are in. Thanks to Jamie Allen for the idea in his book, Effective Akka.

Running

Start up Activator with the following options:

export JAVA_OPTS="-XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:FlightRecorderOptions=samplethreads=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9191 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost"
export java_opts=$JAVA_OPTS
activator "runMain jmxexample.Main"

Then in another console, start up your JMX tool. In this example, we are using Java Mission Control:

jmc
  1. Using Java Mission Control, connect to the NettyServer application listed in the right tree view.
  2. Go to “MBean Server” item in the tree view on the right.
  3. Click on “MBean Browser” in the second tab at the bottom.
  4. Open up the “jmxexample” tree folder, then “GreeterMXBean”, then “/user/master”. You’ll see the attributes on the right.
  5. Hit F5 a lot to refresh.

You should see this:

Example of JMC with jmxexample MXBean

Creating an MXBean with an External View Class

Exposing state through JMX is easy, as long as you play by the rules: always use an MXBean (which does not require JAR downloads over RMI), always think about thread safety when exposing internal variables, and always create a custom class that provides a view that the MXBean is happy with.

Here’s a trait that exposes some state, GreetingHistory. As long as the trait ends in “MXBean”, JMX is happy. It will display the properties defined in that trait.

/**
 * MXBean interface: this determines what the JMX tool will see.
 */
trait GreeterMXBean {

  /**
   * Uses composite data view to show the greeting history.
   */
  def getGreetingHistory: GreetingHistory

  /**
   * Uses a mapping JMX to show the greeting history.
   */
  def getGreetingHistoryMXView: GreetingHistoryMXView
}

Here’s the JMX actor that implements the GreeterMXBean interface. Note that the only thing it does is receive a GreetingHistory case class and render it. There is a catch, however: because the greetingHistory variable is written by an actor thread and read by a JMX thread, it must be declared @volatile so that a write made on one thread is immediately visible to the other.

/**
 * The JMX view into the Greeter
 */
class GreeterMXBeanActor extends ActorWithJMX with GreeterMXBean {

  // @volatile needed because JMX and the actor model access from different threads
  @volatile private[this] var greetingHistory: Option[GreetingHistory] = None

  def receive = {
    case gh: GreetingHistory =>
      greetingHistory = Some(gh)
  }

  def getGreetingHistory: GreetingHistory = greetingHistory.orNull

  def getGreetingHistoryMXView: GreetingHistoryMXView = greetingHistory.map(GreetingHistoryMXView(_)).orNull

  // Maps the MXType to this actor.
  override def getMXTypeName: String = "GreeterMXBean"
}

The actor which generates the GreetingHistory case class — the state that you want to expose — should be a parent of the JMX bean, and have a supervisor strategy that can handle JMX exceptions:

trait ActorJMXSupervisor extends Actor with ActorLogging {

  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import javax.management.{JMException, JMRuntimeException}
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case e: JMRuntimeException =>
        log.error(e, "Supervisor strategy STOPPING actor from errors during JMX invocation")
        Stop
      case e: JMException =>
        log.error(e, "Supervisor strategy STOPPING actor from incorrect invocation of JMX registration")
        Stop
      case t =>
        // Use the default supervisor strategy otherwise.
        super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
    }
}

class Greeter extends Actor with ActorJMXSupervisor {

  private[this] var greeting: String = ""

  private[this] val jmxActor = context.actorOf(Props(classOf[GreeterMXBeanActor]), "jmx")

  def receive: Receive = {
    case WhoToGreet(who) =>
      greeting = s"hello, $who"
    case Greet =>
      sender() ! Greeting(greeting) // Send the current greeting back to the sender
    case GreetingAcknowledged =>
      // Update the JMX actor.
      val greetingHistory = GreetingHistory(new java.util.Date(), greeting, sender())
      jmxActor ! greetingHistory
  }
}

And finally, the raw GreetingHistory case class looks like this:

case class GreetingHistory(lastGreetedDate: java.util.Date,
                           greeting: String,
                           sender: ActorRef,
                           randomSet:Set[String] = Set("1", "2", "3"))

This is a fairly standard Scala case class, but JMX doesn’t know what to do with it. From the Open MBean Data Types chapter of the JMX tutorial, the only acceptable values are:

  • java.lang.Boolean
  • java.lang.Byte
  • java.lang.Character
  • java.lang.Short
  • java.lang.Integer
  • java.lang.Long
  • java.lang.Float
  • java.lang.Double
  • java.lang.String
  • java.math.BigInteger
  • java.math.BigDecimal
  • javax.management.ObjectName
  • javax.management.openmbean.CompositeData (interface)
  • javax.management.openmbean.TabularData (interface)

Fortunately, it’s easy to map from a case class to a view class. Here’s how to display GreetingHistory using a view class for JMX, using the @ConstructorProperties and @BeanProperty annotations to produce a JavaBean in the format that JMX expects. Also note that a Scala Set is not visible through JMX, and JavaConverters cannot be used here to convert to java.util.Set, because it only wraps the Scala collection rather than copying it. Instead, a structural copy must be made to produce a plain java.util.Set:

/**
 * The custom MX view class for GreetingHistory.  Private so it can only be
 * called by the companion object.
 */
class GreetingHistoryMXView @ConstructorProperties(Array(
  "lastGreetingDate",
  "greeting",
  "sender",
  "randomSet")
) private(@BeanProperty val lastGreetingDate: java.util.Date,
          @BeanProperty val greeting: String,
          @BeanProperty val sender: String,
          @BeanProperty val randomSet:java.util.Set[String])

/**
 * Companion object for the GreetingHistory view class.  Takes a GreetingHistory and
 * returns GreetingHistoryMXView.
 */
object GreetingHistoryMXView {
  def apply(greetingHistory: GreetingHistory): GreetingHistoryMXView = {
    val lastGreetingDate: java.util.Date = greetingHistory.lastGreetedDate
    val greeting: String = greetingHistory.greeting
    val actorName: String = greetingHistory.sender.path.name
    val randomSet = scalaToJavaSetConverter(greetingHistory.randomSet)
    new GreetingHistoryMXView(lastGreetingDate, greeting, actorName, randomSet)
  }

  // http://stackoverflow.com/a/24840520/5266
  def scalaToJavaSetConverter[T](scalaSet: Set[T]): java.util.Set[String] = {
    val javaSet = new java.util.HashSet[String]()
    scalaSet.foreach(entry => javaSet.add(entry.toString))
    javaSet
  }
}

Creating In Place JMX views with CompositeDataView

Using a view class is the recommended way to display Scala data in JMX, as it’s relatively simple to set up and can be packaged outside of the main class. However, it is possible to embed the JMX logic inside the case class itself, using an in-place CompositeDataView:

case class GreetingHistory(@BeanProperty lastGreetedDate: java.util.Date,
                           @BeanProperty greeting: String,
                           sender: ActorRef,
                           randomSet:Set[String] = Set("1", "2", "3")) extends CompositeDataView {

  /**
   * Converts the GreetingHistory into a CompositeData object, including the "sender" value.
   */
  override def toCompositeData(ct: CompositeType): CompositeData = {
    import scala.collection.JavaConverters._

    // Deal with all the known properties...
    val itemNames = new ListBuffer[String]()
    itemNames ++= ct.keySet().asScala

    val itemDescriptions = new ListBuffer[String]()
    val itemTypes = new ListBuffer[OpenType[_]]()
    for (item <- itemNames) {
      itemDescriptions += ct.getDescription(item)
      itemTypes += ct.getType(item)
    }

    // Add the sender here, as it doesn't correspond to a known SimpleType...
    itemNames += "sender"
    itemDescriptions += "the sender"
    itemTypes += SimpleType.STRING

    val compositeType = new CompositeType(ct.getTypeName,
      ct.getDescription,
      itemNames.toArray,
      itemDescriptions.toArray,
      itemTypes.toArray)

    // Set up the data in given order explicitly.
    val data = Map(
      "lastGreetedDate" -> lastGreetedDate,
      "greeting" -> greeting,
      "sender" -> sender.path.name
    ).asJava

    val compositeData = new CompositeDataSupport(compositeType, data)
    require(ct.isValue(compositeData))

    compositeData
  }
}

This is messier than using a view, and does not really give you any more programmatic control. It does, however, minimize the number of types that need to be created.

Finally, the type which registers the JMX Actor with JMX is defined here:

trait ActorWithJMX extends Actor {
  import jmxexample.AkkaJmxRegistrar._

  val objName = new ObjectName("jmxexample", {
    import scala.collection.JavaConverters._
    new java.util.Hashtable(
      Map(
        "name" -> self.path.toStringWithoutAddress,
        "type" -> getMXTypeName
      ).asJava
    )
  })

  def getMXTypeName : String

  override def preStart() = mbs.registerMBean(this, objName)

  override def postStop() = mbs.unregisterMBean(objName)
}

The MXTypeName is defined by the implementing class, and the actor is registered under its path name in the preStart method when the actor is started.

Note that because this trait overrides preStart and postStop, any actor implementing it that also overrides those methods needs to explicitly call super.preStart() and super.postStop() to preserve the registration behavior.
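For instance, a subclass that adds its own lifecycle logic might look like this (a hypothetical sketch; MyJmxActor and its type name are made up for illustration):

```scala
import akka.actor.Actor

class MyJmxActor extends ActorWithJMX {

  override def getMXTypeName: String = "MyMXBean"

  override def preStart(): Unit = {
    super.preStart() // keeps the JMX registration
    // custom start-up logic goes here
  }

  override def postStop(): Unit = {
    // custom shutdown logic goes here
    super.postStop() // keeps the JMX de-registration
  }

  def receive = Actor.emptyBehavior
}
```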

Future Directions

There’s a number of things that can be done with JMX, particularly if macros are involved. Actors are shown here because they are notoriously dynamic, but any part of your system can be similarly instrumented to expose their state in a running application.

You may also be interested in:

  • JAJMX, a high level JMX API designed for Scala.
  • Jolokia, a web interface for JMX.
  • jmxtrans, a JMX to JSON conversion library.

Composing Dependent Futures


This blog post is adapted from a lightning talk I gave at NetflixOSS, Season 2, Episode 2.

I’ve noticed that when the word “reactive” is mentioned, it tends not to be associated with any code. One of the things that “reactive” means is “non-blocking” code. “Non-blocking” refers to the idea that you can make a call, and then go on and do something else in your program until you get a notification that something happened.

There are a number of frameworks which handle the notification — the idea that a response may not happen immediately — in different ways. Scala has the option of using a couple of different non-blocking mechanisms, and I’m going to go over how they’re used and some interesting wrinkles when they are composed together.

Futures

Scala uses scala.concurrent.Future as the basic unit of non-blocking access.

The best way I’ve found to think of a Future is a box that will, at some point, contain the thing that you want. The key thing with a Future is that you never open the box. Trying to force open the box will lead you to blocking and grief. Instead, you put the Future in another, larger box, typically using the map method.

Here’s an example of a Future that contains a String. When the Future completes, then Console.println is called:

object Main {
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  def main(args:Array[String]) : Unit = {
    val stringFuture: Future[String] = Future.successful("hello world!")
    stringFuture.map { someString =>
      // if you use .foreach you avoid creating an extra Future, but we are proving
      // the concept here...
      Console.println(someString)
    }
  }
}

Note that in this case, we’re calling the main method and then… finishing. The callback, run on the global ExecutionContext, does the work of calling Console.println. This is great: by giving up control over when someString will be available and when Console.println will be called, we let the system manage itself. In contrast, look what happens when we try to force the box open:

import scala.concurrent.Await
import scala.concurrent.duration._

val stringFuture: Future[String] = Future.successful("hello world!")
val someString = Await.result(stringFuture, 1.second)

In this case, we have to wait — keep a thread twiddling its thumbs — until we get someString back. We’ve opened the box, but we’ve had to commandeer the system’s resources to get at it.

Event Based Systems with Akka

When we talk about reactive systems in Scala, we’re talking about event driven systems, which typically means Akka. When we want to get a result out of Akka, there are two ways we can do it. We can tell — fire off a message to an actor:

fooActor ! GetFoo(1)

and then rely on fooActor to send us back a message:

def receive = {
  case GetFoo(id) =>
    sender() ! Foo(id)
}

This has the advantage of being very simple and straightforward.

You also have the option of using ask, which will generate a Future:

val fooFuture: Future[Foo] = (fooActor ? GetFoo(1)).mapTo[Foo] // ask returns Future[Any]; mapTo recovers the type

When the actor’s receive method sends back Foo(id) then the Future will complete. If you want to go the other way, from a Future to an Actor, then you can use pipeTo:

Future.successful {
  Foo(1)
} pipeTo actorRef

tell is usually better than ask, but there are nuances to Akka message processing. I recommend Three flavours of request-response pattern in Akka and Ask, Tell and Per Request Actors for a more detailed analysis of messages, and see the Akka documentation.

The important caveat is that not all systems are Akka-based. If you’re talking to a NoSQL store like Redis or Cassandra, odds are that you are using a non-blocking driver that uses Future directly. Or you may be using Play, which will allow you to pass along a Future (and thereby avoid “opening the box”) using Action.async:

def index() = Action.async {
  Future.successful {
    Ok("hello world!") // 200 HTTP Result
  }
}

What this means, in practice, is that if you’re using a system which is not based around Akka Actors, and you’re not using a stream based API such as Iteratees / Reactive Streams, then most of the time you hear about “reactive” and “non-blocking”, you’re going to be looking at Future. And you’re going to pass that Future as far along the stack as you can, because you want to avoid opening that box. Which brings us to dependent futures.

Dependent Futures

Assume a service (in Scala, trait means roughly the same as interface in Java) that goes and gets data:

trait FooService {
  def find(fooId:FooID): Option[Foo]
}

This service returns an Option[Foo]. Option is another “wrapper” type, with only two possible cases: for Option[Foo], you’ll get back Some(Foo(1)) if Foo(1) exists, or None if it wasn’t found. Using Option means that we don’t have to have null checks all over the place:

val foo:Foo = fooService.find(fooId)
if (foo != null) { // WITHOUT OPTION
  Console.println("Foo is " + foo)
}

Using the Option.map method, we can safely get at the value only if it exists:

val maybeFoo:Option[Foo] = fooService.find(fooId)
maybeFoo.map { foo => // WITH OPTION
  Console.println("Foo is " + foo)
}

You can see that both Future and Option work on the same principle: you have a type which contains another type, which you can only get at under certain conditions.
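A quick, minimal sketch of the parallel:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val maybeInt: Option[Int] = Some(1)
val futureInt: Future[Int] = Future.successful(1)

// In both cases, the function runs only "inside the box":
maybeInt.map(n => n + 1)  // Some(2), or None if the value was absent
futureInt.map(n => n + 1) // a Future that completes with 2
```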

But FooService isn’t non-blocking. If we assume that there’s a non-blocking source of data under the hood, we can change the signature to this:

def find(fooId:FooID): Future[Option[Foo]]

And now we can do the following:

fooService.find(fooId).map { maybeFoo =>
  maybeFoo.map { foo =>
    Console.println("Foo is " + foo)
  }
}

Now, the interesting thing is that we can’t compose a Future with an Option. If we had a Future in a Future then we can flatten it and get back a single Future, and if we had an Option in an Option we could flatten it and get back a single Option, but we can’t flatten a Future[Option[T]]. That means we can’t do this:

fooService.find(fooId).flatMap { foo =>
  Console.println("Foo is " + foo)
}

This turns out to be a problem.

So, let’s add a couple more services to the mix, following the model of FooService:

trait BarService {
  def find(barId:BarID) : Future[Option[Bar]]
}
trait QuuxService {
  def find(quuxId:QuuxID) : Future[Option[Quux]]
}

Assuming that all these services return independent futures, you can do the following:

val fooFuture = fooService.find(FooID(1))
val barFuture = barService.find(BarID(2))
val quuxFuture = quuxService.find(QuuxID(3))

At the end of the code block, there will be three different futures, with potentially three different threads running on them in parallel. This is the model that you’ll see in a number of Future tutorials.
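Once the three futures are started, their results can be combined without blocking, for example with a for comprehension (a sketch using the services above):

```scala
// Because each future was started *before* the for comprehension,
// the three lookups run in parallel; the comprehension only combines results.
val combined: Future[(Option[Foo], Option[Bar], Option[Quux])] = for {
  maybeFoo  <- fooFuture
  maybeBar  <- barFuture
  maybeQuux <- quuxFuture
} yield (maybeFoo, maybeBar, maybeQuux)
```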

Unfortunately, it doesn’t always work out like that. Often, you’ll want something that looks roughly like this:

val fooFuture = fooService.find(1)
val barFuture = barService.find(foo.barId) // where is foo?
val quuxFuture = quuxService.find(bar.quuxId) // where is bar?

In this example, each service needs the resolution of the previous Future in order to run. This is not quite as ideal, but it’s still better than blocking.

So, how do you do this?

The Obvious Solution

Well, given the constraints, the most immediate way is:

fooService.find(1).map { maybeFoo =>
  maybeFoo.map { foo =>
    barService.find(foo.barId).map { maybeBar =>
      maybeBar.map { bar =>
        quuxService.find(bar.quuxId).map { maybeQuux =>
          maybeQuux.map { quux =>
            Console.println("Quux = " + quux)
          }
        }
      }
    }
  }
}

I think we can all agree this is not very fun.

For Comprehensions

There are various different things we can try to make this better. First, we can try “for comprehensions”, a piece of useful syntactic sugar that can do wonderful things with map and flatMap.

In this case though, because Future and Option don’t compose, the fallback is nested for comprehensions:

for (maybeFoo <- fooService.find(1)) yield {
  for (foo <- maybeFoo) yield ...
}

However, Christopher Hunt points out that you can do this:

def findBar: Future[Option[Bar]] = { // (method name is illustrative)
  for {
    Some(foo) <- fooService.find(1)
    maybeBar <- barService.find(foo.barId)
  } yield maybeBar
}

Which is much neater. Note, though, that if fooService.find completes with None, the Some(foo) pattern fails to match, and the resulting future fails with a NoSuchElementException rather than yielding None.

Scala Async

Let’s try something else. Here’s Scala Async. Instead of using future.map, you use an async block, and call await to get at the value inside it:

async {
   val maybeFoo:Option[Foo] = await(fooService.find(1))
   maybeFoo.flatMap { foo =>
     val bar = await(barService.find(foo.barId))
   }
}

It turns out this won’t compile! The reason why is the flatMap — the async documentation has a note saying “await must not be used inside a closure nested within an async block”, and so the nested await call fails to fall within the same async block.
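One way to restructure so that every await sits directly in the async block, and which does compile, is to pattern match instead of using a closure (a sketch, assuming the services above):

```scala
import scala.async.Async.{async, await}

val result: Future[Option[Bar]] = async {
  // match/if are plain control flow, not closures, so await is legal here
  await(fooService.find(1)) match {
    case Some(foo) => await(barService.find(foo.barId))
    case None      => None
  }
}
```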

However, there are some simple things you can do, that do work.

Small Methods

The simplest thing you can do is break your code up into very small methods:

def foo2bar(futureOptionFoo:Future[Option[Foo]]) : Future[Option[Bar]]

This helps you avoid nesting, and lets you be very explicit about what types you are working with.
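One possible implementation of foo2bar, assuming the BarService defined earlier, pattern matches on the Option so that a missing Foo short-circuits to None:

```scala
def foo2bar(futureOptionFoo: Future[Option[Foo]]): Future[Option[Bar]] =
  futureOptionFoo.flatMap {
    case Some(foo) => barService.find(foo.barId)  // continue the chain
    case None      => Future.successful(None)      // nothing found: stop here
  }
```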

Flatten

Another option is to go ahead and nest everything, then use this neat trick:

implicit def flatten[A](fofoa: Future[Option[Future[Option[A]]]]): Future[Option[A]] = {
  fofoa.flatMap(_.getOrElse(Future.successful(None)))
}

While you can’t flatten Future and Option together, you can take a Future[Option[T]] where T is also Future[Option[T]] and flatten those together. Credit goes to Jason Zaugg for this one.
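As a sketch of how this kicks in, using the services from above:

```scala
// map produces the awkward nested type...
val nested: Future[Option[Future[Option[Bar]]]] =
  fooService.find(FooID(1)).map { maybeFoo =>
    maybeFoo.map(foo => barService.find(foo.barId))
  }

// ...and flatten collapses it back down to a single wrapper.
val flattened: Future[Option[Bar]] = flatten(nested)
```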

Loan Pattern

We can also use the loan pattern to pass around blocks, and immediately return in the case where we see None. In this case, we’re returning a Play Result object, which is NotFound (404) if anything returns None:

def getFuture[T](futureOptionBlock: Future[Option[T]])(foundBlock: (T => Future[Result])): Future[Result] = {
  futureOptionBlock.flatMap {
    case Some(found) =>
      foundBlock(found)
    case None =>
      Future.successful(NotFound)
  }
}

And this gives us:

def index: Future[Result] = Action.async {
  getFuture(fooService.find(1)) { foo =>
    getFuture(barService.find(foo.barId)) { bar =>
      getFuture(quuxService.find(bar.quuxId)) { quux =>
        Future.successful {
          Ok("Quux = " + quux)
        }
      }
    }
  }
}

If you cannot short-circuit on the Option like this, then you can call option.map(win).getOrElse(fail), or option.fold(fail)(win), to process both the success and failure branches.

OptionT

The ultimate answer is to use a monad transformer called OptionT that specifically composes Future with Option. Francois Garillot has written a step-by-step blog post showing how to implement OptionT in Scala.
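The core of the idea fits in a few lines. This hypothetical sketch (the FutOpt name is made up for illustration; production-grade OptionT implementations live in libraries like Scalaz or Cats) wraps a Future[Option[A]] with map and flatMap so that a single for comprehension works across both layers at once:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

case class FutOpt[A](value: Future[Option[A]]) {
  def map[B](f: A => B): FutOpt[B] =
    FutOpt(value.map(_.map(f)))

  def flatMap[B](f: A => FutOpt[B]): FutOpt[B] =
    FutOpt(value.flatMap {
      case Some(a) => f(a).value
      case None    => Future.successful(None)
    })
}

// One flat comprehension instead of three nested maps:
val maybeQuux: Future[Option[Quux]] = (for {
  foo  <- FutOpt(fooService.find(FooID(1)))
  bar  <- FutOpt(barService.find(foo.barId))
  quux <- FutOpt(quuxService.find(bar.quuxId))
} yield quux).value
```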

More Reading

I recommend Future goodies and helpers: SafeFuture, TimeoutFuture, CancelableFuture if you want to do more with how Future exceptions are logged, how Future may timeout, and even cancelling(!) a Future.

Conclusion

We’ve shown how Scala and Akka provide non-blocking, asynchronous code using Future and Akka message passing, and how to work with Future-based code without blocking or descending into deeply nested callbacks. Hope this helps!

Play TLS Example With Client Authentication


This is part of a series of posts about setting up Play WS as a TLS client for a “secure by default” setup.

Previous posts are:

This post is where the rubber meets the road: an actual, demonstrable activator template that shows off WS SSL, provides the scripts for certificate generation, and gives people an out-of-the-box TLS 1.2 setup using ECDSA certificates.

Want to download it? Go to https://github.com/typesafehub/activator-play-tls-example or clone it directly:

git clone https://github.com/typesafehub/activator-play-tls-example.git

It’s an activator template, so you can also install it from inside Typesafe Activator by searching for “TLS”.

Be sure to read the README. This project is as lightweight as possible, but takes a little configuration to get started.

Certificate Generation

The biggest part of any demo application is setting up the scripts. I didn’t find anything that was really hands-free, so I wrote my own. They are exactly the same as the ones described in the Certificate Generation section of the manual.

There are various shortcuts you can use for defining X.509 certificates, but I found it a lot more useful to go through the work of setting up the root CA certificate, defining the server certificate as having an EKU of “serverAuth”, and so on.

Play Script

The actual script to run Play with all the required JVM options is… large. Part of this is documentation of every possible feature, but sadly, many of the lines are “best practices” that are very rarely practiced.

Also, the note about rh.secure is a reference to the RequestHeader class in Play itself. Ironically, even when we set up HTTPS on the server, Play itself can’t tell which protocol it’s running on without help.

I will admit to being gleefully happy at setting disabledAlgorithms.properties on startup, so that at last AlgorithmConstraints is enabled on the server:

jdk.tls.disabledAlgorithms=RSA keySize < 2048, DSA keySize < 2048, EC keySize < 224
jdk.certpath.disabledAlgorithms=MD2, MD4, MD5, RSA keySize < 2048, DSA keySize < 2048, EC keySize < 224

CustomSSLEngineProvider

The CustomSSLEngineProvider is responsible for Play’s HTTPS server. More details can be found in Configuring HTTPS.

Setting up an SSLEngineProvider with client authentication is pretty easy, once you know the magic incantations needed to get the trust managers and the key managers set up. After that, it’s a question of ensuring that the SSLEngine knows how trusting it should be.

override def createSSLEngine(): SSLEngine = {
  val sslContext = createSSLContext(appProvider)

  // Start off with a clone of the default SSL parameters...
  val sslParameters = sslContext.getDefaultSSLParameters

  // Tells the server to ignore client's cipher suite preference.
  // http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#cipher_suite_preference
  sslParameters.setUseCipherSuitesOrder(true)

  // http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html#SSLParameters
  val needClientAuth = java.lang.System.getProperty("play.ssl.needClientAuth")
  sslParameters.setNeedClientAuth(java.lang.Boolean.parseBoolean(needClientAuth))

  // Clone and modify the default SSL parameters.
  val engine = sslContext.createSSLEngine
  engine.setSSLParameters(sslParameters)

  engine
}

Connecting to the server with Play WS

Setting up the Play client was pretty easy, but it’s worth repeating that Play WS can be run outside of an application with the right setup:

def newClient(rawConfig: play.api.Configuration): WSClient = {
  val classLoader = Thread.currentThread().getContextClassLoader
  val parser = new DefaultWSConfigParser(rawConfig, classLoader)
  val clientConfig = parser.parse()
  clientConfig.ssl.map {
    _.debug.map(new DebugConfiguration().configure)
  }
  val builder = new NingAsyncHttpClientConfigBuilder(clientConfig)
  val client = new NingWSClient(builder.build())
  client
}

The configuration in the ws.conf file is also intentionally strict. The NSA recommends some nice cipher suites in Suite B Profile for TLS: the WS client will refuse to talk to the server with anything less than full-on TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, and won’t accept any ECDSA signature with an EC key size under 384 bits.

Client authentication on the client side was a little trickier than expected, but eventually I remembered that the client should be using a key store containing a trust anchor, and it worked itself out.
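The relevant client-side settings look roughly like this (a hypothetical sketch of Play WS SSL configuration; the store paths and passwords are placeholders, not the template’s actual values):

```
ws.ssl {
  keyManager = {
    stores = [
      { type: "JKS", path: "certs/client.jks", password: "changeit" }
    ]
  }
  trustManager = {
    stores = [
      { type: "JKS", path: "certs/clienttrust.jks", password: "changeit" }
    ]
  }
  enabledProtocols = [ "TLSv1.2" ]
  enabledCipherSuites = [ "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" ]
}
```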

Conclusion

I think I’m done with JSSE for now. I was interested in exploits at one point, but the fixes amount to “Upgrade to JDK 1.8, use ECDSA certificates, and TLS 1.2” — all of which are demonstrated in this application.

I may at some point go back and look at HSTS or public key pinning in Play WS, but it really comes down to utility. Many of the use cases of pinning involve browsers or unknown clients. I’ve not heard of any demand for the feature, and it’s unclear that anyone would find it all that useful.

With Play and with these blog posts, I’m very pleased to have written something that people find useful. Thanks.

Akka Clustering, Step by Step


This blog post shows how an Akka cluster works by walking through an example application in detail.

Introduction

Akka is an excellent toolkit for handling concurrency. The core concept behind Akka is the Actor model: loosely stated, instead of creating an instance of a class and invoking methods on it, i.e.

val foo = new Foo()
foo.doStuff(args)

You create an Actor, and send the actor immutable messages. Those messages are queued in a mailbox, and the actor processes them one by one, in the order they were received:

val fooActor: ActorRef = actorSystem.actorOf(FooActor.props, "fooActor")
fooActor ! DoStuffMessage(args)

Sending messages to an actor is much better than invoking a method for a number of reasons. First, you can pass around an ActorRef anywhere. You can pass them between threads. You can keep them in static methods. No matter what you do, you can’t get the actor into an inconsistent state or call methods in the “wrong” order because the Actor manages its own state in response to those messages. Second, the actor can send messages back to you:

FooActor.scala
def receive = {
  case DoStuffMessage(args) =>
    sender() ! Success("all good!")
  case _ =>
    sender() ! Failure("not so good.")
}

Message passing means that the usual binary of method calls — either return a value or throw an exception — gets opened up considerably. When you can send the actor any message you like, and the actor can send you any message back (and can send you that message when it gets around to processing it, instead of immediately), then you are not bound by locality any more. The actor that you’re sending a message to doesn’t have to live on the same JVM that you’re on. It doesn’t even have to live on the same physical machine. As long as there’s a transport capable of getting the message to the actor, it can live anywhere.

This brings us to Akka remoting.

Remoting

Akka remoting works by saying to the actor system either “I want you to create an actor on this remote host”:

val ref = system.actorOf(FooActor.props.withDeploy(Deploy(scope = RemoteScope(address))))

or “I want a reference to an existing actor on the remote host”:

val remoteFooActor = context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/fooActor")

After obtaining the reference, messages sent to the remote actor are serialized using Protocol Buffers and reconstituted on the other end. This is great for peer to peer communication (it already beats RMI), but remoting is too specific in some ways — it points to a unique IP address, and really we’d like actors to just live out there “in the cloud”. This is where Akka clustering comes in.

Clustering allows you to create an actor somewhere on a cluster consisting of nodes which all share the same actor system, without knowing exactly which node it is on. Other machines can join and leave the cluster at run time.

We’ll use the akka-sample-cluster-app Activator template as a reference, and walk through each step of the TransformationApp application, showing how to run it and how it communicates.

Installation

Ideally, you should download Typesafe Activator, start it up with “activator ui” and search for “Akka Cluster Samples with Scala” in the field marked “Filter Templates”. From there, Activator can download the template and provide you with a friendly UI and tutorial.

If not, all the code snippets have links back to the source code on Github, so you can clone or copy the files directly.

Clustering

The first step in Akka clustering is the library dependencies and the akka configuration. The build.sbt file used in akka-sample-cluster-app is complex, but the only bit you need to care about is the akka-cluster library:

build.sbt
libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-cluster" % "2.3.3")

The akka configuration is relatively simple:

application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

You can see there’s a custom cluster provider, and a remote section that sets up Akka’s TCP/IP stack (port = 0 tells Akka to bind to a random available port).

There’s also a cluster section, which lists the seed nodes: the well-known addresses, here on ports 2551 and 2552, that a node contacts when it joins the cluster.

Starting Frontend Node

There are two sides to the Transformation application — a frontend node and some backend nodes.

The examples in akka-sample-cluster-scala tend to have a single main method and start all the nodes inside a single JVM. I think this confuses people, so the examples here will use separate JVMs on the same box instead.

Here’s the entire main method for starting up a frontend node in the cluster.

TransformationFrontend.scala
object TransformationFrontend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      implicit val timeout = Timeout(5 seconds)
      (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
        case result => println(result)
      }
    }
  }
}

The TransformationFrontend object has an interesting construct for its configuration that is worth breaking down.

TransformationFrontend.scala
val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
  withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
  withFallback(ConfigFactory.load())

This statement adds the settings “akka.remote.netty.tcp.port=$port” and “akka.cluster.roles = [frontend]” on top of the configuration returned from ConfigFactory.load() — which reads application.conf. Each call to withFallback makes its argument the fallback, so settings defined earlier in the chain take precedence. This is a quick and simple way to override configuration settings, but the name withFallback can be confusing at first glance.
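The precedence can be illustrated with plain Scala Maps — a sketch only, since the real type is com.typesafe.config.Config, modeled here as Map[String, String]:

```scala
// Model Config.withFallback with plain Maps: the receiver's keys win,
// and the fallback only fills in keys the receiver doesn't define.
// (Illustration only; the real API is com.typesafe.config.Config.)
def withFallback(primary: Map[String, String],
                 fallback: Map[String, String]): Map[String, String] =
  fallback ++ primary // ++ keeps the right-hand side's values on conflict

val portOverride = Map("akka.remote.netty.tcp.port" -> "2551")
val applicationConf = Map(
  "akka.remote.netty.tcp.port" -> "0",
  "akka.remote.netty.tcp.hostname" -> "127.0.0.1")

val config = withFallback(portOverride, applicationConf)
// The override wins; everything else falls through to application.conf.
println(config("akka.remote.netty.tcp.port"))     // 2551
println(config("akka.remote.netty.tcp.hostname")) // 127.0.0.1
```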

After that, the class starts the actor system with the “ClusterSystem” name (this is important — note that the seed nodes also have this name):

TransformationFrontend.scala
val system = ActorSystem("ClusterSystem", config)

And then starts up a frontend actor which does some heavy lifting. This is really all that’s required for the front end to get going.

TransformationFrontend.scala
val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

Finally, the main method sets up a scheduler method that asks the frontend actor to transform a job containing “hello-?” every two seconds:

TransformationFrontend.scala
val counter = new AtomicInteger
import system.dispatcher
system.scheduler.schedule(2.seconds, 2.seconds) {
  implicit val timeout = Timeout(5 seconds)
  (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
    case result => println(result)
  }
}

For development purposes, it’s easiest to start the node in Activator:

$ activator
> runMain sample.cluster.transformation.TransformationFrontend

You’ll see that the frontend node starts up and complains it can’t reach the cluster:

[info] [WARN] [06/25/2014 12:43:27.962] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2552
[info] [WARN] [06/25/2014 12:43:27.966] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2551]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2551

This is fine for now. Once we bring the rest of the nodes up, the frontend node will be happier.

Starting Backend Node

The main method for starting the backend is similar to the frontend:

TransformationBackend.scala
object TransformationBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

The only differences here are that it sets the cluster role to “backend” (“akka.cluster.roles = [backend]”) and starts up a “backend” actor.

Open up two new shells (if you’re using iTerm you can use “Command D” to split them easily), and run the following to get the backend nodes up on each of the seed nodes:

$ activator
> runMain sample.cluster.transformation.TransformationBackend 2551
$ activator
> runMain sample.cluster.transformation.TransformationBackend 2552

Once you have the two backend nodes up, you’ll see that the frontend actor connects to both of them, decides the cluster is up, and starts transforming strings:

[info] [INFO] [06/25/2014 12:46:21.503] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:52860] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] TransformationResult(HELLO-19)
[info] TransformationResult(HELLO-20)
[info] TransformationResult(HELLO-21)
[info] TransformationResult(HELLO-22)

TransformationFrontend Actor

The frontend actor looks like this:

TransformationFrontend.scala
class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

It’s clear what it does. When it receives a TransformationJob, it forwards the job to one of the backend actors, round-robin style; if there aren’t any backend actors, it replies with JobFailed. When it gets a BackendRegistration message, it tells the context to watch the sender’s ActorRef (so the context will notify the frontend actor with a Terminated message if that sender dies) and adds the sender to its internal list of backend actors. Finally, when a backend actor does die, the frontend actor removes it from the list.
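The round-robin arithmetic is worth a second look: jobCounter % backends.size simply cycles through the sequence. A minimal sketch, with strings standing in for the backend ActorRefs:

```scala
// Strings stand in for backend ActorRefs in this sketch.
val backends = Vector("backend-a", "backend-b", "backend-c")
var jobCounter = 0

// Mirrors the frontend actor: increment, then index modulo the ring size.
def pick(): String = {
  jobCounter += 1
  backends(jobCounter % backends.size)
}

// Successive jobs cycle through the ring of backends.
val picks = (1 to 4).map(_ => pick())
println(picks) // Vector(backend-b, backend-c, backend-a, backend-b)
```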

So far, so good. The interesting bit here is that the frontend has no idea what backend actors are available — it has to receive a message from the backend explicitly to let it know. Let’s see how the backend actor does that.

TransformationBackend Actor

Here’s the full class of the TransformationBackend actor:

TransformationBackend.scala
class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

This is more interesting. The backend actor is cluster-aware in a way that the frontend actor is not:

TransformationBackend.scala
val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit = cluster.unsubscribe(self)

When the actor first starts up, it subscribes itself to the cluster, telling the cluster to send it CurrentClusterState and MemberUp events:

TransformationBackend.scala
def receive = {
  case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
  case state: CurrentClusterState =>
    state.members.filter(_.status == MemberStatus.Up) foreach register
  case MemberUp(m) => register(m)
}

And when it receives those messages, it calls register for each member that is up. register checks that the member has the “frontend” role, looks up the frontend actor by its path on that member’s node, and sends it the BackendRegistration message.

TransformationBackend.scala
def register(member: Member): Unit =
  if (member.hasRole("frontend"))
    context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
      BackendRegistration
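Underneath, the registration decision is plain filtering. Here’s a minimal sketch, with a simplified Member case class standing in for akka.cluster.Member (statuses and roles are reduced to strings for illustration):

```scala
// Simplified stand-ins for akka.cluster.Member and MemberStatus.
case class Member(address: String, roles: Set[String], status: String)

val members = Set(
  Member("akka.tcp://ClusterSystem@127.0.0.1:2551", Set("backend"), "Up"),
  Member("akka.tcp://ClusterSystem@127.0.0.1:52860", Set("frontend"), "Up"),
  Member("akka.tcp://ClusterSystem@127.0.0.1:52861", Set("frontend"), "Joining"))

// Mirror the receive block: on CurrentClusterState only Up members are
// considered, and register only fires for members with the "frontend" role.
val toRegister = members
  .filter(_.status == "Up")
  .filter(_.roles.contains("frontend"))
  .map(m => s"${m.address}/user/frontend")

println(toRegister) // only the Up frontend node's path survives
```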

After that, the frontend can send it TransformationJob messages, and it responds to the frontend’s requests with TransformationResult(text.toUpperCase).

Conclusion

That’s it! That’s clustering in a nutshell.

Writing an SBT Plugin


One of the things I like about SBT is that it’s interactive. SBT stays up as a long running process, and you interact with it many times, while it manages your project and compiles code for you.

Because SBT is interactive and runs on the JVM, you can use it for more than just builds. You can use it for communication. Specifically, you can use it to make HTTP requests out to things you’re interested in communicating with.

Unfortunately, I knew very little about SBT plugins. So, I talked to Christopher Hunt and Josh Suereth, downloaded eigengo’s sbt-mdrw project, read the activator blog post on markdown and then worked it out on the plane back from Germany.

I made an SBT 0.13 plugin that uses the ROME RSS library to display titles from a list of RSS feeds. It’s available from https://github.com/wsargent/sbt-rss and has lots of comments.

The SBT RSS plugin adds a single command to SBT. You type rss at the console, and it displays the feed:

> rss
[info] Showing http://typesafe.com/blog/rss.xml
[info]      Title = The Typesafe Blog
[info]      Published = null
[info]      Most recent entry = Scala Days Presentation Roundup
[info]      Entry updated = null
[info] Showing http://letitcrash.com/rss
[info]      Title = Let it crash
[info]      Published = null
[info]      Most recent entry = Reactive Queue with Akka Reactive Streams
[info]      Entry updated = null
[info] Showing https://github.com/akka/akka.github.com/commits/master/news/_posts.atom
[info]      Title = Recent Commits to akka.github.com:master
[info]      Published = Thu May 22 05:51:21 EDT 2014
[info]      Most recent entry = Fix fixed issue list.
[info]      Entry updated = Thu May 22 05:51:21 EDT 2014

Let’s show how it does that.

First, the build file. This looks like a normal build.sbt file, except that there’s an sbtPlugin setting in it:

build.sbt
// this bit is important
sbtPlugin := true

organization := "com.typesafe.sbt"

name := "sbt-rss"

version := "1.0.0-SNAPSHOT"

scalaVersion := "2.10.4"

scalacOptions ++= Seq("-deprecation", "-feature")

resolvers += Resolver.sonatypeRepo("snapshots")

libraryDependencies ++= Seq(
  // RSS fetcher (note: the website is horribly outdated)
  "com.rometools" % "rome-fetcher" % "1.5.0"
)

publishMavenStyle := false

/** Console */
initialCommands in console := "import com.typesafe.sbt.rss._"

Next, there’s the plugin Scala code itself.

SbtRss.scala
object SbtRss extends AutoPlugin {
   // stuff
}

So, the first thing to note is the AutoPlugin class. The Plugins page covers AutoPlugin in detail; all you really need to know is that if you define an autoImport object containing your setting keys, SBT automatically imports its contents into users’ .sbt build files, so the setting keys are available there without any explicit imports. The import autoImport._ line inside the plugin brings those same keys into scope for the plugin’s own code.
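From the consuming side, that means a build which enables the plugin can set rssList directly. A sketch (the artifact coordinates come from the build.sbt above; the feed URL is a placeholder):

```scala
// project/plugins.sbt
addSbtPlugin("com.typesafe.sbt" % "sbt-rss" % "1.0.0-SNAPSHOT")

// build.sbt -- rssList resolves without imports, thanks to autoImport.
// Depending on the plugin's trigger, you may also need enablePlugins(SbtRss).
rssList := Seq("http://typesafe.com/blog/rss.xml")
```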

The next bit is the globalSettings entry:

SbtRss.scala
override def globalSettings: Seq[Setting[_]] = super.globalSettings ++ Seq(
  Keys.commands += rssCommand
)

Here, we add the rss command to SBT’s global settings by appending it to super.globalSettings.

The next two bits detail how to create the RSS command in SBT style.

SbtRss.scala
/** Allows the RSS command to take string arguments. */
private val args = (Space ~> StringBasic).*

/** The RSS command, mapped into sbt as "rss [args]" */
private lazy val rssCommand = Command("rss")(_ => args)(doRssCommand)

Finally, there’s the command itself.

SbtRss.scala
def doRssCommand(state: State, args: Seq[String]): State = {
  // do stuff

  state
}

The first thing we need to do within a command is call Project.extract(state). This gives us a bunch of useful settings such as currentRef, which we can use to pull the value of the SettingKey out. The SBT documentation on Build State – Project related data shows some more examples:

SbtRss.scala
// Doing Project.extract(state) and then importing it gives us currentRef.
// Using currentRef allows us to get at the values of SettingKey.
// http://www.scala-sbt.org/release/docs/Build-State.html#Project-related+data
val extracted = Project.extract(state)
import extracted._

Once we have the extracted.currentRef object, we can pull out the list of URLs with this construct, documented in Build State – Project data:

SbtRss.scala
val currentList = (rssList in currentRef get structure.data).get
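What that one-liner does is look up a scoped key in sbt’s settings map. A rough plain-Scala analogy, where ScopedKey and the data map are simplified stand-ins for sbt’s real types:

```scala
// Simplified stand-ins for sbt's scoped settings storage.
case class ScopedKey(project: String, key: String)

val data = Map( // plays the role of structure.data
  ScopedKey("root", "rssList") -> Seq("http://typesafe.com/blog/rss.xml"))

val currentRef = "root" // plays the role of extracted.currentRef

// "(rssList in currentRef get structure.data).get" is roughly:
val currentList = data.get(ScopedKey(currentRef, "rssList")).get
println(currentList) // List(http://typesafe.com/blog/rss.xml)
```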

And then we can put that together with the ROME library to print something out.

SbtRss.scala
package com.typesafe.sbt.rss

import sbt._
import Keys._
import sbt.complete.Parsers._

import java.net.URL
import com.rometools.fetcher._
import com.rometools.fetcher.impl._
import com.rometools.rome.feed.synd._

import scala.util.control.NonFatal

/**
 * An autoplugin that displays an RSS feed.
 */
object SbtRss extends AutoPlugin {

  /**
   * Sets up the autoimports of setting keys.
   */
  object autoImport {
    /**
     * Defines "rssList" as the setting key that we want the user to fill out.
     */
    val rssList = settingKey[Seq[String]]("The list of RSS urls to update.")
  }

  // Brings rssList (and anything else in autoImport) into scope for the code below.
  import autoImport._

  /**
   * An internal cache to avoid hitting RSS feeds repeatedly.
   */
  private val feedInfoCache = HashMapFeedInfoCache.getInstance()

  /**
   * An RSS fetcher, backed by the cache.
   */
  private val fetcher = new HttpURLFeedFetcher(feedInfoCache)

  /** Allows the RSS command to take string arguments. */
  private val args = (Space ~> StringBasic).*

  /** The RSS command, mapped into sbt as "rss [args]" */
  private lazy val rssCommand = Command("rss")(_ => args)(doRssCommand)

  /**
   * Adds the rssCommand to the list of global commands in SBT.
   */
  override def globalSettings: Seq[Setting[_]] = super.globalSettings ++ Seq(
    Keys.commands += rssCommand
  )

  /**
   * The actual RSS command.
   *
   * @param state the state of the RSS application.
   * @param args the string arguments provided to "rss"
   * @return the unchanged state.
   */
  def doRssCommand(state: State, args: Seq[String]): State = {
    state.log.debug(s"args = $args")

    // Doing Project.extract(state) and then importing it gives us currentRef.
    // Using currentRef allows us to get at the values of SettingKey.
    // http://www.scala-sbt.org/release/docs/Build-State.html#Project-related+data
    val extracted = Project.extract(state)
    import extracted._

    // Create a new fetcher event listener attached to the state -- this gives
    // us a way to log the fetcher events.
    val listener = new FetcherEventListenerImpl(state)
    fetcher.addFetcherEventListener(listener)

    try {
      if (args.isEmpty) {
        // This is the way we get the setting from rssList := Seq("http://foo.com/rss")
        // http://www.scala-sbt.org/release/docs/Build-State.html#Project+data
        val currentList = (rssList in currentRef get structure.data).get
        for (currentUrl <- currentList) {
          val feedUrl = new URL(currentUrl)
          printFeed(feedUrl, state)
        }
      } else {
        for (currentUrl <- args) {
          val feedUrl = new URL(currentUrl)
          printFeed(feedUrl, state)
        }
      }
    } catch {
      case NonFatal(e) =>
        state.log.error(s"Error ${e.getMessage}")
    } finally {
      // Remove the listener so we don't have a memory leak.
      fetcher.removeFetcherEventListener(listener)
    }

    state
  }

  def printFeed(feedUrl:URL, state:State) = {
    // Allows us to do "asScala" conversion from java.util collections.
    import scala.collection.JavaConverters._

    // This is a blocking operation, but we're in SBT, so we don't care.
    val feed = fetcher.retrieveFeed(feedUrl)
    val title = feed.getTitle.trim()
    val publishDate = feed.getPublishedDate
    val entries = feed.getEntries.asScala
    val firstEntry = entries.head

    // The only way to provide the RSS feeds as a resource seems to be to
    // have another plugin extend this one.  The code's small enough that it
    // doesn't seem worth it.
    state.log.info(s"Showing $feedUrl")
    state.log.info(s"\t\tTitle = $title")
    state.log.info(s"\t\tPublished = $publishDate")
    state.log.info(s"\t\tMost recent entry = ${firstEntry.getTitle.trim()}")
    state.log.info(s"\t\tEntry updated = " + firstEntry.getUpdatedDate)
  }

  /**
   * Listens for RSS events.
   *
   * @param state
   */
  class FetcherEventListenerImpl(state:State) extends FetcherListener {
    def fetcherEvent(event:FetcherEvent) = {
      import FetcherEvent._
      event.getEventType match {
        case EVENT_TYPE_FEED_POLLED =>
          state.log.debug("\tEVENT: Feed Polled. URL = " + event.getUrlString)
        case EVENT_TYPE_FEED_RETRIEVED =>
          state.log.debug("\tEVENT: Feed Retrieved. URL = " + event.getUrlString)
        case EVENT_TYPE_FEED_UNCHANGED =>
          state.log.debug("\tEVENT: Feed Unchanged. URL = " + event.getUrlString)
      }
    }
  }
}

This is an intentionally trivial example, but you can see how the same approach could be used to check whether the build failed, for example. Have fun.