3. Reactive Stream messaging in Java using Quarkus

This is the third post in the series Full Stack: Remastering Master Data Management into graph like data. I hope you enjoy the series and find it useful!

Introduction

We often need to handle asynchronous I/O: emitting data or messages asynchronously, or consuming data as it arrives and forwarding it to a database or another messaging system. In such situations, we look for continuous reactive streaming and processing of data.

In this post, we are going to talk about Reactive Programming, i.e. a development model structured around asynchronous data streams. When using reactive programming, data streams are going to be the spine of our application. Events, messages, calls, and even failures are going to be conveyed by a data stream. With reactive programming, you observe these streams and react when a value is emitted or received.
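To make "observe a stream and react" concrete, here is a minimal sketch that uses only the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow), not Quarkus or SmallRye. The class name ReactiveStreamDemo and the method observe are made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: observes a stream and reacts to each emitted value.
class ReactiveStreamDemo {

    // Publishes every input on a stream and returns what the subscriber observed.
    static List<String> observe(List<String> inputs) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first item
                }

                @Override
                public void onNext(String item) {
                    seen.add("received: " + item); // react to the emitted value
                    subscription.request(1);       // ask for the next one
                }

                @Override
                public void onError(Throwable t) {
                    seen.add("failed: " + t.getMessage()); // failures travel on the stream too
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            inputs.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items are delivered

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        observe(List.of("a", "b", "c")).forEach(System.out::println);
    }
}
```

The subscriber never pulls data in a loop; it only declares how to react when a value, a failure, or the end of the stream arrives.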

We discussed the benefits of Quarkus in our previous post, so let's see what more it can provide. Quarkus supports SmallRye Reactive Messaging, an implementation of the Eclipse MicroProfile Reactive Messaging specification 1.0. It lets your application interact using various messaging technologies such as Apache Kafka, AMQP or MQTT. The framework provides a flexible programming model bridging CDI and event-driven programming. So let's start.

Core concept

SmallRye Reactive Messaging provides a way to implement reactive data streaming applications using a CDI development model. In a nutshell, MicroProfile Reactive Messaging is based around three main concepts:

  • Message: A Message (defined by org.eclipse.microprofile.reactive.messaging.Message) is an envelope around a payload.

  • Incoming: Incoming is an annotation indicating that the method consumes a stream. The name of the stream is given as an attribute, as in:

@Incoming("data-in")
public void consume(Message<String> s) {
  // ...
}

  • Outgoing: Outgoing is an annotation indicating that the method feeds a stream. The name of the stream is given as an attribute:

@Outgoing("data-out")
public Message<String> produce() {
  // ...
}

Of course, methods can also use both annotations to transform the incoming messages:

@Incoming("data-in")
@Outgoing("data-out")
public String toUpperCase(String input) {
  return input.toUpperCase();
}

SmallRye Reactive Messaging automatically binds matching @Outgoing to @Incoming to form a chain.
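As a rough mental model of such a chain, the following sketch wires a source, a processing stage and a sink together by hand using only the JDK Flow API. SmallRye does this wiring for you based on the channel names; the classes ChainDemo, Stage and Sink are hypothetical and are not part of SmallRye or MicroProfile.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical demo: source -> processor -> sink, connected manually.
class ChainDemo {

    // Processor stage: consumes the upstream channel and feeds the downstream one.
    static class Stage<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
        private final Function<T, R> transform;
        private Flow.Subscription upstream;

        Stage(Function<T, R> transform) { this.transform = transform; }

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        @Override public void onNext(T item) { submit(transform.apply(item)); upstream.request(1); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); } // propagate completion downstream
    }

    // Sink stage: collects everything it receives.
    static class Sink<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription upstream;

        @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
        @Override public void onNext(T item) { received.add(item); upstream.request(1); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static List<String> run(List<String> inputs) {
        Sink<String> sink = new Sink<>();
        Stage<String, String> toUpperCase = new Stage<String, String>(s -> s.toUpperCase());

        try (SubmissionPublisher<String> dataIn = new SubmissionPublisher<>()) {
            toUpperCase.subscribe(sink);   // plays the role of "data-out"
            dataIn.subscribe(toUpperCase); // plays the role of "data-in"
            inputs.forEach(dataIn::submit);
        } // closing the source lets completion flow through the whole chain

        try {
            sink.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "quarkus")));
    }
}
```

The downstream subscription is made before the upstream one so that no item reaches the processing stage while it still has no subscriber.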

Dependencies

Let's start by adding our dependencies. If you open the Get Started page, you will find multiple extensions and libraries that support Reactive Messaging.

We will use the SmallRye Reactive Messaging Kafka Connector. We will also use SmallRye Health and SmallRye Metrics to monitor the health and performance of our system. You can add the dependencies like this:

<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-health</artifactId>
</dependency>
<dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-metrics</artifactId>
</dependency>

Kafka Sink

Now let's see how we can implement a simple Kafka sink. We would like to fetch data at a fixed interval and write it to a Kafka topic, so we use the @Outgoing annotation:

@Outgoing("entity-identity")
public Flowable<String> stream() {
    Gson gson = new Gson();
    return Flowable
        // io.reactivex.Flowable.interval(long initialDelay, long period, TimeUnit unit):
        // first tick after 1 minute, then one tick every 360 minutes (6 hours)
        .interval(1, 360, TimeUnit.MINUTES)
        // on each tick, load all users from the database
        .map(tick -> userRepository.listAll())
        // flatten each list into individual users
        .flatMap(Flowable::fromIterable)
        // serialize each user to JSON
        .map(u -> gson.toJson(u));
}
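The shape of this pipeline (tick, fetch a list, flatten, emit one item at a time) can be tried out without RxJava or Kafka. The sketch below imitates it with a JDK ScheduledExecutorService and millisecond intervals so that it finishes quickly. It is not what Flowable.interval does internally; the names PeriodicFetchDemo, run and fetch are hypothetical, and fetch stands in for userRepository.listAll().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical demo: periodic fetch, flattened into individual items.
class PeriodicFetchDemo {

    // Runs `ticks` fetches and returns every emitted item, prefixed with its tick number.
    static List<String> run(int ticks, long initialDelayMs, long periodMs,
                            Supplier<List<String>> fetch) {
        List<String> emitted = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(ticks);
        AtomicLong tick = new AtomicLong(); // ticks are numbered from 0, like interval()
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // ignore extra ticks while shutting down
            }
            long n = tick.getAndIncrement();
            for (String item : fetch.get()) { // flatten the fetched list
                emitted.add(n + ":" + item);
            }
            remaining.countDown();
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);

        try {
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two ticks, first after 10 ms, then every 20 ms.
        System.out.println(run(2, 10, 20, () -> List.of("user1", "user2")));
    }
}
```

Every tick reloads the complete list, so with the six-hour period used in the sink each run republishes all users to the topic.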

The configuration for this sink would look like:

#
# Configure the Kafka sink (we write to it)
#
mp.messaging.outgoing.entity-identity.connector=smallrye-kafka
mp.messaging.outgoing.entity-identity.bootstrap.servers=10.204.102.187:9092
mp.messaging.outgoing.entity-identity.topic=entity_identity
mp.messaging.outgoing.entity-identity.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Kafka Source

Now let's see how we can implement a simple Kafka source. We would like to consume data as soon as it is written to another Kafka topic, so we use the @Incoming annotation in this case:

@Incoming("entity-identity-delta")
public void saveToMySQL(String json) {
     Gson gson = new Gson();
     User user = gson.fromJson(json, User.class);
     userRepository.create(user);
}

The basic configuration for a source would look like:

#
# Configure the Kafka source (we fetch from it)
#
mp.messaging.incoming.entity-identity-delta.connector=smallrye-kafka
mp.messaging.incoming.entity-identity-delta.bootstrap.servers=127.0.0.1:9092
mp.messaging.incoming.entity-identity-delta.topic=entity_identity_delta
mp.messaging.incoming.entity-identity-delta.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

MicroProfile Health

MicroProfile Health allows applications to report their state to external viewers. This is typically useful in cloud environments, where automated processes must be able to determine whether the application should be discarded or restarted.

@Liveness
@ApplicationScoped
public class SimpleHealthCheck implements HealthCheck {

    @Override
    public HealthCheckResponse call() {
        return HealthCheckResponse.up("Simple health check");
    }
}

@Readiness
@ApplicationScoped
public class DatabaseConnectionHealthCheck implements HealthCheck {

    @Override
    public HealthCheckResponse call() {
        return HealthCheckResponse.up("Database connection health check");
    }
}

@Liveness - accessible at /health/live. It states whether our application is running or not.

@Readiness - accessible at /health/ready. It states whether the application is ready to serve requests. The check above only simulates a connection to an external service provider such as a database and always reports that it is up.
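A readiness check is more useful when it actually probes the dependency. One simple option, sketched here with plain JDK sockets, is to test whether the database host accepts TCP connections. The class ReadinessProbe and its methods are hypothetical helpers, not part of MicroProfile Health, and a successful TCP connection does not prove that the database can answer queries.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper that a readiness check could delegate to.
class ReadinessProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Small self-test: a local server socket stands in for the database.
    static boolean selfCheck() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        int port;
        boolean whileOpen;
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) { // 0 = any free port
            port = server.getLocalPort();
            whileOpen = isReachable(loopback.getHostAddress(), port, 500);
        } catch (IOException e) {
            return false;
        }
        // The server socket is closed now, so the probe should fail.
        boolean afterClose = isReachable(loopback.getHostAddress(), port, 500);
        return whileOpen && !afterClose;
    }

    public static void main(String[] args) {
        System.out.println("probe behaves as expected: " + selfCheck());
    }
}
```

Inside call(), the result could then decide between HealthCheckResponse.up(...) and HealthCheckResponse.down(...), assuming the MicroProfile Health version in use provides both static helpers.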

MicroProfile Metrics

MicroProfile Metrics allows applications to gather various metrics and statistics that provide insights into what is happening inside the application.

The metrics can be read remotely using JSON format or the OpenMetrics format, so that they can be processed by additional tools such as Prometheus, and stored for analysis and visualization.

A simple metric can be added to a controller method to track its performance or usage statistics, as shown below:

    @Path("/all")
    @GET
    @Counted(name = "performedChecks", description = "How many checks have been performed.")
    @Timed(name = "checksTimer", description = "A measure of how long it takes to execute the query", unit = MetricUnits.MILLISECONDS)
    @Produces(MediaType.APPLICATION_JSON)
    public Response getAllUsers() {
        List<User> users = userService.getAllUsers();
        return Response.ok(users).build();
    }

With this, you can monitor the statistics by calling localhost:8080/users/metrics/application. You will receive a response like:

{
    "org.practice.controllers.UserController.performedChecks" : 2,
    "org.practice.controllers.UserController.checksTimer" : 400
}

That's it! Now you can run the application using:

mvn compile quarkus:dev

You will see that your application periodically fetches data from the database and forwards it to the Kafka topic configured above. Similarly, the Kafka source starts consuming and processing data as soon as it arrives in its configured Kafka topic.

Along with this, we have seen how easily we can configure health checks and metrics that report on the application's readiness and performance.

Cover Photo credit

Cover Photo by Jeremiah Lawrence on Unsplash.