Finagle Tutorial: Twitter’s RPC Library for Scala

10 minute read

This article is for Scala developers of all levels - you don’t need any fancy Scala knowledge to make the best out of this piece.

For those who don’t know yet, almost the entire Twitter backend runs on Scala, and the Finagle library is at the core of almost all Twitter services. They also built an entire ecosystem of libraries, tools and frameworks on top of Finagle, which have been successfully used in production at other big companies (e.g. Foursquare, ING, Pinterest, Soundcloud, etc).

As I’m writing this article, Twitter is going through a major shakeup. More than half the staff have been fired, and more have resigned, including the entire Finagle team. However, the success of Twitter as a distributed system is because of Scala and Finagle; moving away from Scala/Finagle would be a dumb and costly move.

I expect this article to still be relevant, even just for you to understand some solid principles which you can use to build your applications, regardless of library or tool, even if Finagle ceases maintenance.

If you’d like to watch the video form:

1. Finagle Services

In order to add Finagle to a build.sbt project we’ll need these definitions:

libraryDependencies ++= Seq(
  "com.twitter" %% "finagle-core" % "22.7.0",
  "com.twitter" %% "finagle-http" % "22.7.0",
)

As of this writing, Finagle supports Scala 2.13. As Finagle (to my knowledge) doesn’t use any Scala fancy tools (e.g. macros), supporting Scala 3 shouldn’t be difficult to add.

Finagle is an RPC library for distributed systems. This means it’s protocol-agnostic: we can interact with another machine via any sort of protocol: gRPC, HTTP, Thrift, etc. In this article we’ll demonstrate the principles by means of HTTP.

RPC is an old term for “remote procedure call”, which in our case is a bit outdated, but the principle still stands. A Finagle “service” is described by a function

abstract class Service[-A, +B] extends (A => Future[B])

That’s it. A Service is a function that takes in an argument and returns a result later. Normally we think of such functions as async functions on the same machine, but Finagle extends this concept to include distributed systems: our result may be (and often is) returned from another system. This means that the protocol to interact with the outside system, as well as the serialization/deserialization between our “native” values and the data the protocol understands, are all abstracted away.

This is the single most powerful concept in Finagle.

A note that the Future type we use in this signature is Twitter’s own Future type, not that from scala.concurrent. It has the same semantics and similar transformers (e.g. map, flatMap, for-comprehensions, onComplete) as the built-in one. If you’re wondering why Twitter isn’t using the scala.concurrent version, it’s because Twitter’s Future is older than Scala’s Future. It’s no exaggeration that some Twitter packages inspired what’s now in the Scala standard library.

2. HTTP Services and Clients

Let’s demonstrate some Finagle services with HTTP. Let’s consider a server that listens on port 9090 and receives requests at URLs of the form localhost:9090?name=daniel and returns HTTP responses with the length of the query parameter “name”, as the payload.

import com.twitter.finagle._
import com.twitter.finagle.http._
import com.twitter.util._
def stringLengthService = new Service[Request, Response] {
  override def apply(request: Request): Future[Response] = Future {
    val computationResult = Option(request.getParam("name")).map(_.length).getOrElse(-1)
    val response = Response(Status.Ok)
    response.setContentString(computationResult.toString)
    response
  }
}

An HTTP service is a Service that takes in HTTP Requests and returns Futures holding HTTP Responses. This particular service parses the request, takes its query parameter value and computes its length, then builds a Response (note it’s mutable) and returns it.

The main application can simply say

def main(args: Array[String]): Unit = {
  val server = Http.serve(":9090", stringLengthService)
  Await.ready(server)
}

At this point, we can start our application an issue HTTP requests via cURL or HTTPie or your favorite HTTP tool (even your browser).

$ http get localhost:9090?name=daniel

HTTP/1.1 200 OK
Content-Length: 1

6

We can also build clients that interact with this HTTP server. In another Scala object, we can build one:

object SimpleClient {
  def main(args: Array[String]): Unit = {
    val client: Service[Request, Response] = Http.newService("localhost:9090")

    val request = Request(Method.Get, "/?name=daniel")
    val response: Future[Response] = filteredClient(request)

    response.onSuccess(resp => println(resp.getContentString()))
    response.onFailure(ex => ex.printStackTrace())
  }
}

Through Http.newService(...) we abstract away a Service, which we can now invoke like a function, locally, while the response is actually fetched from the other system. This is what makes Finagle an RPC system. Running this SimpleClient application will print the same value as the shell example.

Also note how Futures can be handled by supplying callback lambdas, much like we now do with the Scala standard Futures.

3. Filters

Services can be easily built and used, as shown in the previous section. Often, though, we would like to build extra layers of logic on top of existing services. Examples:

  • Data transformations between the inputs/outputs that the service supports and our desired data structures. Example: encoders/decoders
  • Extra conditions on the inputs, outputs or other attributes of the service itself. Example: enforcing SLAs or input sanitizing

These extra layers of logic are sometimes called “middleware” in other library ecosystems (e.g. Typelevel or ZIO), here they’re called Filters.

A Filter has 4 type arguments, following the flow of data in this gorgeous ASCII diagram I made myself:

  ReqIn  +--------+   RepOut   +---------+
 ------> |        |  ------->  |         |
         | Filter |            | Service |
 <------ |        |  <-------  |         |
  RepIn  +--------+   ReqOut   +---------+

A Filter is placed “in front of” the service we would like to enhance. Thinking of function composition, this diagram has 3 functions:

  • a ReqIn => RepOut function which transforms the input into something that the service understands
  • the RepOut => ReqOut function which is the service itself (remember that Service extends Function1); technically this function is RepOut => Future[ReqOut]
  • the ReqOut => RepIn function which transforms the output of the service into another type for the outside world

Compacting these 3 functions into one, this leads to a big function (ReqIn, (RepOut => Future[ReqOut])) => Future[RepIn] or (ReqIn, Service[RepOut,ReqOut]) => Future[RepIn] which is the fundamental description of a Filter:

abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
  extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])

Because many filters don’t need data transformation, we also have SimpleFilters which do no data transformations

abstract class SimpleFilter[Req, Rep] extends Filter[Req, Rep, Req, Rep]

With this new concept, let’s add a new filter which enforces replies to be sent back within one second, otherwise the Future holding the reply will be failed:

class TimeoutFilter[Req, Rep](timeout: Duration, timer: Timer) extends SimpleFilter[Req, Rep] {
  override def apply(request: Req, service: Service[Req, Rep]): Future[Rep] =
    service(request).within(timer, timeout)
}

With this filter implemented, adding it is easy once we have an existing service. We can add it in front of our HTTP client to enforce SLAs, for example:

object TimeoutClient {
  def main(args: Array[String]): Unit = {
    val originalClient: Service[Request, Response] = Http.newService("localhost:9090")
    val timeoutFilter = new TimeoutFilter[Request, Response](Duration.fromSeconds(1), new JavaTimer())
    val filteredClient = timeoutFilter.andThen(originalClient)

    val request = Request(Method.Get, "/?name=daniel")
    val response: Future[Response] = filteredClient(request)
    // asynchronous API
    // map, flatMap, for comprehensions
    response.onSuccess(resp => println(resp.getContentString()))
    response.onFailure(ex => ex.printStackTrace())
    Thread.sleep(2000)
  }
}

The critical line is val filteredClient = timeoutFilter.andThen(originalClient), which is an intuitive API. This also makes it easy for us to place multiple filters before a service, each time returning a new service.

This application will work as before, since the HTTP server’s Futures are completed immediately. If we add a Thread.sleep(1200) inside the HTTP server’s implementation, for example:

def stringLengthService = new Service[Request, Response] {
  override def apply(request: Request): Future[Response] = Future {
    val computationResult = Option(request.getParam("name")).map(_.length).getOrElse(-1)
    val response = Response(Status.Ok)
    response.setContentString(computationResult.toString)
    Thread.sleep(1200) // simulate taking a long time
    response
  }
}

then if we restart the server, our client will now see a failed Future, and we’ll see a timeout exception with a stack trace.

4. Load Balancing

This feature of Finagle is quite spectacular. Load balancing is important in distributed systems, and if we have multiple instances of the same application hosted on many machines, it’s often useful to distribute the load among them. Finagle makes this easy.

Finagle has a Name data structure which can be bound to a client. For single-instance services, the Name contains a single address. But we can instantiate multiple HTTP services on multiple ports to demonstrate how Names can be bound to all these instances at once.

First, to modify the HTTP service code a bit:

def debugFilter(id: String) = new SimpleFilter[Request, Response] {
  override def apply(request: Request, service: Service[Request, Response]) = {
    println(s"[${id}] received request $request")
    service(request)
  }
}

// same stringLengthService implementation

def simpleHttpServer(port: Int) =
  Http.serve(s":${port}", debugFilter(s"server-$port").andThen(stringLengthService))

def main(args: Array[String]): Unit = {
  (9090 to 9090).map { port =>
    simpleHttpServer(port)
  }.foreach(server => Await.ready(server))
}

This code spins up 3 different HTTP servers on the same JVM. We can of course start 3 separate instances of the same HTTP server on different JVMs, but this one is easier to manage and the result is the same.

We first add a debugger in the form of the debugFilter method, which returns a Filter — notice how useful the concept is — which prints the request along with the server id, so that we can track which request arrived at which server, from the same console.

We also changed how the HTTP service is served with the simpleHttpServer method, then started 3 servers on ports 9090, 9091 and 9092.

From the point of view of the client now, things are comparatively simpler (as they should):

object LoadBalancedClient {
  def main(args: Array[String]): Unit = {
    val addresses = (9090 to 9092).toList.map(port => Address("localhost", port))
    val name: Name = Name.bound(addresses: _*)
    val client = Http.newService(name, "client")
    val requests = (1 to 20).map(i => Request(Method.Get, s"?name=${"daniel" * i}"))
    val responses = requests.map(req => client(req).map(_.getContentString))
    // collect = "traverse"
    Future.collect(responses).foreach(println)
    Thread.sleep(5000)
  }
}

After building the addresses, we bind them to the Name data structure, and then the Name in turn to val client = Http.newService(name, "client"). After this point, the client can be used exactly as before, as a function Request => Future[Response]! Except in this case, the requests will be (internally) redirected to the 3 HTTP servers. The algorithm for routing is outside the scope of this article, but it can be configured, and you can read more about the built-in algorithms in the docs.

After load-balancing, the API stays consistent, and the client doesn’t have (and doesn’t need to have) any knowledge about how the service is implemented, or even how many instances of the service there are.

Running the application, we’ll see that the requests are cycled in between server instances:

[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58174)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58171)
[server-9091] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58176)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58169)
[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldaniel", from /127.0.0.1:58177)
[server-9091] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58181)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldaniel", from /127.0.0.1:58185)
[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58175)
[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58168)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58173)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58180)
[server-9092] received request Request("GET ?name=danieldanieldaniel", from /127.0.0.1:58179)
[server-9092] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58183)
[server-9091] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58166)
[server-9090] received request Request("GET ?name=danieldaniel", from /127.0.0.1:58178)
[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58182)
[server-9091] received request Request("GET ?name=danieldanieldanieldaniel", from /127.0.0.1:58184)
[server-9091] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58170)
[server-9091] received request Request("GET ?name=daniel", from /127.0.0.1:58172)
[server-9090] received request Request("GET ?name=danieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldanieldaniel", from /127.0.0.1:58167)

This feature makes it very easy for us to spin up additional instances of a service: just start it, and add it to a configuration file (or something persistent) that a client can now use, and the client will start directing traffic towards the new instance as well.

5. Conclusion

In this article, we’ve looked at the basics of the Finagle RPC library:

  • how services are described and consumed
  • how to build HTTP servers and clients
  • how to add filters and “middleware” on top of existing services
  • how to add load balancing while keeping the consumer API intact

The Finagle ecosystem of libraries is extremely powerful. I personally doubt they will cease development, given the success they’ve had with massively distributed systems (not just at Twitter). If nothing else, we should all learn from Finagle’s philosophy for **our own tools and libraries, and I hope this article helped achieve that goal.