Adding Echopraxia to Akka

TL;DR I released echopraxia-plusakka, a library that integrates Echopraxia with Akka's component system, which also resulted in adding a "direct" API to echopraxia based off SLF4J markers.


It's been a minute. After releasing the Scala API for Echopraxia and writing it up, I've been working my way up the chain and trying to exploit/break the API with progressively more demanding use cases.

Akka has been a personal favorite testing ground of mine. Akka is deeply concurrent, and as such using a debugger is nearly pointless – even if you add breakpoints, you'll trip over timeouts if you take too long to return a message. As such there's really only two reliable ways to debug and observe Akka code. Unit tests… and logging.

So. The task I set myself was to add structured logging to Akka. I already had an advantage in that I'm familiar with Akka internals, and in the end it was fairly straightforward with only a couple of surprises.

Akka Logging

Akka's logging depends on an underlying LoggingAdapter which goes through an event bus to akka-slf4j.

The first obstacle to adding structured logging is that the MarkerLoggingAdapter serializes arguments into a String before publishing it to the event bus, using formatN to convert arguments.

class MarkerLoggingAdapter extends BusLogging {
  def error(marker: LogMarker, cause: Throwable, template: String, arg1: Any): Unit =
    if (isErrorEnabled(marker)) 
      bus.publish(Error(cause, logSource, logClass, 
        format1(template, arg1), // returns `String`
        mdc, marker))
}

Because MarkerLoggingAdapter converts arguments to String eagerly, any arguments that pass through Akka's logging will be flattened and can't be retrieved later – there is no Error(msg, arg1) passed through to the event bus and then to Slf4jLogger.

It is still possible to pass through structured data though! Because the MarkerLoggingAdapter passes through LogMarker then using SLF4JLogMarker will pass along an org.slf4j.Marker through to Logback, and we can piggyback information on the way.

This led me to think about using Echopraxia directly against SLF4J.

Direct API

Echopraxia does allow you to log using an org.slf4j.Logger directly for simple cases. For example, arguments work fine:

FieldBuilder fb = FieldBuilder.instance();
org.slf4j.Logger slf4jLogger = org.slf4j.LoggerFactory.getLogger("com.example.Main");
slf4jLogger.info("SLF4J message {}", fb.string("foo", "bar"));

Although as exceptions in SLF4J get "eaten" if they have a template placeholder, if you want to keep the exception, you need to pass it in twice:

slf4jLogger.error("SLF4J exception {}", fb.exception(e), e);

However, conditions and context fields do not exist in the SLF4J API. If we want to use SLF4J, then it's time to fake it with markers.

I added some direct API features to Echopraxia. Using the direct API, context fields can be represented by FieldMarker, and conditions by ConditionMarker. This passes information through to the backend appropriately.

import com.tersesystems.echopraxia.logback.*;
import net.logstash.logback.marker.Markers;

FieldBuilder fb = FieldBuilder.instance();
FieldMarker fields = FieldMarker.apply(
  fb.list(
    fb.string("sessionId", "value"), 
    fb.number("correlationId", 1)
  )
); 
ConditionMarker conditionMarker = ConditionMarker.apply(
  Condition.stringMatch("sessionId", s -> s.raw().equals("value")))
);

logger.info(Markers.aggregate(fieldMarker, conditionMarker), "condition and marker");

This is only half the story though – the condition still needs to be evaluated, and because that doesn't go through an Echopraxia logger, that means adding a ConditionTurboFilter:

<configuration>
  <turboFilter class="com.tersesystems.echopraxia.logback.ConditionTurboFilter"/>
</configuration>

And then also when rendering JSON, we need to swap out the FieldMarker with actual event specific custom fields that logstash-logback-encoder will recognize, using LogstashFieldAppender.

<configuration>
  <!-- ... -->

  <root level="INFO">
    <!-- replaces fields with logstash markers and structured arguments -->
    <appender class="com.tersesystems.echopraxia.logstash.LogstashFieldAppender"> 
      <appender class="ch.qos.logback.core.FileAppender">
        <file>application.log</file>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
    </appender>
  </root>
</configuration>

This… is a hack, and I don't love it. The problem here is that there is no central pipeline for creating and manipulating Logback's LoggingEvent. The turbo filter API will only let you return FilterReply and the actual creation of a LoggingEvent happens internally. So… if you want to tweak the logging event, you have to have an appender transform it, then pass it through to the appender's children. This is the approach used in composite appenders.

This is complicated by Logback not officially supporting appender-ref for appenders themselves. You can add appender-ref from the root:

<configuration>
  <!-- ... -->

  <root level="DEBUG">
    <appender-ref ref="FILE" />
  </root>
</configuration>

but even though it's perfectly legal to have appender children, to add appender-ref on appenders, you need to explicitly loosen the AppenderRefAction to match (which can cause complaints):

<configuration>
  <!-- loosen the rule on appender refs so appenders can reference them -->
  <newRule pattern="*/appender/appender-ref"
    actionClass="ch.qos.logback.core.joran.action.AppenderRefAction"/>

  <!-- ... -->

  <appender name="CONSOLE_AND_FILE" class="com.tersesystems.logback.CompositeAppender">
    <appender-ref ref="CONSOLE"/>
    <appender-ref ref="FILE"/>
  </appender>

  <root level="DEBUG">
    <appender-ref ref="CONSOLE_AND_FILE" />
  </root>

</configuration>

On a tangent, because Logback runs through appenders in sequence in the same thread, it's possible for synchronous appenders to block asynchronous appenders:

<configuration>
  <root level="INFO">
    <!-- this runs first in the executing thread -->
    <appender class="ch.qos.logback.core.FileAppender">
      <!-- ... -->
    </appender>

    <!-- only gets the event after first appender... -->
    <appender class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
      <!-- ... -->
    </appender>
  </root>
</configuration>

As such, either you have multiple async appenders, or you wrap all the IO appenders inside a disruptor so you only have the overhead of one thread. This means that appenders can really serve three different roles: managing concurrency, event transformation, and IO sinks with encoders.

<configuration>
  <root level="INFO">
    <!-- immediately move off the rendering thread... -->
    <appender class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">
      <!-- ...transform event in pipeline... -->
      <appender class="com.tersesystems.echopraxia.logstash.LogstashFieldAppender">
        <!-- ...render to IO/Network/STDOUT -->

        <appender class="ch.qos.logback.core.ConsoleAppender">
          <encoder>
            <pattern>[%-5level] %logger{15} -  message%n%xException{10}</pattern>
          </encoder>
        </appender>

        <appender class="ch.qos.logback.core.FileAppender">
          <file>application.log</file>
          <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
        </appender>
      </appender>      
    </appender>
  </root>
</configuration>

Anyhoo.

Adding the direct API means that there is a fallback position, but I found that it was still very fiddly. Filters and other features that depend on composing loggers are not available in SLF4J. Aggregating multiple markers is awkward, even leveraging implicit conversion.

The second option was to sidestep the LoggingAdapter altogether and extend Akka's models with a structured logging API. There are two models in Akka: actors and streams, and they each have their own approach.

Field Builders

The first goal was to provide values for Akka components. The plan was to create structured output that would correspond to the toString debug output. But Akka components such as ActorSystem and ActorPath make heavy use of internal APIs that are only accessible under the akka package. Solution: define the package as akka.echopraxia to open up the API.

First, a pure trait so you can provide your mapping:

package akka.echopraxia.actor

trait AkkaFieldBuilder extends FieldBuilder {  
  implicit def byteStringToValue: ToValue[ByteString]
  implicit def addressToValue: ToValue[akka.actor.Address]
  implicit def actorRefToValue: ToValue[akka.actor.ActorRef]
  implicit def actorPathToValue: ToValue[akka.actor.ActorPath]
  implicit def actorSystemToValue: ToValue[akka.actor.ActorSystem]
  // ...
}

and then some default implementations:

trait DefaultAkkaFieldBuilder extends AkkaFieldBuilder {
  override implicit val byteStringToValue: ToValue[ByteString] = bs => ToObjectValue(
    keyValue("length" -> bs.length),
    keyValue("utf8String" -> bs.utf8String)
  )
  
  override implicit val addressToValue: ToValue[akka.actor.Address] = { address =>
    ToValue(address.toString)
  }

  override implicit val actorRefToValue: ToValue[akka.actor.ActorRef] = { actorRef =>
    ToValue(actorRef.path)
  }

  override implicit val actorPathToValue: ToValue[akka.actor.ActorPath] = { actorPath =>
    ToObjectValue(
      keyValue("address" -> actorPath.address),
      keyValue("name" -> actorPath.name),
      keyValue("uid" -> actorPath.uid)
    )
  }

  override implicit def actorSystemToValue: ToValue[akka.actor.ActorSystem] = { actorSystem =>
    ToObjectValue(
      keyValue("system" -> actorSystem.name),
      keyValue("startTime" -> actorSystem.startTime),
    )
  }
  // ...
}

So far, so good.

Akka Actors

There are two Scala APIs for Akka actors, typed actors and "classic" untyped actors. The logging in Akka Typed, and the logging in Akka Classic are a little different, but they both provide additional context in the form of MDC.

Next: create an echopraxia equivalent to ActorLogging. This is a bit complicated, because an echopraxia logger needs a field builder, and that means that an actor has to be able to provide it. That's okay – we can define the field builder requirement by adding AkkaFieldBuilderProvider and DefaultAkkaFieldBuilderProvider:

trait AkkaFieldBuilderProvider {
  type FieldBuilderType <: AkkaFieldBuilder
  protected def fieldBuilder: FieldBuilderType
}

trait DefaultAkkaFieldBuilderProvider extends AkkaFieldBuilderProvider {
  override type FieldBuilderType = DefaultAkkaFieldBuilder.type
  override protected def fieldBuilder: FieldBuilderType = DefaultAkkaFieldBuilder
}

and then use a self type to create a logger using the given field builder:

package akka.echopraxia.actor

trait ActorLogging {
  this: Actor with AkkaFieldBuilderProvider =>
  protected val log: Logger[FieldBuilderType] = ...
}

And that opens the door to actors with an Echopraxia logger:

trait LoggingActor extends Actor with ActorLogging with DefaultAkkaFieldBuilderProvider

class MyActor extends LoggingActor {
  
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.error("Restarting due to [{}] when processing [{}]", fb => fb.list(
      fb.exception(reason),
      fb.string("message" -> message.toString),
      fb.keyValue("self" -> self.path)
    ))
  }
}

This is pretty straightforward and almost transparent.

Next, we need an implicit EchopraxiaLoggingAdapter for our version of LoggingReceive:

trait EchopraxiaLoggingAdapter[FB] {
  def core: CoreLogger
  def fieldBuilder: FB
}

object EchopraxiaLoggingAdapter {
  def apply[FB](logger: Logger[FB]): EchopraxiaLoggingAdapter[FB] = new EchopraxiaLoggingAdapter[FB] {
    override def core: CoreLogger = logger.core
    override def fieldBuilder: FB = logger.fieldBuilder
  }
}

And we're good to go. LoggingReceive will take an Any because it's untyped, so that gets rendered with a standard toString.

Akka Typed

For Akka Typed, it's pretty much the same thing. We'll need to render AkkaTypedFieldBuilder similar to the AkkaFieldBuilder, but working with typed Behavior[T] means that we can require that the message passed through has a fieldBuilder.ToValue defined on it, which is easiest to do on the logger itself through implicits rather than Behaviors.logMessages.

val logger = LoggerFactory.getLogger.withFieldBuilder(MyFieldBuilder).withActorContext(context)

// Then log SayHello messages
logger.debugMessages[SayHello] {
  Behaviors.receiveMessage { message =>
    val replyTo = context.spawn(GreeterBot(max = 3), message.name)
    greeter ! Greeter.Greet(message.name, replyTo)
    Behaviors.same
  }
}

Okay, but how does that work?

Well, there's a Behaviors.intercept method that does what we want. This isn't in the documentation, but it is is part of the public API so we can use it and pass it to the LogMessagesInterceptor:

object Implicits {
  implicit class AkkaLoggerOps[FB <: AkkaTypedFieldBuilder](logger: Logger[FB]) {
    def debugMessages[T: ToValue : ClassTag](behavior: Behavior[T]): Behavior[T] =
      Behaviors.intercept(() => new LogMessagesInterceptor[T](Level.DEBUG, logger))(behavior)
  }
}

class LogMessagesInterceptor[T: ToValue : ClassTag](val level: Level, logger: Logger[FB]) extends BehaviorInterceptor[T, T] {
  import LogMessagesInterceptor._

  override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
    log(LogMessageTemplate, fb => {
      import fb._
      list(
        value("self" -> ctx.asScala.self),
        value("message" -> msg)
      )
    })
    target(ctx, msg)
  }
}

If it doesn't have a fieldBuilder.ToValue, it doesn't compile, and all output uses the given field builder instead of a global unstructured toString. Finally, at long last.

Akka Streams

Integrating Echopraxia with Akka Streams is a bit different, as it involves type enrichment on the SourceOps and FlowOps methods (and their Context equivalents). This follows from the custom operator suggestions given in the docs:

trait Implicits {
  implicit class SourceLogging[Out, Mat](s: Source[Out, Mat]) {
    def elog[FB <: AkkaStreamFieldBuilder](implicit log: EchopraxiaLoggingAdapter[FB]): SourceLoggingStage[FB, Out, Mat] = {
      new SourceLoggingStage(s, log.core, log.fieldBuilder)
    }
  }

  implicit class SourceWithContextLogging[Out, Ctx, Mat](s: SourceWithContext[Out, Ctx, Mat]) {
    def elog[FB <: AkkaStreamFieldBuilder](implicit log: EchopraxiaLoggingAdapter[FB]): SourceWithContextLoggingStage[FB, Out, Ctx, Mat] = {
      new SourceWithContextLoggingStage(s, log.core, log.fieldBuilder)
    }
  }

  implicit class FlowLogging[In, Out, Mat](f: Flow[In, Out, Mat]) {

    def elog[FB <: AkkaStreamFieldBuilder](implicit log: EchopraxiaLoggingAdapter[FB]): FlowLoggingStage[FB, In, Out, Mat] = {
      new FlowLoggingStage(f, log.core, log.fieldBuilder)
    }
  }

  implicit class FlowWithContextLogging[In, Out, Ctx, Mat](flow: FlowWithContext[In, Ctx, Out, Ctx, Mat]) {
    def elog[FB <: AkkaStreamFieldBuilder](implicit log: EchopraxiaLoggingAdapter[FB]): FlowWithContextLoggingStage[FB, In, Out, Ctx, Mat] = {
      new FlowWithContextLoggingStage(flow, log.core, log.fieldBuilder)
    }
  }
}

Because the SourceLoggingStage is not a SourceOps or FlowOps itself, it does require an elog.info("name") to close the loop. This takes out the implicit LoggingOptions (which I really don't like) and allows for elog.withCondition and elog.withFields similar to the Logger API.

Each logging stage class breaks down into a call to EchopraxiaLog, which has structured logging for the graph stage and exposes the graph stage operation as operationKey.

final case class EchopraxiaLog[FB <: AkkaStreamFieldBuilder, T](extract: (FB, T) => Field) 
  extends SimpleLinearGraphStage[T] {
  
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler with InHandler {
      def decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
      override def onPush(): Unit = {
        try {
          val elem = grab(in)
          log.log(level, "[{}]: {} {}", (fb: FB) => fb.list(
            fb.string(nameKey, name),
            fb.string(operationKey, "push"),
            extract(fb, elem)
          ), fieldBuilder)

          push(out, elem)
        } catch {
          case NonFatal(ex) =>
            decider(ex) match {
              case Supervision.Stop => failStage(ex)
              case _ => pull(in)
            }
        }
      }      

      // ...
    }
}

The extract method means that you can decide how you want to render the field, using fb.value or keyValue:

val s = Source(1 to 4)
  .elog
  .withCondition(condition)
  .withFields(fb => fb.keyValue("foo", "bar"))
  .info("before", (fb, el) => fb.keyValue("elem", el))

One of my hopes was that I could represent a Flow as a tree through structured logging. This doesn't seem possible from what I can tell, because streams are "traversable" and a traversal builder doesn't have a complete picture of the flow before it's built – I can render the inlets and outlets, but it's not the same. It's the same problem with functions – referential transparency means that you have to use something like treelog to describe a computation.

Unexpected Wins

It's really nice to see how the early bets have paid off. One unexpected win was being able to resolve ByteString as a structured utf8 string instead of an array of bytes. The default implementation contains both bytes length and utf8:

  override implicit val byteStringToValue: ToValue[ByteString] = bs => ToObjectValue(
    keyValue("length" -> bs.length),
    keyValue("utf8String" -> bs.utf8String)
  )

Being able to render ByteString as either binary focused or text focused depending on what field builder is passed in is great.

Another win is the automatic type class derivation – since messages are typically case classes, it's trivial to add mappings in field builders for them.

Comments