gRPC in Scala with Fs2 and Scalapb

by Herbert Kateu

1. Introduction

RPC stands for Remote Procedure Call. It is a client-server communication protocol in which one program requests a service from another program at a different address, which may be on the same machine or on a different machine reachable over a network. It lets us work with remote procedures as if they were local.

In this article we will cover gRPC, a modern open-source RPC framework designed by Google that uses Protocol Buffers for data serialization and HTTP/2 as its transport layer. gRPC is language-independent, so a client written in one language can communicate with a server written in another.

At the time of writing, gRPC officially supports 11 programming languages, including Python, Java, Kotlin, and C++. Scala is not officially supported at the moment; however, the ScalaPB library provides a good wrapper around the official gRPC Java implementation. It offers an easy API that translates Protocol Buffers to Scala case classes, with support for Scala 3, Scala.js, and Java interoperability.

One of the main use cases for gRPC is communication between internal microservices. In the following sections, we’ll create a gRPC server and client that will process streams of orders sent from an online store. We’ll also learn how to write a .proto file, which we’ll use to generate the necessary case classes and traits using the Fs2Grpc library, and finally dig into ScalaPB to generate custom types for our application.

2. Setting Up

In our new Scala project, let’s create a plugins.sbt file in the project folder with the following code:

addSbtPlugin("org.typelevel" % "sbt-fs2-grpc" % "2.7.4")

Also add the following code to your build.sbt file:

val scala3Version = "3.3.0"
val http4sVersion = "0.23.23"
val weaverVersion = "0.8.3"

lazy val protobuf =
  project
    .in(file("protobuf"))
    .settings(
      name := "protobuf",
      scalaVersion := scala3Version
    )
    .enablePlugins(Fs2Grpc)

lazy val root =
  project
    .in(file("."))
    .settings(
      name := "root",
      scalaVersion := scala3Version,
      libraryDependencies ++= Seq(
        "io.grpc" % "grpc-netty-shaded" % scalapb.compiler.Version.grpcJavaVersion,
        "org.http4s" %% "http4s-ember-server" % http4sVersion,
        "org.http4s" %% "http4s-dsl" % http4sVersion,
        "org.http4s" %% "http4s-circe" % http4sVersion,
        "com.disneystreaming" %% "weaver-cats" % weaverVersion % Test
      ),
      testFrameworks += new TestFramework("weaver.framework.CatsEffect")
    )
    .dependsOn(protobuf)

Here we have a root project that depends on protobuf. With this design, when root is compiled, the Fs2Grpc plugin automatically generates the Scala code (case classes and service traits) from our .proto file.

We’ll also use circe for JSON encoding and decoding, and http4s to develop our REST server from which we’ll forward orders to the gRPC order service.

3. The .proto file

Create a folder called protobuf in the root of your project and add an orders.proto file in the following path, protobuf/src/main/protobuf/orders.proto.

Now add the following code to the file.

syntax = "proto3";

package com.rockthejvm.protos;

A .proto file is written in Google’s protocol buffer language, which is used to describe the structure of protocol buffer data. ScalaPB supports both the proto2 and proto3 versions.

In the first line of the file, we set syntax to proto3, the protobuf language version. Next we specify the package as com.rockthejvm.protos. The generated code lives under this package with the .proto file name appended, so the case classes generated from orders.proto end up in com.rockthejvm.protos.orders.

Let’s define some data structures that will be used in our application.

...
message Item{
  string name = 1;
  int32 qty = 2;
  double amount = 3;
}

A message is a data structure containing a set of typed fields. Item holds the data for each product in an order. It has three fields, name, qty, and amount, of type string, int32, and double respectively. The “= 1”, “= 2”, and “= 3” are field numbers: unique tags that identify each field in the binary encoding. Each message is translated to a case class during compilation.
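To make the mapping concrete, here is a hand-written, simplified stand-in for the case class derived from Item. The real generated class carries extra members (such as unknownFields and the serialization logic), so the name ItemSketch and this exact shape are illustrative assumptions, not the generated code itself.

```scala
// Hypothetical, simplified stand-in for the generated Item case class.
// proto3 scalar fields fall back to "", 0 and 0.0 when they are not set.
final case class ItemSketch(
    name: String = "",
    qty: Int = 0,
    amount: Double = 0.0
)

@main def itemSketchDemo(): Unit =
  val empty = ItemSketch()                         // every field at its default
  val phone = ItemSketch("Moto G Power", 2, 250.0)
  val more  = phone.copy(qty = 3)                  // immutable: copy returns a new value
  println(empty)
  println(more)
```

The default arguments are what allow a message to be built with only some of its fields supplied.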

...
message OrderRequest {
  int32 orderid = 1;
  repeated Item items = 2;
}

message OrderReply {
  int32 orderid = 1;
  repeated Item items = 2;
  double total = 3;
}

Above we define two more message types, OrderRequest and OrderReply. OrderRequest consists of an orderid of type int32 and items of type Item, which shows that we can nest a message within a message. The repeated keyword means that items can occur any number of times; in Scala this becomes a Seq of Item. OrderReply also consists of an orderid of type int32 and items, a repeated Item, plus a total of type double.
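The nesting and the repeated field can be pictured with plain Scala case classes. This is only a sketch: the Sketch-prefixed names are hypothetical stand-ins for the generated classes, and the total is computed by hand here rather than by a service.

```scala
// Hypothetical stand-ins for the generated messages.
final case class SketchItem(name: String = "", qty: Int = 0, amount: Double = 0.0)

// `repeated Item items = 2;` surfaces as a Seq that defaults to empty.
final case class SketchOrderRequest(orderid: Int = 0, items: Seq[SketchItem] = Seq.empty)
final case class SketchOrderReply(
    orderid: Int = 0,
    items: Seq[SketchItem] = Seq.empty,
    total: Double = 0.0
)

@main def nestedMessagesDemo(): Unit =
  val request = SketchOrderRequest(
    orderid = 1001,
    items = Seq(SketchItem("Phone A", 1, 300.0), SketchItem("Phone B", 2, 450.0))
  )
  // The reply echoes the request and adds the sum of the item amounts.
  val reply = SketchOrderReply(request.orderid, request.items, request.items.map(_.amount).sum)
  println(reply)
```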

...
service Order {
  rpc SendOrderStream (stream OrderRequest) returns (stream OrderReply) {}
}

Lastly, we define our Order service which contains a single function, SendOrderStream() that takes a stream of OrderRequest and returns a stream of OrderReply. The Order service will be translated to a Scala trait through which we can access our function.

Here’s the full code:

syntax = "proto3";

package com.rockthejvm.protos;

message Item{
  string name =1;
  int32 qty = 2;
  double amount = 3;
}

message OrderRequest {
  int32 orderid = 1;
  repeated Item items = 2;
}

message OrderReply {
  int32 orderid = 1;
  repeated Item items = 2;
  double total = 3;
}

service Order {
  rpc SendOrderStream (stream OrderRequest) returns (stream OrderReply) {}
}

Now we can head over to our terminal, navigate to our project directory, and run sbt compile. Fs2Grpc should generate the necessary Scala files according to what we defined in orders.proto.

After compilation, we’ll find two sets of generated files in protobuf/target/scala-3.3.0/src_managed/main: the service trait sits under a Fs2Grpc folder, while the case classes sit under a Scalapb folder. If we look at the top of each of these files, we’ll find that they all belong to the com.rockthejvm.protos.orders package, which is the package we defined in orders.proto with the file name appended.

4. The gRPC server

Let’s take a look at the OrderFs2Grpc.scala file generated by Fs2Grpc:

trait OrderFs2Grpc[F[_], A] {
  def sendOrderStream(request: _root_.fs2.Stream[F, com.rockthejvm.protos.orders.OrderRequest], ctx: A): _root_.fs2.Stream[F, com.rockthejvm.protos.orders.OrderReply]
}

Notice that an OrderFs2Grpc trait was created with a single function, sendOrderStream(), matching what we defined in orders.proto. However, this function has no implementation; that is what we’ll provide in the next step.

Create OrderService.scala in the following path src/main/scala/OrderService.scala and add the following code:

package com.rockthejvm.service

import com.rockthejvm.protos.orders.*
import cats.effect.*
import io.grpc.*
import fs2.Stream

class OrderService extends OrderFs2Grpc[IO, Metadata] {
  override def sendOrderStream(
      request: Stream[IO, OrderRequest],
      ctx: Metadata
  ): Stream[IO, OrderReply] = {
    request.map { orderReq =>
      OrderReply(
        orderReq.orderid,
        orderReq.items,
        orderReq.items.map(i => i.amount).reduce(_ + _)
      )
    }
  }
}

Here we create a new class OrderService that extends our OrderFs2Grpc trait. The trait requires two type parameters, an effect type and a context type, for which we provide IO from cats.effect and Metadata from io.grpc. Metadata gives access to the metadata values (key-value pairs, similar to HTTP headers) exchanged during a call.

In our implementation of sendOrderStream above, we map on request which is an fs2 stream of type Stream[IO, OrderRequest] and for each OrderRequest, we produce an OrderReply with the same orderid, items and the total amount spent calculated using the reduce(_+_) function.
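One caveat worth knowing: reduce throws an UnsupportedOperationException on an empty collection, so an OrderRequest without items would fail the stream. Below is a minimal sketch of a total that tolerates empty orders. It works on plain Double values, and the helper name orderTotal is hypothetical rather than part of the article's code.

```scala
import scala.util.Try

// Hypothetical helper: sums the item amounts and returns 0.0 for an empty
// order instead of throwing the way reduce(_ + _) does.
def orderTotal(amounts: Seq[Double]): Double =
  amounts.foldLeft(0.0)(_ + _)

@main def orderTotalDemo(): Unit =
  println(orderTotal(Seq(300.0, 450.0)))
  println(orderTotal(Seq.empty))
  // For comparison, reduce on an empty Seq fails at runtime.
  println(Try(Seq.empty[Double].reduce(_ + _)).isFailure)
```

Whether an empty order should yield a zero total or be rejected is a design decision; the fold simply makes the zero case explicit.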

import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import fs2.grpc.syntax.all.*

// ...

object Server {
  private val orderService: Resource[IO, ServerServiceDefinition] =
    OrderFs2Grpc.bindServiceResource[IO](new OrderService)

  private def runServer(
      service: ServerServiceDefinition
  ): Resource[IO, Server] =
    NettyServerBuilder
      .forPort(9999)
      .addService(service)
      .resource[IO]

  val grpcServer: Resource[IO, Server] =
    orderService
      .flatMap(x => runServer(x))
}

Above we create our gRPC service by calling the bindServiceResource() method on OrderFs2Grpc and passing it a service implementation, in our case a new instance of OrderService. This returns a Resource[IO, ServerServiceDefinition].

The runServer function uses NettyServerBuilder to create our server, which will listen on port 9999. The resource extension method comes from fs2.grpc.syntax.all and wraps the built server in a Resource; note that the server is built but not yet started at this point. In the last line, we create our final Resource by calling flatMap on the orderService Resource and passing the ServerServiceDefinition to runServer().
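The way flatMap chains two Resources can be seen in isolation with a small Cats Effect sketch. The resources below only log their lifecycle; the names ResourceSketch and named are hypothetical and stand in for the service definition and the server.

```scala
import cats.effect.{IO, IOApp, Resource}

object ResourceSketch extends IOApp.Simple:
  // Hypothetical resource that logs when it is acquired and released.
  def named(name: String): Resource[IO, String] =
    Resource.make(IO.println(s"acquire $name").as(name))(_ => IO.println(s"release $name"))

  // flatMap sequences acquisition: "service" first, then a "server" built from it.
  val both: Resource[IO, String] =
    named("service").flatMap(svc => named(s"server($svc)"))

  // Once `use` completes, the resources are released in reverse order.
  val run: IO[Unit] = both.use(server => IO.println(s"using $server"))
```

This ordering is what makes the pattern safe for servers: the service definition outlives the server that depends on it.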

Here’s the full code:

package com.rockthejvm.service

import com.rockthejvm.protos.orders.*
import cats.effect.*
import io.grpc.*
import fs2.Stream
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import fs2.grpc.syntax.all.*

class OrderService extends OrderFs2Grpc[IO, Metadata] {
  override def sendOrderStream(
      request: Stream[IO, OrderRequest],
      ctx: Metadata
  ): Stream[IO, OrderReply] = {
    request.map { orderReq =>
      OrderReply(
        orderReq.orderid,
        orderReq.items,
        orderReq.items.map(i => i.amount).reduce(_ + _)
      )
    }
  }
}

object Server {
  private val orderService: Resource[IO, ServerServiceDefinition] =
    OrderFs2Grpc.bindServiceResource[IO](new OrderService)

  private def runServer(
      service: ServerServiceDefinition
  ): Resource[IO, Server] =
    NettyServerBuilder
      .forPort(9999)
      .addService(service)
      .resource[IO]

  val grpcServer: Resource[IO, Server] =
    orderService
      .flatMap(x => runServer(x))
}

5. The gRPC client

Create OrderClient.scala in the following path, src/main/scala/com/rockthejvm/client/OrderClient.scala and add the following code.

package com.rockthejvm.client

import cats.effect.*
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.*
import fs2.grpc.syntax.all.*
import com.rockthejvm.protos.orders.*

object Client {
  // accessible from outside Client so the application can acquire the stub
  val resource: Resource[IO, OrderFs2Grpc[IO, Metadata]] =
    NettyChannelBuilder
      .forAddress("127.0.0.1", 9999)
      .usePlaintext()
      .resource[IO]
      .flatMap(ch => OrderFs2Grpc.stubResource[IO](ch))
}

Here we create a Client object with its first member, resource. We use NettyChannelBuilder to construct a channel that uses the Netty transport. A channel is a virtual connection to a gRPC endpoint, over which the client performs its remote calls. The forAddress() method sets the host and port (the same port as our server), while usePlaintext() sets the negotiation type for the HTTP/2 connection to PLAINTEXT instead of TLS, which is the default for a ManagedChannel. The resource() extension method from fs2-grpc then wraps the ManagedChannel in a Resource. Finally, we create a stub from that channel with stubResource, which yields an instance of OrderFs2Grpc[IO, Metadata].

The gRPC client will enable us to directly call methods on this instance, therefore we’ll be able to process multiple orders from an fs2 stream by invoking the sendOrderStream() function, ignoring that the actual implementation of that method involves a remote connection to a different service, completing the RPC protocol.
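The idea of a stub, an object that looks local but forwards each call elsewhere, can be sketched without any gRPC machinery. Everything in this example is hypothetical: a function from bytes to bytes stands in for the network, and plain UTF-8 strings stand in for Protocol Buffers.

```scala
// Shared interface, playing the role of the generated service trait.
trait Greeter:
  def greet(name: String): String

// "Server side": the real implementation.
object GreeterImpl extends Greeter:
  def greet(name: String): String = s"Hello, $name"

// "Client side": same interface, but each call is encoded, pushed through a
// transport function (standing in for the network), and the reply decoded.
final class GreeterStub(transport: Array[Byte] => Array[Byte]) extends Greeter:
  def greet(name: String): String =
    new String(transport(name.getBytes("UTF-8")), "UTF-8")

// Fake transport: decodes the request, calls the implementation, encodes the reply.
val fakeNetwork: Array[Byte] => Array[Byte] =
  bytes => GreeterImpl.greet(new String(bytes, "UTF-8")).getBytes("UTF-8")

@main def stubDemo(): Unit =
  val client: Greeter = GreeterStub(fakeNetwork)
  // The caller only sees the Greeter interface, not the transport behind it.
  println(client.greet("gRPC"))
```

In the real client, the generated stub plays the role of GreeterStub and the ManagedChannel plays the role of the transport.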

Here’s how we can implement this:

// ...
import com.rockthejvm.protos.orders.*
import fs2.Stream

object Client {
  // ...
  private def formatItemsToStr(items: Seq[Item]): Seq[String] = {
    items.map(x => s"[${x.qty} of ${x.name}]")
  }

  def processOrders(
      orderStub: OrderFs2Grpc[IO, Metadata],
      orders: Stream[IO, OrderRequest]
  ): IO[List[String]] = {
    for {
      response <- orderStub.sendOrderStream(
        orders,
        new Metadata()
      )
      str <- Stream.eval(
        IO(
          s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
              .mkString(" and ")}, totaling to ${response.total.toString}"
        )
      )
    } yield str
  }.compile.toList
}

We define processOrders(), a function that takes a stub of type OrderFs2Grpc[IO, Metadata] and orders of type Stream[IO, OrderRequest] as arguments. When we call sendOrderStream(orders, new Metadata()) on orderStub, we get a Stream[IO, OrderReply]. The for-comprehension flatMaps over it to format each OrderReply into a String that tells the user their orderid, the items they bought, and the total amount they spent on that order. Finally, the Stream is compiled into an IO[List[String]]. The function is not private, because our REST routes will call it from another package.

We use the formatItemsToStr() function to format the Seq[Item] as part of the returned String.
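The same for-comprehension shape can be tried without a gRPC server. In this sketch a hypothetical fakeReplies function stands in for sendOrderStream and works on plain numbers instead of OrderRequest and OrderReply.

```scala
import cats.effect.{IO, IOApp}
import fs2.Stream

object StreamSketch extends IOApp.Simple:
  // Hypothetical stand-in for the remote call: turns each input into a "total".
  def fakeReplies(orders: Stream[IO, Int]): Stream[IO, Double] =
    orders.map(_ * 1.5)

  def describe(orders: Stream[IO, Int]): IO[List[String]] = {
    for {
      reply <- fakeReplies(orders)                        // one pass per emitted element
      line  <- Stream.eval(IO(s"Processed reply: $reply")) // lift an effect into the stream
    } yield line
  }.compile.toList                                          // run the stream and collect the results

  val run: IO[Unit] =
    describe(Stream.emits(List(2, 4)).covary[IO]).flatMap(lines => IO.println(lines))
```

Because the result is collected with compile.toList, the caller only sees the replies once the whole stream has finished, which is fine for a short batch of orders.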

Here’s the full code:

package com.rockthejvm.client

import cats.effect.*
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.*
import fs2.grpc.syntax.all.*
import com.rockthejvm.protos.orders.*
import fs2.Stream

object Client {
  val resource: Resource[IO, OrderFs2Grpc[IO, Metadata]] =
    NettyChannelBuilder
      .forAddress("127.0.0.1", 9999)
      .usePlaintext()
      .resource[IO]
      .flatMap(ch => OrderFs2Grpc.stubResource[IO](ch))

  private def formatItemsToStr(items: Seq[Item]): Seq[String] = {
    items.map(x => s"[${x.qty} of ${x.name}]")
  }

  def processOrders(
      service: OrderFs2Grpc[IO, Metadata],
      orders: Stream[IO, OrderRequest]
  ): IO[List[String]] = {
    for {
      response <- service.sendOrderStream(
        orders,
        new Metadata()
      )
      str <- Stream.eval(
        IO(
          s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
              .mkString(" and ")}, totaling to ${response.total.toString}"
        )
      )
    } yield str
  }.compile.toList
}

6. The Routes

In this section, we define the logic to handle incoming requests to our REST server. Create AppRoutes.scala in the following path src/main/scala/com/rockthejvm/routes/AppRoutes.scala, and add the code below:

package com.rockthejvm.routes

import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*

object AppRoutes {
  def restService = HttpRoutes.of[IO] {
    case req @ GET -> Root / "index.html" =>
      StaticFile
        .fromString[IO](
          "src/main/resources/index.html",
          Some(req)
        )
        .getOrElseF(NotFound())
  }
}

Above we define our first route, /index.html, which serves an HTML page where one can place orders in our online store. It uses the fromString() method from http4s’s StaticFile object, which takes the path to an HTML file as a String and an Option of a Request[IO]. If no file exists at that path, the client receives a 404 Not Found response.

// ...
import com.rockthejvm.protos.orders.OrderRequest

object AppRoutes {
  case class Orders(values: Seq[OrderRequest])

  def restService = HttpRoutes.of[IO] {
    // ...
    case req @ POST -> Root / "submit" => ???
  }
}

Next, we create our second route, /submit which will receive POST requests with a user’s orders. We create an Orders case class to capture the Seq of OrderRequests that the user sends. However, to parse our request, we’ll need an EntityDecoder for Orders in scope. We’ll also need Decoders for Orders, OrderRequest, and Item in scope.

import com.rockthejvm.protos.orders.{OrderRequest, Item}
import org.http4s.circe.*
import io.circe.Decoder
import io.circe.syntax.*
import scala.util.Random

object AppRoutes {
  case class Orders(values: Seq[OrderRequest])

  object Orders {
    private given itemDecoder: Decoder[Item] = Decoder.instance { h =>
      for {
        name <- h.get[String]("name")
        qty <- h.get[Int]("quantity")
        amount <- h.get[Double]("amount")
      } yield Item.of(
        name,
        qty,
        amount
      )
    }

    private given orDecoder: Decoder[OrderRequest] = Decoder.instance { h =>
      h.get[Seq[Item]]("items").map { items =>
        OrderRequest.of(Random.between(1000, 2000), items)
      }
    }

    given ordersDecoder: Decoder[Orders] = Decoder.instance { h =>
      h.get[Seq[OrderRequest]]("orders").map { orders =>
        Orders(orders)
      }
    }

    given ordersEntityDecoder: EntityDecoder[IO, Orders] = jsonOf[IO, Orders]
  }
  //...
}

Here we create a companion object for Orders. Its last line is an EntityDecoder for Orders, built with the jsonOf[IO, Orders] function provided by http4s.circe. For this EntityDecoder to work, we need a given of type Decoder[Orders], ordersDecoder, which is implemented with the help of circe’s Decoder.instance function. We read the orders key from the incoming JSON and pass its value to the Orders case class.

The ordersDecoder in turn requires given Decoders for Item and OrderRequest in scope, which we also add to the Orders companion object. When creating the OrderRequest Decoder, we use a random Int between 1000 (inclusive) and 2000 (exclusive) to simulate the orderid.
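The decoder chain can be exercised outside http4s. The sketch below assumes the circe-parser module is on the classpath (it is not listed in the build above) and uses hypothetical PhoneItem and PhoneOrders classes in place of the generated ones.

```scala
import io.circe.Decoder
import io.circe.parser.decode

// Hypothetical stand-ins for the generated classes.
final case class PhoneItem(name: String, qty: Int, amount: Double)
final case class PhoneOrders(values: Seq[PhoneItem])

given Decoder[PhoneItem] = Decoder.instance { h =>
  for {
    name   <- h.get[String]("name")
    qty    <- h.get[Int]("quantity") // the JSON key differs from the field name
    amount <- h.get[Double]("amount")
  } yield PhoneItem(name, qty, amount)
}

// Relies on the Decoder[PhoneItem] above to decode each element of the array.
given Decoder[PhoneOrders] = Decoder.instance { h =>
  h.get[Seq[PhoneItem]]("items").map(PhoneOrders(_))
}

@main def decoderDemo(): Unit =
  val json = """{"items":[{"name":"Moto G Power","quantity":2,"amount":250.0}]}"""
  println(decode[PhoneOrders](json))                      // a Right on success
  println(decode[PhoneOrders]("""{"items":1}""").isLeft)  // malformed input yields a Left
```

Decoding returns an Either, so a malformed payload surfaces as a value rather than as an exception.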

ScalaPB provides a convenient of() function to pass data to our case class without having to deal with UnknownFieldSet.empty, which, if you check the generated code, is part of every case class.

We can also pass values to these case classes in different ways, depending on your preference. Here are some examples with OrderRequest:

// Using the of() method
OrderRequest.of(Random.between(1000, 2000), items)

//Using withX() method
OrderRequest()
  .withOrderid(Random.between(1000,2000))
  .withItems(items)

//Default method
// Here we would have to import UnknownFieldSet
import scalapb.UnknownFieldSet
OrderRequest(
  Random.between(1000,2000),
  items,
  _root_.scalapb.UnknownFieldSet.empty
)

Let’s see how to process the incoming POST requests.

// ...
import com.rockthejvm.client.Client
import com.rockthejvm.protos.orders.OrderFs2Grpc
import fs2.Stream
import io.grpc.Metadata

object AppRoutes {
  // ...
  def restService(service: OrderFs2Grpc[IO, Metadata]) = HttpRoutes.of[IO] {
    // ...
    case req @ POST -> Root / "submit" =>
      req
        .as[Orders]
        .flatMap { x =>
          Client.processOrders(service, Stream.emits(x.values).covary[IO])
        }
        .handleError(x => List(x.getMessage))
        .flatMap(x => Ok(x.asJson))
  }
}

Calling as[Orders] on the request parses the JSON body and returns an IO[Orders]. We then flatMap on this to pass the orders to our processOrders() function, which communicates with the gRPC server to process them and returns an IO[List[String]]. If anything fails along the way, handleError replaces the failure with a List containing the error message. Either way, the List is passed back to the browser as a JSON array of strings.
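The effect of folding a failure into the same List[String] shape can be seen in a small Cats Effect sketch. The process function is a hypothetical stand-in for Client.processOrders, and the sketch uses handleErrorWith with a pure fallback, which behaves like the handleError call in the route.

```scala
import cats.effect.{IO, IOApp}

object ErrorToListSketch extends IOApp.Simple:
  // Hypothetical stand-in for Client.processOrders that can fail.
  def process(fail: Boolean): IO[List[String]] =
    if fail then IO.raiseError(new RuntimeException("gRPC server unavailable"))
    else IO.pure(List("Processed orderid: 1001"))

  // Both outcomes end up as a List[String], so the route has one response shape.
  def safe(fail: Boolean): IO[List[String]] =
    process(fail).handleErrorWith(e => IO.pure(List(e.getMessage)))

  val run: IO[Unit] =
    for
      ok  <- safe(fail = false)
      bad <- safe(fail = true)
      _   <- IO.println(ok)
      _   <- IO.println(bad)
    yield ()
```

A consequence of this design is that the browser receives a 200 response even when processing fails; a production service would more likely map failures to an error status.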

Here’s the full code:

package com.rockthejvm.routes

import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*
import com.rockthejvm.protos.orders.{OrderRequest, Item, OrderFs2Grpc}
import io.grpc.Metadata
import org.http4s.circe.*
import io.circe.Decoder
import io.circe.syntax.*
import scala.util.Random
import com.rockthejvm.client.Client
import fs2.Stream

object AppRoutes {
  case class Orders(values: Seq[OrderRequest])
  object Orders {
    private given itemDecoder: Decoder[Item] = Decoder.instance { h =>
      for {
        name <- h.get[String]("name")
        qty <- h.get[Int]("quantity")
        amount <- h.get[Double]("amount")
      } yield Item.of(
        name,
        qty,
        amount
      )
    }

    private given orDecoder: Decoder[OrderRequest] = Decoder.instance { h =>
      h.get[Seq[Item]]("items").map { items =>
        OrderRequest.of(Random.between(1000, 2000), items)
      }
    }

    given ordersDecoder: Decoder[Orders] = Decoder.instance { h =>
      h.get[Seq[OrderRequest]]("orders").map { orders =>
        Orders(orders)
      }
    }

    given ordersEntityDecoder: EntityDecoder[IO, Orders] = jsonOf[IO, Orders]
  }

  def restService(service: OrderFs2Grpc[IO, Metadata]) = HttpRoutes.of[IO] {
    case req @ GET -> Root / "index.html" =>
      StaticFile
        .fromString[IO](
          "src/main/resources/index.html",
          Some(req)
        )
        .getOrElseF(NotFound())
    case req @ POST -> Root / "submit" =>
      req
        .as[Orders]
        .flatMap { x =>
          Client.processOrders(service, Stream.emits(x.values).covary[IO])
        }
        .handleError(x => List(x.getMessage))
        .flatMap(x => Ok(x.asJson))
  }
}

7. The “web UI”

Create the index.html file in the following path, src/main/resources/index.html. This file is quite lengthy, so we can copy the contents from this link. It consists of 3 forms, each with the following format:

<form>
  <fieldset>
    <span style="font-weight: bold; color: darkorchid">Samsung</span><br />
    <span style="color: crimson">FlagShip Phones</span><br />
    <input type="radio" id="phone1" name="flagship" value="s23"/>
    <label for="phone1">Samsung Galaxy s23 Ultra -> $999.99</label><br />
    <input type="radio" id="phone2" name="flagship" value="fold5" checked/>
    <label for="phone2">Samsung Galaxy Z Fold 5 -> $1800.00</label><br />
    <input type="radio" id="phone3" name="flagship" value="note"/>
    <label for="phone3">Samsung Galaxy Note 23 -> $1000.00</label><br />
    <span style="color: crimson">Quantity</span><br />
    <input type="number" name="flagshipQty" min=1 max=4 value="1"/><br />
    <br />
    <span style="color: crimson">Budget Phones</span><br />
    <input type="radio" id="phone4" name="budget" value="a23" checked/>
    <label for="phone4">Samsung Galaxy A23 5G -> $299.99</label><br />
    <input type="radio" id="phone5" name="budget" value="a54" />
    <label for="phone5">Samsung Galaxy A54 -> $449.99</label><br />
    <input type="radio" id="phone6" name="budget" value="a42" />
    <label for="phone6">Samsung A42 5G -> $399.99</label><br />
    <span style="color: crimson">Quantity</span><br />
    <input type="number" name="budgetQty" min=1 max=4 value="1"/><br />
    <br />
  </fieldset>
</form>

Each form has two sections where one can select a flagship phone and a budget phone along with the preferred quantity. Some default values have been set, but they can be changed to your preference.

<div>
  <button id="submitBtn">Submit All</button>
</div>
<div id="reply"></div>

We provide a Submit All button that will fetch all the inputted data using JavaScript, post the data, and append the response to the div with the id reply.

const prices = {
  s23: 999.99,
  fold5: 1800.0,
  note: 1000.0,
  a23: 299.99,
  a54: 449.99,
  a42: 399.99,
  // ... entries for the other two forms
};
const names = {
  s23: "Samsung Galaxy s23 Ultra",
  fold5: "Samsung Galaxy Z Fold 5",
  note: "Samsung Galaxy Note 23",
  a23: "Samsung Galaxy A23 5G",
  a54: "Samsung Galaxy A54",
  a42: "Samsung A42 5G",
  // ... entries for the other two forms
};

const forms = document.getElementsByTagName("form");

const submitButton = document.getElementById("submitBtn");
const replyDiv = document.getElementById("reply");

To process the forms we first create two objects containing the phone prices and phone names, keyed by the value attribute of each radio button. We also get all the form elements and assign them to forms using the document.getElementsByTagName("form") function. Next, we get the submit button and the div that will display our response, and assign them to submitButton and replyDiv respectively using the document.getElementById() function.

...
submitButton.addEventListener("click", (event) => {
  var orderObj = { orders: [] };
  for (let i = 0; i < forms.length; i++) {
    const formData = new FormData(forms[i]);
    const jsonObject = Object.fromEntries(formData);
    var jsonFormatted = {
      items: [
        {
          name: names[jsonObject["flagship"]],
          quantity: jsonObject["flagshipQty"],
          amount: prices[jsonObject["flagship"]],
        },
        {
          name: names[jsonObject["budget"]],
          quantity: jsonObject["budgetQty"],
          amount: prices[jsonObject["budget"]],
        },
      ],
    };
    orderObj.orders.push(jsonFormatted);
  }
});

Here, we create an event listener for the click event on the submit button. When the button is clicked, we loop over all the forms and fetch the data from each by calling new FormData(forms[i]). We then reshape the data to match the JSON structure our decoders expect and assign each form’s data to jsonFormatted. This is then pushed to orderObj, an object containing an array of orders.

submitButton.addEventListener("click", (event) => {
  ...
  async function sendForm() {
    const res = await fetch("http://localhost:8080/submit", {
      method: "POST",
      body: JSON.stringify(orderObj),
    });
    const resData = await res.json();
    return resData;
  }
  return sendForm()
    .then((arr) => arr.map((str) => {
      var p = document.createElement('p')
      p.textContent = str;
      p.style.color = "darkorchid";
      replyDiv.appendChild(p);
    }))
    .catch((err) => (replyDiv.innerHTML = err));
});

The last section is an async function, sendForm(), that uses JavaScript’s fetch API to submit our form data as JSON and append each line of the response to the reply div.

Here’s the full JavaScript code:

      const prices = {
        s23: 999.99,
        fold5: 1800.00,
        note: 1000.00,
        a23: 299.99,
        a54: 449.99,
        a42: 399.99,
        edgeNew: 799.99,
        razr: 899.99,
        think: 700.00,
        edgeOld: 249.99,
        stylus: 300.00,
        motog: 250.00,
        fiveiv: 1099.99,
        onev: 1199.00,
        pro: 999.99,
        lfour: 270.00,
        teniv: 200.00,
        tenv: 409.00,
      };
      const names = {
        s23: "Samsung Galaxy s23 Ultra",
        fold5: "Samsung Galaxy Z Fold 5",
        note: "Samsung Galaxy Note 23",
        a23: "Samsung Galaxy A23 5G ",
        a54: "Samsung Galaxy A54",
        a42: "Samsung A42 5G",
        edgeNew: "Motorola Moto Edge+",
        razr: "Motorola Moto Razr+",
        think: "Motorola ThinkPhone",
        edgeOld: "Motorola Moto Edge",
        stylus: "Moto G Stylus 5G",
        motog: "Moto G Power",
        fiveiv: "Sony Xperia 5 IV",
        onev: "Sony Xperia 1 V",
        pro: "Sony Xperia Pro-1",
        lfour: "Sony Xperia L4",
        teniv: "Sony Xperia 10 IV",
        tenv: "Sony Xperia 10 V",
      };

const forms = document.getElementsByTagName("form");

const submitButton = document.getElementById("submitBtn");
const replyDiv = document.getElementById("reply");

submitButton.addEventListener("click", (event) => {
  var orderObj = { orders: [] };
  for (let i = 0; i < forms.length; i++) {
    const formData = new FormData(forms[i]);
    const jsonObject = Object.fromEntries(formData);
    var jsonFormatted = {
      items: [
        {
          name: names[jsonObject["flagship"]],
          quantity: jsonObject["flagshipQty"],
          amount: prices[jsonObject["flagship"]],
        },
        {
          name: names[jsonObject["budget"]],
          quantity: jsonObject["budgetQty"],
          amount: prices[jsonObject["budget"]],
        },
      ],
    };
    orderObj.orders.push(jsonFormatted);
  }

  async function sendForm() {
    const res = await fetch("http://localhost:8080/submit", {
      method: "POST",
      body: JSON.stringify(orderObj),
    });
    const resData = await res.json();
    return resData;
  }
  return sendForm()
    .then((arr) => arr.map((str) => {
      var p = document.createElement('p')
      p.textContent = str;
      p.style.color = "darkorchid";
      replyDiv.appendChild(p);
    }))
    .catch((err) => (replyDiv.innerHTML = err));
});

8. The main program

In this section, we bring everything together to run our program. First, we’ll need a server to run restService that we created previously. Create Main.scala in the following path, src/main/scala/Main.scala, and add the following code.

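import cats.effect.*
import org.http4s.ember.server.EmberServerBuilder
import com.comcast.ip4s.*
import com.rockthejvm.protos.orders.OrderFs2Grpc
import com.rockthejvm.routes.AppRoutes
import io.grpc.Metadata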

object Main extends IOApp {
  def httpServerResource(remoteService: OrderFs2Grpc[IO, Metadata]) =
    EmberServerBuilder
      .default[IO]
      .withHost(host"0.0.0.0")
      .withPort(port"8080")
      .withHttpApp(AppRoutes.restService(remoteService).orNotFound)
      .build
}

We use EmberServerBuilder to create our server with port number 8080 and build it with our restService.

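import com.rockthejvm.client.Client
import com.rockthejvm.service.Server.grpcServer

object Main extends IOApp {
  // ...
  def run(args: List[String]): IO[ExitCode] = {
    val app: Resource[IO, Unit] = for {
      // build the gRPC server and start it; it is shut down when the Resource is released
      _ <- grpcServer.evalMap(svr => IO(svr.start()))
      // open a channel to that server and obtain the client stub
      remoteService <- Client.resource
      // start the REST server, handing the stub to the routes
      _ <- httpServerResource(remoteService)
    } yield ()

    app.useForever
  }
}

In the run() function we compose three Resources in order: the gRPC server, which we start by calling evalMap() on the grpcServer Resource; the client stub from Client.resource, which must be visible from Main and therefore cannot be declared private; and the HTTP server, which receives the stub. Calling useForever on the combined Resource keeps both servers running until the application is stopped, at which point everything is released in reverse order.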

Now let’s test our application. When we run our server and navigate to localhost:8080/index.html, we should be greeted with the following page.

index.html

Once the form is filled and submitted, we should see a response similar to the following.

server response

9. Managing Currency with Squants

Throughout the article we’ve been representing money as a Double. We could improve our code by using the Squants library to manage currency values, specifically its Money type.

This raises a question: if the case classes are generated by ScalaPB, how can we make ScalaPB generate the Money type for amount and total? ScalaPB provides a mechanism for customizing generated types that is convenient for such scenarios.

We’ll need to add the following two libraries to our protobuf project inside build.sbt. The snippet refers to a squantsVersion value, which needs to be defined next to the other version values at the top of the file.

...
lazy val protobuf =
  project
    .in(file("protobuf"))
    .settings(
      name := "protobuf",
      scalaVersion := scala3Version,
+     libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
+     libraryDependencies += "org.typelevel" %% "squants" % squantsVersion
    )
    .enablePlugins(Fs2Grpc)
...

Once we compile our project, we’ll notice a new file, scalapb.proto, in the following path: protobuf/target/protobuf_external/scalapb/scalapb.proto. It defines the ScalaPB options we need for our type conversion. Back in orders.proto, let’s import this new proto file.

syntax = "proto3";

package com.rockthejvm.protos;

import "scalapb/scalapb.proto";

Note: If you are using IntelliJ, this import might show red because the proto plugin hasn’t scanned the protobuf_external path, therefore, you’ll need to add it there. IntelliJ will show this as a suggestion.

Now we can make changes to the amount and total for Item and OrderReply.

...
message Item{
  string name =1;
  int32 qty = 2;
- double amount = 3;
+ double amount = 3 [(scalapb.field).type = "squants.market.Money"];
}
...
message OrderReply {
  int32 orderid = 1;
  repeated Item items = 2;
- double total = 3;
+ double total = 3 [(scalapb.field).type = "squants.market.Money"];
}

Above, we set (scalapb.field).type to squants.market.Money which is the custom type we need for amount and total.

A few more steps are still required. Let’s create a Customtype.scala file under the protobuf project’s Scala sources, for example protobuf/src/main/scala/Customtype.scala, so that sbt compiles it, and add the following code.

package com.rockthejvm.protos

import squants.market.{USD, Money}
import scalapb.TypeMapper

given typeMapper: TypeMapper[Double, Money] =
  TypeMapper[Double, Money](s => USD(s))(_.amount.toDouble)

Scalapb provides a TypeMapper object whose apply method takes two arguments, a function from the base type to a custom type i.e. Double to Money, and a function from the custom type back to the base type i.e. Money to Double. We provide typeMapper as a given so that when in scope, the generated Scalapb case classes will contain the expected Money type instead of Double.
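The two directions of the mapper can be called by hand to see what the generated code does with it. This sketch assumes scalapb-runtime and squants are on the classpath; moneyMapper is a local, hypothetical copy of the mapper so that the example is self-contained.

```scala
import scalapb.TypeMapper
import squants.market.{Money, USD}

// Same shape as the article's mapper, kept local for the sketch.
val moneyMapper: TypeMapper[Double, Money] =
  TypeMapper[Double, Money](d => USD(d))(_.amount.toDouble)

@main def typeMapperDemo(): Unit =
  val price: Money = moneyMapper.toCustom(250.0) // Double -> Money, applied when a message is read
  val raw: Double  = moneyMapper.toBase(price)   // Money -> Double, applied when a message is written
  println(price)
  println(raw)
```

Note that the mapper hard-codes USD: the wire format still carries only a double, so the currency is a convention of this application rather than part of the message.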

To bring this given into scope when the case classes are generated, we’ll need to add this import as a File-level option inside our orders.proto file.

...
import "scalapb/scalapb.proto";

+ option (scalapb.options) = {
+  import: "com.rockthejvm.protos.given"
+ };
...

Now that everything is set, let’s compile our project and take a look at the case classes generated by ScalaPB. We’ll notice our given import at the top of each of them.

package com.rockthejvm.protos.orders
import com.rockthejvm.protos.given
...

Item and OrderReply now have amount and total, respectively, of type Money.

final case class Item(
    name: _root_.scala.Predef.String = "",
    qty: _root_.scala.Int = 0,
    amount: squants.market.Money = com.rockthejvm.protos.orders.Item._typemapper_amount.toCustom(0.0),
    unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
    ) 

OrderClient.scala needs no further change, because processOrders() already formats the total by calling response.total.toString:

  def processOrders(
      orderStub: OrderFs2Grpc[IO, Metadata],
      orders: Stream[IO, OrderRequest]
  ): IO[List[String]] = {
    for {
      response <- orderStub.sendOrderStream(
        orders,
        new Metadata()
      )
      str <- Stream.eval(
        IO(
          s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
              .mkString(" and ")}, totaling to ${response.total.toString}"
        )
      )
    } yield str
  }.compile.toList

Now that total is a Money, this call uses Squants’ toString method, which formats our currency values as follows: xxx USD.

Lastly, let’s edit AppRoutes.scala as well.

...
  given itemDecoder: Decoder[Item] = Decoder.instance { h =>
    for {
      name <- h.get[String]("name")
      qty <- h.get[Int]("quantity")
      amount <- h.get[Double]("amount")
    } yield Item.of(
      name,
      qty,
-     amount
+     USD(amount)
    )
  }
...

Here we wrap amount in USD(...), which produces a Money value for our itemDecoder. This requires importing squants.market.USD in AppRoutes.scala.
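With Money in place, the total in OrderService is computed with Squants' own addition rather than Double arithmetic. Here is a minimal sketch of that calculation, assuming squants is on the classpath; the helper name totalOf is hypothetical.

```scala
import squants.market.{Money, USD}

// Hypothetical helper mirroring the service's total, now over Money values.
// Starting the fold from USD(0) also covers an order without items.
def totalOf(amounts: Seq[Money]): Money =
  amounts.foldLeft(USD(0))(_ + _)

@main def moneyDemo(): Unit =
  val total = totalOf(Seq(USD(300.0), USD(450.0)))
  println(total)                // rendered through Money's toString
  println(total == USD(750.0))  // Money values compare by amount and currency
```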

The application should now run with the updated changes.

10. Conclusion

In this article, we’ve looked at how to create a gRPC service in Scala using Fs2Grpc, including server and client implementations and how they communicate with each other. gRPC is particularly popular for communication between microservices. It runs over HTTP/2, which multiplexes many calls over a single connection and supports the bidirectional streaming we used for our orders.

ScalaPB has a lot more to offer than what’s been discussed in this article, such as more customizations, transformations, and extra sbt settings, as well as guides on usage with other Scala libraries. I encourage you to dig into the documentation for more in-depth knowledge of gRPC in Scala.

The complete code for this article can be found (in a slightly modified version) over on GitHub.
