gRPC in Scala with Fs2 and ScalaPB
1. Introduction
RPC stands for Remote Procedure Call. It is a client-server communication model in which one program requests a service from a program at a different address, which may be on the same machine or on another machine connected by a network. It lets us work with remote procedures as if they were local.
In this article we will cover gRPC, a modern open-source RPC framework designed by Google that uses Protocol Buffers for data serialization and HTTP/2 as a transport layer. gRPC is language independent, so a client written in one language can communicate with a server written in another.
At the time of writing, gRPC officially supports 11 programming languages, including Python, Java, Kotlin, and C++. Scala is not officially supported at the moment; however, the ScalaPB library provides a good wrapper around the official gRPC Java implementation. It offers an easy API that translates Protocol Buffers to Scala case classes, with support for Scala 3, Scala.js, and Java interoperability.
One of the main use cases for gRPC is communication between internal microservices. In the following sections, we’ll create a gRPC server and client that will process streams of orders sent from an online store.
We’ll also learn how to write a .proto file, which we’ll use to generate the necessary case classes and traits using the Fs2Grpc library, and finally dig into ScalaPB to generate custom types for our application.
2. Setting Up
In our new Scala project, let’s create a plugins.sbt
file in the project
folder with the following code:
addSbtPlugin("org.typelevel" % "sbt-fs2-grpc" % "2.7.4")
Also add the following code to your build.sbt
file:
val scala3Version = "3.3.0"
val http4sVersion = "0.23.23"
val weaverVersion = "0.8.3"
lazy val protobuf =
project
.in(file("protobuf"))
.settings(
name := "protobuf",
scalaVersion := scala3Version
)
.enablePlugins(Fs2Grpc)
lazy val root =
project
.in(file("."))
.settings(
name := "root",
scalaVersion := scala3Version,
libraryDependencies ++= Seq(
"io.grpc" % "grpc-netty-shaded" % scalapb.compiler.Version.grpcJavaVersion,
"org.http4s" %% "http4s-ember-server" % http4sVersion,
"org.http4s" %% "http4s-dsl" % http4sVersion,
"org.http4s" %% "http4s-circe" % http4sVersion,
"com.disneystreaming" %% "weaver-cats" % weaverVersion % Test
),
testFrameworks += new TestFramework("weaver.framework.CatsEffect")
)
.dependsOn(protobuf)
Here we have a root project which depends on protobuf. With this design, when root is compiled, the Fs2Grpc plugin automatically generates Scala code from our .proto file.
We’ll also use circe for JSON encoding and decoding, and http4s to develop our REST server, from which we’ll forward orders to the gRPC order service.
3. The .proto file
Create a folder called protobuf
in the root of your project and add an orders.proto
file in the following path, protobuf/src/main/protobuf/orders.proto
.
Now add the following code to the file.
syntax = "proto3";
package com.rockthejvm.protos;
A .proto file is written in Google’s protocol buffer language, which is used to structure protocol buffer data. ScalaPB supports both the proto2 and proto3 versions.
In the first line of the file, we define syntax as proto3, which is the protobuf language version. Next we specify package as com.rockthejvm.protos; this is the base of the namespace through which we access the code generated by Fs2Grpc.
Let’s define some data structures that will be used in our application.
...
message Item{
string name = 1;
int32 qty = 2;
double amount = 3;
}
A message is a data structure containing a set of typed fields. Item contains data for each product in an order. It has three fields, name, qty, and amount, of type string, int32, and double respectively. The “= 1”, “= 2”, and “= 3” are the unique field numbers that identify each field in the binary encoding. Each message gets translated to a case class during compilation.
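To make the message-to-case-class translation concrete, here is a minimal hand-written sketch of roughly what the Scala side of Item looks like. It is an approximation, not the generated code: the real class also carries an unknownFields parameter and serialization logic. The defaults mirror the proto3 rule that unset scalar fields read as an empty string or zero.

```scala
// Hand-written approximation of the case class generated for the Item message.
// The real generated class has extra members (e.g. unknownFields) omitted here.
final case class Item(
    name: String = "",   // proto3 default for string
    qty: Int = 0,        // proto3 default for int32
    amount: Double = 0.0 // proto3 default for double
)

@main def itemDemo(): Unit = {
  // Fields that are not set fall back to their proto3 defaults
  println(Item(name = "Moto G Power"))
}
```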
...
message OrderRequest {
int32 orderid = 1;
repeated Item items = 2;
}
message OrderReply {
int32 orderid = 1;
repeated Item items = 2;
double total = 3;
}
Above we define two more message types, OrderRequest and OrderReply. OrderRequest consists of orderid of type int32 and items of type Item, which shows that we can nest a message within a message. The repeated keyword means that items can appear any number of times; in Scala this becomes a Seq of Item.
OrderReply also consists of an orderid of type int32, items (a repeated Item), and total of type double.
...
service Order {
rpc SendOrderStream (stream OrderRequest) returns (stream OrderReply) {}
}
Lastly, we define our Order service, which contains a single function, SendOrderStream(), that takes a stream of OrderRequest and returns a stream of OrderReply. The Order service will be translated to a Scala trait through which we can access our function.
Here’s the full code:
syntax = "proto3";
package com.rockthejvm.protos;
message Item{
string name =1;
int32 qty = 2;
double amount = 3;
}
message OrderRequest {
int32 orderid = 1;
repeated Item items = 2;
}
message OrderReply {
int32 orderid = 1;
repeated Item items = 2;
double total = 3;
}
service Order {
rpc SendOrderStream (stream OrderRequest) returns (stream OrderReply) {}
}
Now we can head over to our terminal, navigate to our project directory, and run sbt compile. Fs2Grpc should generate the necessary Scala files according to what we defined in orders.proto.
After compilation, we’ll find two sets of generated files in protobuf/target/scala-3.3.0/src_managed/main: one under an Fs2Grpc folder that contains our trait, and the case classes under a Scalapb folder. If we look at the top of each of these files, we’ll find that they are all in the com.rockthejvm.protos.orders package, which is the package we declared in orders.proto followed by the name of the .proto file.
4. The gRPC server
Let’s take a look at the OrderFs2Grpc.scala
file generated by Fs2Grpc
:
trait OrderFs2Grpc[F[_], A] {
def sendOrderStream(request: _root_.fs2.Stream[F, com.rockthejvm.protos.orders.OrderRequest], ctx: A): _root_.fs2.Stream[F, com.rockthejvm.protos.orders.OrderReply]
}
We’ll notice that an OrderFs2Grpc trait was created with a single function, sendOrderStream(), similar to what we defined in orders.proto. However, this function has no implementation; that is what we’ll need to provide in the next step.
Create OrderService.scala
in the following path src/main/scala/OrderService.scala
and add the following code:
package com.rockthejvm.service
import com.rockthejvm.protos.orders.*
import cats.effect.*
import io.grpc.*
import fs2.Stream
class OrderService extends OrderFs2Grpc[IO, Metadata] {
override def sendOrderStream(
request: Stream[IO, OrderRequest],
ctx: Metadata
): Stream[IO, OrderReply] = {
request.map { orderReq =>
OrderReply(
orderReq.orderid,
orderReq.items,
orderReq.items.map(i => i.amount).reduce(_ + _)
)
}
}
}
Here we create a new class OrderService that extends our OrderFs2Grpc trait. The trait takes two type parameters, an effect type and a context type, which we provide as IO from cats.effect and Metadata from io.grpc. Metadata provides access to read and write metadata values to be exchanged during a call.
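As a small illustration of reading and writing Metadata values, the sketch below uses grpc-java’s Metadata API directly. The header name x-client-id and the helper withClientId are made up for this example, and the grpc-api version in the directive is only an assumption; the article itself passes an empty Metadata.

```scala
//> using scala 3.3.0
//> using dep io.grpc:grpc-api:1.58.0

import io.grpc.Metadata

// "x-client-id" is a hypothetical header name chosen for illustration
val clientIdKey: Metadata.Key[String] =
  Metadata.Key.of("x-client-id", Metadata.ASCII_STRING_MARSHALLER)

// Hypothetical helper: builds a Metadata carrying a single ASCII header
def withClientId(id: String): Metadata = {
  val md = new Metadata()
  md.put(clientIdKey, id)
  md
}

@main def metadataDemo(): Unit = {
  val md = withClientId("web-store")
  // get returns null when the key is absent, so wrap it in Option
  println(Option(md.get(clientIdKey)))
  println(Option(new Metadata().get(clientIdKey)))
}
```

A server implementation could read such a value from the ctx argument of sendOrderStream in the same way.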
In our implementation of sendOrderStream above, we map on request, which is an fs2 stream of type Stream[IO, OrderRequest]. For each OrderRequest, we produce an OrderReply with the same orderid and items, plus the total amount spent, calculated by summing the item amounts with reduce(_ + _).
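One caveat with reduce is that it throws on an empty collection, so an order with no items would fail the stream. The sketch below contrasts it with foldLeft, which returns a zero total instead. Item here is a hypothetical stand-in for the generated class, and, as in the article’s code, the total sums amount without multiplying by qty.

```scala
import scala.util.Try

// Hypothetical stand-in for the ScalaPB-generated Item
final case class Item(name: String, qty: Int, amount: Double)

// Same approach as the article: throws UnsupportedOperationException on empty input
def totalWithReduce(items: Seq[Item]): Double =
  items.map(_.amount).reduce(_ + _)

// Defensive variant: an empty order simply totals 0.0
def totalWithFold(items: Seq[Item]): Double =
  items.map(_.amount).foldLeft(0.0)(_ + _)

@main def totalDemo(): Unit = {
  val items = Seq(Item("Phone A", 1, 1000.0), Item("Phone B", 2, 250.0))
  println(totalWithFold(items))                     // 1250.0
  println(totalWithFold(Seq.empty))                 // 0.0
  println(Try(totalWithReduce(Seq.empty)).isFailure) // true
}
```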
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import fs2.grpc.syntax.all.*
// ...
object Server {
private val orderService: Resource[IO, ServerServiceDefinition] =
OrderFs2Grpc.bindServiceResource[IO](new OrderService)
private def runServer(
service: ServerServiceDefinition
): Resource[IO, Server] =
NettyServerBuilder
.forPort(9999)
.addService(service)
.resource[IO]
val grpcServer: Resource[IO, Server] =
orderService
.flatMap(x => runServer(x))
}
Above we create our gRPC
service by calling the bindServiceResource()
method on OrderFs2Grpc
and passing it a service implementation, in our case a new instance of OrderService
, this returns a Resource[IO, ServerServiceDefinition]
.
The runServer
function uses NettyServerBuilder
to create our server which will run on port 9999
. In the last line, we create our final Resource
by calling flatMap
on the orderService
Resource
and passing the ServerServiceDefinition
to runServer()
.
Here’s the full code:
package com.rockthejvm.service
import com.rockthejvm.protos.orders.*
import cats.effect.*
import io.grpc.*
import fs2.Stream
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import fs2.grpc.syntax.all.*
class OrderService extends OrderFs2Grpc[IO, Metadata] {
override def sendOrderStream(
request: Stream[IO, OrderRequest],
ctx: Metadata
): Stream[IO, OrderReply] = {
request.map { orderReq =>
OrderReply(
orderReq.orderid,
orderReq.items,
orderReq.items.map(i => i.amount).reduce(_ + _)
)
}
}
}
object Server {
private val orderService: Resource[IO, ServerServiceDefinition] =
OrderFs2Grpc.bindServiceResource[IO](new OrderService)
private def runServer(
service: ServerServiceDefinition
): Resource[IO, Server] =
NettyServerBuilder
.forPort(9999)
.addService(service)
.resource[IO]
val grpcServer: Resource[IO, Server] =
orderService
.flatMap(x => runServer(x))
}
4. The gRPC client
Create OrderClient.scala
in the following path, src/main/scala/com/rockthejvm/client/OrderClient.scala
and add the following code.
package com.rockthejvm.client
import cats.effect.*
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.*
import fs2.grpc.syntax.all.*
object Client {
// Public so that other objects (e.g. Main) can acquire the client stub
val resource: Resource[IO, OrderFs2Grpc[IO, Metadata]] =
NettyChannelBuilder
.forAddress("127.0.0.1", 9999)
.usePlaintext()
.resource[IO]
.flatMap(ch => OrderFs2Grpc.stubResource[IO](ch))
}
Here we create a Client object with its first element, resource. We use NettyChannelBuilder to construct a channel that uses the Netty transport. A channel provides a connection to a network socket or component that is capable of I/O operations such as read, write, connect, and bind.
The forAddress() method sets the host and port values (the same as our server), while the usePlaintext() method sets the negotiation type for the HTTP/2 connection to PLAINTEXT instead of TLS, which is the default for a ManagedChannel. The resource() extension method from fs2-grpc wraps the ManagedChannel in a Resource. We then create a stubResource out of that channel, finally obtaining an instance of OrderFs2Grpc[IO, Metadata].
The gRPC client lets us call methods directly on this instance. We can therefore process multiple orders from an fs2 stream by invoking the sendOrderStream() function, ignoring the fact that the actual implementation of that method lives in a different service reached over a remote connection.
Here’s how we can implement this:
// ...
import com.rockthejvm.protos.orders.*
import fs2.Stream
object Client {
// ...
private def formatItemsToStr(items: Seq[Item]): Seq[String] = {
items.map(x => s"[${x.qty} of ${x.name}]")
}
def processOrders(
orderStub: OrderFs2Grpc[IO, Metadata],
orders: Stream[IO, OrderRequest]
): IO[List[String]] = {
for {
response <- orderStub.sendOrderStream(
orders,
new Metadata()
)
str <- Stream.eval(
IO(
s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
.mkString(" and ")}, totaling to ${response.total.toString}"
)
)
} yield str
}.compile.toList
}
We define processOrders(), a function that takes orderStub of type OrderFs2Grpc[IO, Metadata] and orders of type Stream[IO, OrderRequest] as arguments. When we call sendOrderStream(orders, new Metadata()) on orderStub, we get a Stream[IO, OrderReply], which we flatMap to format each OrderReply into a String that informs the user of the orderid, the items they bought, and the total amount they spent on each order. This Stream is then compiled to return an IO[List[String]].
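The flatMap-then-compile pattern can be tried in isolation, without a gRPC server. In the sketch below, replyIds is a hypothetical stand-in for the stream of replies, and the fs2 version in the directive is an assumption.

```scala
//> using scala 3.3.0
//> using dep co.fs2::fs2-core:3.9.2

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

// Hypothetical stand-in for the replies streamed back by the server
val replyIds: Stream[IO, Int] = Stream.emits(List(1001, 1002)).covary[IO]

// Same shape as processOrders: a for comprehension over the stream,
// then compile.toList to collect every element inside IO
def describe(ids: Stream[IO, Int]): IO[List[String]] =
  (for {
    id  <- ids
    str <- Stream.eval(IO(s"Processed orderid: $id"))
  } yield str).compile.toList

@main def streamDemo(): Unit =
  println(describe(replyIds).unsafeRunSync())
```

Nothing runs until the resulting IO is executed, which is why the article can build the whole pipeline first and hand it to http4s later.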
We use the formatItemsToStr()
function to format the Seq[Item]
as part of the returned String
.
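For a quick feel of the resulting text, here is formatItemsToStr applied to two sample items. Item is a hypothetical stand-in for the generated class and the sample data is made up.

```scala
// Hypothetical stand-in for the ScalaPB-generated Item
final case class Item(name: String, qty: Int, amount: Double)

// Same logic as in the Client object
def formatItemsToStr(items: Seq[Item]): Seq[String] =
  items.map(x => s"[${x.qty} of ${x.name}]")

@main def formatDemo(): Unit = {
  val items = Seq(Item("Moto G Power", 2, 250.0), Item("Sony Xperia L4", 1, 270.0))
  // prints: [2 of Moto G Power] and [1 of Sony Xperia L4]
  println(formatItemsToStr(items).mkString(" and "))
}
```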
Here’s the full code:
package com.rockthejvm.client
import cats.effect.*
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.*
import fs2.grpc.syntax.all.*
import com.rockthejvm.protos.orders.*
import fs2.Stream
object Client {
// Public so that other objects (e.g. Main) can acquire the client stub
val resource: Resource[IO, OrderFs2Grpc[IO, Metadata]] =
NettyChannelBuilder
.forAddress("127.0.0.1", 9999)
.usePlaintext()
.resource[IO]
.flatMap(ch => OrderFs2Grpc.stubResource[IO](ch))
private def formatItemsToStr(items: Seq[Item]): Seq[String] = {
items.map(x => s"[${x.qty} of ${x.name}]")
}
def processOrders(
orderStub: OrderFs2Grpc[IO, Metadata],
orders: Stream[IO, OrderRequest]
): IO[List[String]] = {
for {
response <- orderStub.sendOrderStream(
orders,
new Metadata()
)
str <- Stream.eval(
IO(
s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
.mkString(" and ")}, totaling to ${response.total.toString}"
)
)
} yield str
}.compile.toList
}
5. The Routes
In this section, we define the logic to handle incoming requests to our REST
server. Create AppRoutes.scala
in the following path src/main/scala/com/rockthejvm/routes/AppRoutes.scala
, and add the code below:
package com.rockthejvm.routes
import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*
object AppRoutes {
def restService = HttpRoutes.of[IO] {
case req @ GET -> Root / "index.html" =>
StaticFile
.fromString[IO](
"src/main/resources/index.html",
Some(req)
)
.getOrElseF(NotFound())
}
}
Above we define our first route, /index.html, which serves the client an HTML page where one can make orders from our online store. It uses the fromString() method from http4s’s StaticFile object, which takes the path to an HTML file as a String and an Option of a Request[IO]. If no file exists at that path, the client receives status code 404, meaning the page is not available.
// ...
import com.rockthejvm.protos.orders.OrderRequest
object AppRoutes {
case class Orders(values: Seq[OrderRequest])
def restService = HttpRoutes.of[IO] {
// ...
case req @ POST -> Root / "submit" => ???
}
}
Next, we create our second route, /submit, which will receive POST requests with a user’s orders. We create an Orders case class to capture the Seq of OrderRequests that the user sends. However, to parse our request, we’ll need an EntityDecoder for Orders in scope. We’ll also need Decoders for Orders, OrderRequest, and Item in scope.
import com.rockthejvm.protos.orders.{OrderRequest, Item}
import org.http4s.circe.*
import io.circe.Decoder
import io.circe.syntax.*
import scala.util.Random
object AppRoutes {
case class Orders(values: Seq[OrderRequest])
object Orders {
private given itemDecoder: Decoder[Item] = Decoder.instance { h =>
for {
name <- h.get[String]("name")
qty <- h.get[Int]("quantity")
amount <- h.get[Double]("amount")
} yield Item.of(
name,
qty,
amount
)
}
private given orDecoder: Decoder[OrderRequest] = Decoder.instance { h =>
h.get[Seq[Item]]("items").map { items =>
OrderRequest.of(Random.between(1000, 2000), items)
}
}
given ordersDecoder: Decoder[Orders] = Decoder.instance { h =>
h.get[Seq[OrderRequest]]("orders").map { orders =>
Orders(orders)
}
}
given ordersEntityDecoder: EntityDecoder[IO, Orders] = jsonOf[IO, Orders]
}
//...
}
Here we create a companion object for Orders. Its last line is an EntityDecoder for Orders, which uses the jsonOf[IO, Orders] function provided by http4s.circe. For this EntityDecoder to work, we need to provide a given of type Decoder[Orders], ordersDecoder, which is implemented with the help of circe’s Decoder.instance function. We target the orders key from the incoming JSON string and pass its value to the Orders case class.
The ordersDecoder also requires the presence of given Decoders for Item and OrderRequest in scope, which were also added to the Orders companion object. When creating the OrderRequest Decoder, we use a random Int between 1000 (inclusive) and 2000 (exclusive) to simulate the orderid.
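To see which JSON shape these decoders expect, the sketch below decodes a sample payload with circe alone. The case classes are hypothetical stand-ins for the generated ones, the order id is fixed rather than random so the result is predictable, and the circe version in the directive is an assumption.

```scala
//> using scala 3.3.0
//> using dep io.circe::circe-parser:0.14.6

import io.circe.Decoder
import io.circe.parser.decode

// Hypothetical stand-ins for the ScalaPB-generated classes
final case class Item(name: String, qty: Int, amount: Double)
final case class OrderRequest(orderid: Int, items: Seq[Item])
final case class Orders(values: Seq[OrderRequest])

given itemDecoder: Decoder[Item] = Decoder.instance { h =>
  for {
    name   <- h.get[String]("name")
    qty    <- h.get[Int]("quantity") // the JSON key is "quantity", the field is qty
    amount <- h.get[Double]("amount")
  } yield Item(name, qty, amount)
}

given orderDecoder: Decoder[OrderRequest] = Decoder.instance { h =>
  // Fixed id (instead of Random.between) to keep this example deterministic
  h.get[Seq[Item]]("items").map(items => OrderRequest(1000, items))
}

given ordersDecoder: Decoder[Orders] = Decoder.instance { h =>
  h.get[Seq[OrderRequest]]("orders").map(Orders(_))
}

val payload: String =
  """{"orders":[{"items":[{"name":"Moto G Power","quantity":2,"amount":250.0}]}]}"""

@main def decodeDemo(): Unit =
  println(decode[Orders](payload))
```

A payload that lacks the orders key produces a Left with a decoding failure rather than an exception.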
ScalaPB provides a convenient of() function to pass data to our case class without having to deal with UnknownFieldSet.empty, which, if you check the generated code, is a parameter of every case class.
We can also pass values to these case classes in different ways depending on our preference. Here are some examples with OrderRequest:
// Using the of() method
OrderRequest.of(Random.between(1000, 2000), items)
//Using withX() method
OrderRequest()
.withOrderid(Random.between(1000,2000))
.withItems(items)
// Default constructor
// Here we would have to import UnknownFieldSet
import scalapb.UnknownFieldSet
OrderRequest(
Random.between(1000,2000),
items,
UnknownFieldSet.empty
)
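The of() and withX() styles are thin conveniences over the case class. The hand-written sketch below approximates them with copy; it is not the generated code, and items is simplified to Seq[String] to keep the example short.

```scala
// Hand-written approximation of a ScalaPB-style message; the real generated
// class has more members, and items would be a Seq[Item]
final case class OrderRequest(orderid: Int = 0, items: Seq[String] = Seq.empty) {
  def withOrderid(v: Int): OrderRequest = copy(orderid = v)
  def withItems(v: Seq[String]): OrderRequest = copy(items = v)
}

object OrderRequest {
  // Positional factory in the spirit of the generated of() method
  def of(orderid: Int, items: Seq[String]): OrderRequest =
    OrderRequest(orderid, items)
}

@main def builderDemo(): Unit = {
  val viaOf   = OrderRequest.of(1001, Seq("Moto G Power"))
  val viaWith = OrderRequest().withOrderid(1001).withItems(Seq("Moto G Power"))
  println(viaOf == viaWith) // true: both styles build the same value
}
```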
Let’s see how to process the incoming POST
requests.
// ...
import com.rockthejvm.client.Client
import com.rockthejvm.protos.orders.OrderFs2Grpc
import fs2.Stream
import io.grpc.Metadata
object AppRoutes {
// ...
def restService(service: OrderFs2Grpc[IO, Metadata]) = HttpRoutes.of[IO] {
// ...
case req @ POST -> Root / "submit" =>
req
.as[Orders]
.flatMap { x =>
Client.processOrders(service, Stream.emits(x.values).covary[IO])
}
.handleError(x => List(x.getMessage))
.flatMap(x => Ok(x.asJson))
}
}
Calling the as() method on the request parses our JSON string, returning an IO[Orders]. We then flatMap on this to pass the orders to our processOrders() function, which communicates with the gRPC server to process them. This yields an IO[List[String]]: on success the list holds one message per processed order, and on failure handleError replaces it with a single-element List containing the error message. This List is then passed back to the browser as a JSON array of strings.
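The success and failure paths can be pictured with a plain Either, used here as a simplified stand-in for IO and its handleError method. The function name toResponseLines is made up for this sketch.

```scala
// Simplified model: Left plays the role of a failed IO, Right of a successful one
def toResponseLines(result: Either[Throwable, List[String]]): List[String] =
  result.fold(err => List(err.getMessage), identity)

@main def responseDemo(): Unit = {
  println(toResponseLines(Right(List("Processed orderid: 1001"))))
  println(toResponseLines(Left(new RuntimeException("UNAVAILABLE: io exception"))))
}
```

Either way the browser receives a JSON array of strings, so the page does not need separate handling for errors.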
Here’s the full code:
package com.rockthejvm.routes
import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*
import com.rockthejvm.protos.orders.{OrderRequest, Item, OrderFs2Grpc}
import org.http4s.circe.*
import io.circe.Decoder
import io.circe.syntax.*
import scala.util.Random
import com.rockthejvm.client.Client
import fs2.Stream
import io.grpc.Metadata
object AppRoutes {
case class Orders(values: Seq[OrderRequest])
object Orders {
private given itemDecoder: Decoder[Item] = Decoder.instance { h =>
for {
name <- h.get[String]("name")
qty <- h.get[Int]("quantity")
amount <- h.get[Double]("amount")
} yield Item.of(
name,
qty,
amount
)
}
private given orDecoder: Decoder[OrderRequest] = Decoder.instance { h =>
h.get[Seq[Item]]("items").map { items =>
OrderRequest.of(Random.between(1000, 2000), items)
}
}
given ordersDecoder: Decoder[Orders] = Decoder.instance { h =>
h.get[Seq[OrderRequest]]("orders").map { orders =>
Orders(orders)
}
}
given ordersEntityDecoder: EntityDecoder[IO, Orders] = jsonOf[IO, Orders]
}
def restService(service: OrderFs2Grpc[IO, Metadata]) = HttpRoutes.of[IO] {
case req @ GET -> Root / "index.html" =>
StaticFile
.fromString[IO](
"src/main/resources/index.html",
Some(req)
)
.getOrElseF(NotFound())
case req @ POST -> Root / "submit" =>
req
.as[Orders]
.flatMap { x =>
Client.processOrders(service, Stream.emits(x.values).covary[IO])
}
.handleError(x => List(x.getMessage))
.flatMap(x => Ok(x.asJson))
}
}
6. The “web UI”
Create the index.html file in the following path, src/main/resources/index.html. This file is quite lengthy; we can copy the contents from this link.
It consists of 3 forms each with the following format:
<form>
<fieldset>
<span style="font-weight: bold; color: darkorchid">Samsung</span><br />
<span style="color: crimson">FlagShip Phones</span><br />
<input type="radio" id="phone1" name="flagship" value="s23"/>
<label for="phone1">Samsung Galaxy s23 Ultra -> $999.99</label><br />
<input type="radio" id="phone2" name="flagship" value="fold5" checked/>
<label for="phone2">Samsung Galaxy Z Fold 5 -> $1800.00</label><br />
<input type="radio" id="phone3" name="flagship" value="note"/>
<label for="phone3">Samsung Galaxy Note 23 -> $1000.00</label><br />
<span style="color: crimson">Quantity</span><br />
<input type="number" name="flagshipQty" min=1 max=4 value="1"/><br />
<br />
<span style="color: crimson">Budget Phones</span><br />
<input type="radio" id="phone4" name="budget" value="a23" checked/>
<label for="phone4">Samsung Galaxy A23 5G -> $299.99</label><br />
<input type="radio" id="phone5" name="budget" value="a54" />
<label for="phone5">Samsung Galaxy A54 -> $449.99</label><br />
<input type="radio" id="phone6" name="budget" value="a42" />
<label for="phone6">Samsung A42 5G -> $399.99</label><br />
<span style="color: crimson">Quantity</span><br />
<input type="number" name="budgetQty" min=1 max=4 value="1"/><br />
<br />
</fieldset>
</form>
Each form has two sections where one can select a flagship phone and a budget phone, each with a preferred quantity. Some default values have been set but can be changed.
<div>
<button id="submitBtn">Submit All</button>
</div>
<div id="reply"></div>
We provide a Submit All
button that will fetch all the inputted data using JavaScript, post the data, and append the response to the div
with the id reply
.
const prices = {
s23: 999.99,
fold5: 1800.00,
note: 1000.00,
a23: 299.99,
a54: 449.99,
a42: 399.99,
// ...
};
const names = {
s23: "Samsung Galaxy s23 Ultra",
fold5: "Samsung Galaxy Z Fold 5",
note: "Samsung Galaxy Note 23",
a23: "Samsung Galaxy A23 5G",
a54: "Samsung Galaxy A54",
a42: "Samsung A42 5G",
// ...
};
const forms = document.getElementsByTagName("form");
const submitButton = document.getElementById("submitBtn");
const replyDiv = document.getElementById("reply");
To process the forms we first create two objects containing the phone prices and phone names, keyed by the value attributes of the radio inputs. We also get all the form elements and assign them to forms using the document.getElementsByTagName("form") function. Next, we get the submit button and the div where we’ll post our response, and assign them to submitButton and replyDiv respectively using the document.getElementById() function.
...
submitButton.addEventListener("click", (event) => {
var orderObj = { orders: [] };
for (let i = 0; i < forms.length; i++) {
const formData = new FormData(forms[i]);
const jsonObject = Object.fromEntries(formData);
var jsonFormatted = {
items: [
{
name: names[jsonObject["flagship"]],
quantity: jsonObject["flagshipQty"],
amount: prices[jsonObject["flagship"]],
},
{
name: names[jsonObject["budget"]],
quantity: jsonObject["budgetQty"],
amount: prices[jsonObject["budget"]],
},
],
};
orderObj.orders.push(jsonFormatted);
}
// ...
});
Here, we create an event listener for a click event on the submit button. When the button is clicked we run through all the forms using a for loop and fetch the data from each by calling new FormData(forms[i]).
We then reformat the data to resemble our Scala case classes and assign each form’s data to jsonFormatted. This is then pushed to orderObj, which is now an object containing an Array of orders.
submitButton.addEventListener("click", (event) => {
...
async function sendForm() {
const res = await fetch("http://localhost:8080/submit", {
method: "POST",
body: JSON.stringify(orderObj),
});
const resData = await res.json();
return resData;
}
return sendForm()
.then((arr) => arr.map((str) => {
var p = document.createElement('p')
p.textContent = str;
p.style.color = "darkorchid";
replyDiv.appendChild(p);
}))
.catch((err) => (replyDiv.innerHTML = err));
});
The last section is an async function, sendForm(), that uses JavaScript’s fetch API to submit our form data as JSON. We then append each string in the response to the reply div.
Here’s the full JavaScript code:
const prices = {
s23: 999.99,
fold5: 1800.00,
note: 1000.00,
a23: 299.99,
a54: 449.99,
a42: 399.99,
edgeNew: 799.99,
razr: 899.99,
think: 700.00,
edgeOld: 249.99,
stylus: 300.00,
motog: 250.00,
fiveiv: 1099.99,
onev: 1199.00,
pro: 999.99,
lfour: 270.00,
teniv: 200.00,
tenv: 409.00,
};
const names = {
s23: "Samsung Galaxy s23 Ultra",
fold5: "Samsung Galaxy Z Fold 5",
note: "Samsung Galaxy Note 23",
a23: "Samsung Galaxy A23 5G",
a54: "Samsung Galaxy A54",
a42: "Samsung A42 5G",
edgeNew: "Motorola Moto Edge+",
razr: "Motorola Moto Razr+",
think: "Motorola ThinkPhone",
edgeOld: "Motorola Moto Edge",
stylus: "Moto G Stylus 5G",
motog: "Moto G Power",
fiveiv: "Sony Xperia 5 IV",
onev: "Sony Xperia 1 V",
pro: "Sony Xperia Pro-1",
lfour: "Sony Xperia L4",
teniv: "Sony Xperia 10 IV",
tenv: "Sony Xperia 10 V",
};
const forms = document.getElementsByTagName("form");
const submitButton = document.getElementById("submitBtn");
const replyDiv = document.getElementById("reply");
submitButton.addEventListener("click", (event) => {
var orderObj = { orders: [] };
for (let i = 0; i < forms.length; i++) {
const formData = new FormData(forms[i]);
const jsonObject = Object.fromEntries(formData);
var jsonFormatted = {
items: [
{
name: names[jsonObject["flagship"]],
quantity: jsonObject["flagshipQty"],
amount: prices[jsonObject["flagship"]],
},
{
name: names[jsonObject["budget"]],
quantity: jsonObject["budgetQty"],
amount: prices[jsonObject["budget"]],
},
],
};
orderObj.orders.push(jsonFormatted);
}
async function sendForm() {
const res = await fetch("http://localhost:8080/submit", {
method: "POST",
body: JSON.stringify(orderObj),
});
const resData = await res.json();
return resData;
}
return sendForm()
.then((arr) => arr.map((str) => {
var p = document.createElement('p')
p.textContent = str;
p.style.color = "darkorchid";
replyDiv.appendChild(p);
}))
.catch((err) => (replyDiv.innerHTML = err));
});
7. The main program
In this section, we bring everything together to run our program. First, we’ll need a server to run restService
that we created previously. Create Main.scala
in the following path, src/main/scala/Main.scala
, and add the following code.
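import cats.effect.*
import org.http4s.ember.server.EmberServerBuilder
import com.comcast.ip4s.*
import com.rockthejvm.protos.orders.OrderFs2Grpc
import com.rockthejvm.routes.AppRoutes
import io.grpc.Metadata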
object Main extends IOApp {
def httpServerResource(remoteService: OrderFs2Grpc[IO, Metadata]) =
EmberServerBuilder
.default[IO]
.withHost(host"0.0.0.0")
.withPort(port"8080")
.withHttpApp(AppRoutes.restService(remoteService).orNotFound)
.build
}
We use EmberServerBuilder
to create our server with port number 8080 and build it with our restService
.
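import com.rockthejvm.client.Client
import com.rockthejvm.service.Server.grpcServer
import cats.syntax.parallel.*
object Main extends IOApp {
  // ...
  def run(args: List[String]): IO[ExitCode] = {
    // Start the gRPC server and keep it running until the app is cancelled
    val grpc: IO[Unit] =
      grpcServer
        .evalMap(svr => IO(svr.start()))
        .useForever
    // Acquire the client stub, then serve HTTP with it for as long as the app runs
    val http: IO[Unit] =
      Client.resource
        .flatMap(stub => httpServerResource(stub))
        .useForever
    // Run both servers in parallel
    (grpc, http).parTupled.as(ExitCode.Success)
  }
}
In the run() function we run both the HTTP server and the gRPC server in parallel using parTupled. We call evalMap() on the grpcServer Resource to start the server, and useForever() to make sure it keeps running. The HTTP server is built from Client.resource, which supplies the gRPC stub that restService needs. For this to compile, resource and processOrders in the Client object must not be private.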
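Running two effects in parallel can be tried on its own with a pair of stand-in tasks. In the sketch below, fakeServer is a hypothetical placeholder that sleeps and returns a message instead of starting a real server, and the cats-effect version in the directive is an assumption.

```scala
//> using scala 3.3.0
//> using dep org.typelevel::cats-effect:3.5.2

import cats.effect.{IO, IOApp}
import cats.syntax.parallel.*
import scala.concurrent.duration.*

object ParallelDemo extends IOApp.Simple {
  // Hypothetical stand-in for a server start-up: waits, then reports
  def fakeServer(name: String): IO[String] =
    IO.sleep(100.millis).as(s"$name started")

  // parTupled runs both effects concurrently and pairs their results
  val both: IO[(String, String)] =
    (fakeServer("grpc"), fakeServer("http")).parTupled

  val run: IO[Unit] =
    both.flatMap { case (a, b) => IO.println(s"$a, $b") }
}
```

If either effect fails, parTupled cancels the other one, which is a reasonable behavior for two servers that only make sense together.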
Now let’s test our application.
When we run our server and navigate to localhost:8080/index.html
, we should be greeted with the following page.
Once the form is filled and submitted, we should see a response similar to the following.
8. Managing Currency with Squants
Throughout the article we’ve been representing money as Double. We could improve our code by using the Squants library to manage currency values, specifically Squants’ Money type.
This raises a question: if the case classes are generated by ScalaPB, how can we make it generate Squants’ Money type for amount and total? ScalaPB provides a mechanism for customizing generated types, which is convenient for such scenarios.
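We’ll need to add the following two libraries to our protobuf project inside build.sbt. Here squantsVersion is a value we define next to the other version values at the top of the file (for example "1.8.3").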
...
lazy val protobuf =
project
.in(file("protobuf"))
.settings(
name := "protobuf",
scalaVersion := scala3Version,
+ libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf",
+ libraryDependencies += "org.typelevel" %% "squants" % squantsVersion
)
.enablePlugins(Fs2Grpc)
...
Once we compile our project, we’ll notice a new file, scalapb.proto, in the following path, protobuf/target/protobuf_external/scalapb/scalapb.proto, which contains the option definitions we need for our type conversion.
Back in orders.proto, let’s import this new proto file into scope.
syntax = "proto3";
package com.rockthejvm.protos;
import "scalapb/scalapb.proto";
Note: If you are using IntelliJ, this import might show red
because the proto plugin hasn’t scanned the protobuf_external
path, therefore, you’ll need to add it there. IntelliJ will show this as a suggestion.
Now we can make changes to the amount
and total
for Item
and OrderReply
.
...
message Item{
string name =1;
int32 qty = 2;
- double amount = 3;
+ double amount = 3 [(scalapb.field).type = "squants.market.Money"];
}
...
message OrderReply {
int32 orderid = 1;
repeated Item items = 2;
- double total = 3;
+ double total = 3 [(scalapb.field).type = "squants.market.Money"];
}
Above, we set (scalapb.field).type
to squants.market.Money
which is the custom type we need for amount
and total
.
A few more steps are still required. Since this is a Scala source file, it belongs with the protobuf project’s Scala sources: let’s create a Customtype.scala file in the following path, protobuf/src/main/scala/Customtype.scala, and add the following code.
package com.rockthejvm.protos
import squants.market.{USD, Money}
import scalapb.TypeMapper
given typeMapper: TypeMapper[Double, Money] =
TypeMapper[Double, Money](s => USD(s))(_.amount.toDouble)
ScalaPB provides a TypeMapper object whose apply method takes two functions: one from the base type to the custom type, i.e. Double to Money, and one from the custom type back to the base type, i.e. Money to Double. We provide typeMapper as a given so that, when it is in scope, the generated ScalaPB case classes will contain the expected Money type instead of Double.
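The idea behind a two-way type mapper can be sketched without ScalaPB or Squants. Below, Mapper and Usd are hypothetical stand-ins for TypeMapper and Money; only the method names toCustom and toBase mirror ScalaPB’s.

```scala
// Hypothetical stand-in for squants.market.Money
final case class Usd(amount: BigDecimal)

// Hypothetical stand-in for scalapb.TypeMapper: a pair of conversion functions
final case class Mapper[Base, Custom](toCustom: Base => Custom, toBase: Custom => Base)

// Provided as a given so that code needing the conversion can pick it up implicitly
given usdMapper: Mapper[Double, Usd] =
  Mapper(d => Usd(BigDecimal(d)), _.amount.toDouble)

// Wire value -> custom type -> wire value, as happens when decoding and re-encoding
def roundTrip(d: Double)(using m: Mapper[Double, Usd]): Double =
  m.toBase(m.toCustom(d))

@main def mapperDemo(): Unit = {
  println(summon[Mapper[Double, Usd]].toCustom(999.99)) // Usd(999.99)
  println(roundTrip(999.99))                            // 999.99
}
```

The wire format stays a plain double; only the Scala-facing type changes, which is why both directions of the conversion are required.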
To bring this given
into scope when the case classes are generated, we’ll need to add this import as a File-level
option inside our orders.proto
file.
...
import "scalapb/scalapb.proto";
+ option (scalapb.options) = {
+ import: "com.rockthejvm.protos.given"
+ };
...
Now that everything is set, let’s compile our project and take a look at the case classes generated by ScalaPB. We’ll notice that at the top of each of them is our given import.
package com.rockthejvm.protos.orders
import com.rockthejvm.protos.given
...
Item and OrderReply now have amount and total, respectively, of type Money.
final case class Item(
name: _root_.scala.Predef.String = "",
qty: _root_.scala.Int = 0,
amount: squants.market.Money = com.rockthejvm.protos.orders.Item._typemapper_amount.toCustom(0.0),
unknownFields: _root_.scalapb.UnknownFieldSet = _root_.scalapb.UnknownFieldSet.empty
)
OrderClient.scala needs no code changes. The processOrders() function already formats the total with response.total.toString:
def processOrders(
  orderStub: OrderFs2Grpc[IO, Metadata],
  orders: Stream[IO, OrderRequest]
): IO[List[String]] = {
  for {
    response <- orderStub.sendOrderStream(
      orders,
      new Metadata()
    )
    str <- Stream.eval(
      IO(
        s"Processed orderid: ${response.orderid} for items: ${formatItemsToStr(response.items)
            .mkString(" and ")}, totaling to ${response.total.toString}"
      )
    )
  } yield str
}.compile.toList
Since total is now a Money, this calls Squants’ toString method, which formats our currency values as follows: xxx USD.
Lastly, let’s edit AppRoutes.scala
as well.
...
given itemDecoder: Decoder[Item] = Decoder.instance { h =>
for {
name <- h.get[String]("name")
qty <- h.get[Int]("quantity")
amount <- h.get[Double]("amount")
} yield Item.of(
name,
qty,
- amount
+ USD(amount)
)
}
...
Here we wrap amount in USD(amount), which produces a Money, in our itemDecoder. This requires importing squants.market.USD in AppRoutes.scala.
The application should now run with the updated changes.
9. Conclusion
In this article, we’ve looked at how to create a gRPC service in Scala using Fs2Grpc, including server and client implementations and how they communicate with each other. gRPC is particularly popular for communication between microservices and uses HTTP/2, whose multiplexed connections and binary framing help with performance and latency.
ScalaPB has a lot more to offer than what’s been discussed in this article, such as more customizations, transformations, and extra sbt settings, as well as guides on usage with other Scala libraries. I encourage you to dig into the documentation for more in-depth knowledge of gRPC in Scala.
The complete code for this article can be found (in a slightly modified version) over on GitHub.