Akka Clustering, Step by Step

This blog post shows how an Akka cluster works by walking through an example application in detail.

Introduction

Akka is an excellent toolkit for handling concurrency. The core concept behind Akka is the Actor model: loosely stated, instead of creating an instance of a class and invoking methods on it, i.e.

val foo = new Foo()
foo.doStuff(args)

You create an Actor and send it immutable messages. Those messages get queued in a mailbox, and the actor processes them one by one, in the order they were received:

val fooActor: ActorRef = actorSystem.actorOf(FooActor.props, "fooActor")
fooActor ! DoStuffMessage(args)

Sending messages to an actor is much better than invoking a method, for a couple of reasons. First, you can pass an ActorRef around anywhere: between threads, inside other messages, stashed away in a field somewhere. No matter what you do with it, you can't get the actor into an inconsistent state or call methods in the "wrong" order, because the actor manages its own state in response to those messages. Second, the actor can send messages back to you:

def receive = {
  case DoStuffMessage(args) =>
    sender() ! Success("all good!")
  case _ =>
    sender() ! Failure("not so good.")
}
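To make the first point concrete – an ActorRef is just a value you can hand to anything – here is a minimal sketch. The Coordinator, RegisterWorker and Work names are made up for this illustration; the point is simply that the reference travels inside a message and can be used later:

import akka.actor.{Actor, ActorRef}

// hypothetical messages, purely to show an ActorRef travelling inside a message
case class RegisterWorker(worker: ActorRef)
case class Work(payload: String)

class Coordinator extends Actor {
  var workers = List.empty[ActorRef]

  def receive = {
    case RegisterWorker(worker) =>
      // the ActorRef arrived inside a message; keep it for later
      workers = worker :: workers
    case Work(payload) =>
      // ...and use it whenever we like – the workers may be running anywhere
      workers.foreach(_ ! Work(payload))
  }
}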

Message passing means that the usual binary of method calls – either return a value or throw an exception – gets opened up considerably. When you can send the actor any message you like, and the actor can send you any message back (and can send you that message when it gets around to processing it, instead of immediately), then you are not bound by locality any more. The actor that you're sending a message to doesn't have to live on the same JVM that you're on. It doesn't even have to live on the same physical machine. As long as there's a transport capable of getting the message to the actor, it can live anywhere.

This brings us to Akka remoting.

Remoting

Akka remoting works by saying to the actor system either "I want you to create an actor on this remote host":

val ref = system.actorOf(FooActor.props.withDeploy(Deploy(scope = RemoteScope(address))))

or "I want a reference to an existing actor on the remote host":

val remoteFooActor = context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/fooActor")

Once you have a reference to the remote actor, messages sent to it are serialized (using Protocol Buffers), sent over the wire to the remote server, and reconstituted on the other end. This is great for peer-to-peer communication (it already beats RMI), but remoting is too specific in some ways – it points at a particular IP address, and really we'd like actors to just live out there "in the cloud". This is where Akka clustering comes in.

Clustering allows you to create an actor somewhere on a cluster consisting of nodes which all share the same actor system, without knowing exactly which node it is on. Other machines can join and leave the cluster at run time.
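Nodes don't have to be listed in configuration up front, either. As a rough sketch (the address below is purely illustrative), the Cluster extension lets a node join or leave programmatically at run time:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)

// join an existing cluster by pointing at any node that is already a member
cluster.join(Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551))

// ...and later, ask the cluster to remove this node gracefully
cluster.leave(cluster.selfAddress)

In this post, though, we'll stick to the seed-node configuration used by the sample.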

We'll use the akka-sample-cluster-app Activator template as a reference, and walk through each step of the TransformationApp application, showing how to run it and how it communicates.

Installation

Ideally, you should download Typesafe Activator, start it up with "activator ui" and search for "Akka Cluster Samples with Scala" in the field marked "Filter Templates". From there, Activator can download the template and provide you with a friendly UI and tutorial.

If not, all the code snippets have links back to the source code on Github, so you can clone or copy the files directly.

Clustering

The first step in Akka clustering is setting up the library dependencies and the Akka configuration. The build.sbt file used in akka-sample-cluster-app is complex, but the only bit you need to care about is the akka-cluster dependency:

libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-cluster" % "2.3.3")

The Akka configuration is relatively simple:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551",
      "akka.tcp://[email protected]:2552"]

    auto-down-unreachable-after = 10s
  }
}

You can see there's a cluster-aware actor reference provider, and a remote section that tells Akka how to set up its TCP/IP stack – here it binds to 127.0.0.1 and, because the port is 0, picks a random free port.

There's also a cluster section listing the seed nodes – well-known addresses on ports 2551 and 2552 that a node contacts when it starts up and wants to join the cluster.

Starting Frontend Node

There are two sides to the Transformation application – a frontend node and some backend nodes.

The examples in akka-sample-cluster-scala tend to have a single main method that starts all the nodes inside a single JVM. I think this confuses people, so the examples here run each node in its own JVM on the same box instead.

Here's the entire main method for starting up a frontend node in the cluster.

object TransformationFrontend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      implicit val timeout = Timeout(5 seconds)
      (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
        case result => println(result)
      }
    }
  }
}

The TransformationFrontend object has an interesting construct for its configuration that is worth breaking down.

val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
  withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
  withFallback(ConfigFactory.load())

This statement layers the setting "akka.remote.netty.tcp.port=$port" (0 unless a port was passed as a program argument) and the setting "akka.cluster.roles = [frontend]" on top of the configuration returned from ConfigFactory.load() – which reads application.conf. The end result is the config from application.conf, overridden by the settings parsed in front of it. This is a quick and simple way to add configuration settings, but the name withFallback can be confusing at first glance.
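If withFallback seems backwards at first, a tiny standalone example may help. Assuming application.conf contains port = 0 as above, the parsed override wins, because the config passed to withFallback is only consulted when the config in front of it has no value for a given path:

import com.typesafe.config.ConfigFactory

val overrides = ConfigFactory.parseString("akka.remote.netty.tcp.port = 2551")
val config = overrides.withFallback(ConfigFactory.load())

// the parsed value takes precedence; application.conf is only the fallback
println(config.getInt("akka.remote.netty.tcp.port")) // 2551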

After that, the class starts the actor system with the "ClusterSystem" name (this is important – note that the seed nodes also have this name):

val system = ActorSystem("ClusterSystem", config)

It then starts up a frontend actor, which will farm the actual work out to the backend nodes. This is really all that's required for the frontend to get going.

val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")

Finally, the main method schedules a task that, every two seconds, asks the frontend actor to transform a job containing "hello-<n>" (with an incrementing counter) and prints the result:

val counter = new AtomicInteger
import system.dispatcher
system.scheduler.schedule(2.seconds, 2.seconds) {
  implicit val timeout = Timeout(5 seconds)
  (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
    case result => println(result)
  }
}
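For reference, the messages being passed around here are plain case classes. The sample defines them roughly like this (check the template for the exact source file, but the shape is what matters):

final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration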

For development purposes, it's easiest to start the node in Activator:

$ activator
> runMain sample.cluster.transformation.TransformationFrontend

You'll see that the frontend node starts up and complains it can't reach the cluster:

[info] [WARN] [06/25/2014 12:43:27.962] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2552]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2552
[info] [WARN] [06/25/2014 12:43:27.966] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [Remoting] Tried to associate with unreachable remote address [akka.tcp://ClusterSystem@127.0.0.1:2551]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /127.0.0.1:2551

This is fine for now. Once we bring the rest of the nodes up, the frontend node will be happier.

Starting Backend Node

The main method for starting the backend is similar to the frontend:

object TransformationBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

The only differences here are that it sets the cluster role to "backend" rather than "frontend", starts up a "backend" actor, and doesn't schedule any jobs of its own.

Open up two new shells (if you're using iTerm you can use "Command D" to split them easily), and run the following to get the backend nodes up on each of the seed nodes:

$ activator
> runMain sample.cluster.transformation.TransformationBackend 2551
$ activator
> runMain sample.cluster.transformation.TransformationBackend 2552

Once you have the two backend nodes up, you'll see the frontend node join the cluster, register both backends, and start getting transformed strings back:

[info] [INFO] [06/25/2014 12:46:21.503] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:52860] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[info] TransformationResult(HELLO-19)
[info] TransformationResult(HELLO-20)
[info] TransformationResult(HELLO-21)
[info] TransformationResult(HELLO-22)

TransformationFrontend Actor

The frontend actor looks like this:

class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}

It's fairly clear what it does. When it receives a TransformationJob, it forwards it to one of the backend actors, round-robin style – and because it uses forward rather than !, the backend's reply goes straight back to the original sender rather than to the frontend. If there aren't any backend actors yet, it sends back JobFailed. When it gets a BackendRegistration message, it asks the context to watch the sender's ActorRef (so the frontend actor will be sent a Terminated message if that backend dies) and adds the sender to its internal list of backends. Finally, when one of the backend actors does die, the frontend actor removes it from the list.

So far, so good. The interesting bit here is that the frontend has no idea what backend actors are available – it has to receive a message from the backend explicitly to let it know. Let's see how the backend actor does that.

TransformationBackend Actor

Here's the full class of the TransformationBackend actor:

class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
        BackendRegistration
}

This is more interesting. The backend actor is cluster-aware in a way that the frontend actor is not:

val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
override def postStop(): Unit = cluster.unsubscribe(self)

When the actor first starts up, it subscribes itself to the cluster for MemberUp events; on subscription the cluster also sends it a CurrentClusterState snapshot describing the members that are already up:

def receive = {
  case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
  case state: CurrentClusterState =>
    state.members.filter(_.status == MemberStatus.Up) foreach register
  case MemberUp(m) => register(m)
}

Whenever it learns that a member is up – either from the initial snapshot or from a later MemberUp event – it calls register, which checks the member's role, looks up the frontend actor on that node, and sends it the BackendRegistration message:

def register(member: Member): Unit =
  if (member.hasRole("frontend"))
    context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
      BackendRegistration

After that, the frontend can send it TransformationJob messages, and it responds to each one with a TransformationResult containing the upper-cased text.
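Since the port defaults to 0 – meaning "pick any free port" – when no argument is given, you can also bring up extra backend nodes at any time and watch them register with the frontend:

$ activator
> runMain sample.cluster.transformation.TransformationBackend

The new node joins via the seed nodes, sends BackendRegistration to the frontend, and starts receiving its share of the jobs; kill it again and the frontend's death watch removes it from the rotation.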

Conclusion

That's it! That's clustering in a nutshell.
