Real-time event streaming using Spring WebFlux


In this article, we're going to create a simple application using Spring WebFlux. This application will showcase how to create a reactive web server that sends an event to the client every second. The client will then simply log the events as they arrive.

Maven Dependencies

Let's start by defining the Maven dependencies this project needs:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


Here, besides the dependencies the project needs, we added an optional dependency on Project Lombok. If you're not familiar with it, Lombok is used to reduce the amount of boilerplate code in the domain model.

For example, you can skip writing setters and getters for your models entirely by annotating the class with @Data. Similarly, annotating the class with @AllArgsConstructor generates a constructor with one parameter for each field in your class, so you don't have to write it yourself.

Working with Spring WebFlux

Spring WebFlux was born as Spring's answer to reactive programming, and it features both client-side and server-side support for creating reactive web applications. Its main building blocks are Mono and Flux, the two reactive types provided by Project Reactor.

When an operation returns at most one entity or object, it returns a Mono of that type. On the other hand, operations that return multiple elements return a Flux.
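The difference between the two types can be sketched in a few lines. This is a minimal illustration rather than part of the application: the class name MonoFluxSketch and its two methods are made up for this example, and block() is used only so that a plain main method can print the values. It assumes Reactor is on the classpath, which spring-boot-starter-webflux already provides.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, used only to contrast Mono and Flux.
public class MonoFluxSketch {

    // A Mono emits at most one element and then completes.
    static Mono<String> single() {
        return Mono.just("one");
    }

    // A Flux emits zero or more elements and then completes.
    static Flux<Integer> many() {
        return Flux.just(1, 2, 3);
    }

    public static void main(String[] args) {
        // block() waits for the result; it is acceptable in a demo main method,
        // but should be avoided inside reactive request handling.
        String value = single().block();
        List<Integer> values = many().collectList().block();

        System.out.println(value);   // prints "one"
        System.out.println(values);  // prints "[1, 2, 3]"
    }
}
```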

Creating a model

Now, with all of the above in mind, we can start building our application, starting with the domain model and our Event class:

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Event {
    private long id;
    private Date date;
}

We've made a simple model containing only an id and a date.

Note the use of Lombok when defining the Event model. We don't need to define a constructor for this class, nor do we need to write getters and setters for its fields.
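To make the saving concrete, here is roughly what the same model looks like when written by hand. Treat it as an approximation under stated assumptions: the class name PlainEvent is made up for this comparison, and the equals, hashCode and toString bodies are simplified stand-ins, not the exact code Lombok generates.

```java
import java.util.Date;
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated Event class.
public class PlainEvent {

    private long id;
    private Date date;

    // Roughly what @AllArgsConstructor provides: one parameter per field.
    public PlainEvent(long id, Date date) {
        this.id = id;
        this.date = date;
    }

    // Roughly what @Data provides: a getter and a setter for each non-final field...
    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public Date getDate() { return date; }
    public void setDate(Date date) { this.date = date; }

    // ...plus equals, hashCode and toString based on the fields.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainEvent)) return false;
        PlainEvent other = (PlainEvent) o;
        return id == other.id && Objects.equals(date, other.date);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, date);
    }

    @Override
    public String toString() {
        return "PlainEvent(id=" + id + ", date=" + date + ")";
    }
}
```

With Lombok, all of this collapses into the two annotations on the Event class.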

Creating a reactive server

With our domain model done, let's create our reactive server:

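import java.time.Duration;
import java.util.Date;
import java.util.stream.Stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

@SpringBootApplication
@RestController
public class ReactiveServer {

    // Returns a single Event for the given id.
    @GetMapping("/events/{id}")
    Mono<Event> eventById(@PathVariable long id) {
        return Mono.just(new Event(id, new Date()));
    }

    // Streams one Event per second as Server-Sent Events.
    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/events")
    Flux<Event> events() {
        Flux<Event> eventFlux = Flux.fromStream(Stream.generate(() -> new Event(System.currentTimeMillis(), new Date())));
        Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));
        return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveServer.class, args);
    }
}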

Note that the method that returns a single Event returns a Mono, while the second method, which deals with multiple events, returns a Flux.

The second method produces TEXT_EVENT_STREAM_VALUE. This indicates that the data is sent in the form of Server-Sent Events, or SSE for short.

We want the server to send a new Event every second, so we define a Flux of Events, each with System.currentTimeMillis() as its id and a new Date() object as its date. This flux generates the events, while the second flux emits a new value each second. Now we need to combine the two: one to generate Events, and the other to dictate how often to do it.

To combine them, we zip the two fluxes together, which pairs each Event with one tick of the interval. We then map each pair back to its Event and return the resulting flux at the end of the method.
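The pacing idea can be tried in isolation with a scaled-down sketch. Everything here is an assumption made for illustration: the class ZipSketch, the methods paced and pacedByMap, the counter that stands in for Events, and the 100 ms period chosen so the demo finishes quickly. The second method shows a possible alternative that maps each tick directly to a value instead of zipping two fluxes.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

// Hypothetical class illustrating how zip paces one flux with another.
public class ZipSketch {

    // Same shape as the events() method: an unbounded source zipped with a timer.
    static Flux<Long> paced(Duration period) {
        AtomicLong counter = new AtomicLong();
        Stream<Long> source = Stream.generate(counter::incrementAndGet);
        Flux<Long> values = Flux.fromStream(source);
        Flux<Long> ticks = Flux.interval(period);
        // zip pairs the n-th value with the n-th tick; map keeps only the value.
        return Flux.zip(values, ticks).map(Tuple2::getT1);
    }

    // Alternative sketch: derive each element from the tick itself.
    static Flux<Long> pacedByMap(Duration period) {
        return Flux.interval(period).map(tick -> tick + 1);
    }

    public static void main(String[] args) {
        Duration period = Duration.ofMillis(100);
        // take(3) ends the otherwise infinite stream; block() waits for the list.
        System.out.println(paced(period).take(3).collectList().block());      // prints "[1, 2, 3]"
        System.out.println(pacedByMap(period).take(3).collectList().block()); // prints "[1, 2, 3]"
    }
}
```

Both variants emit one element per period. The zip form keeps event creation and timing as two separate fluxes, which matches the server code above.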

Creating a reactive client

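import java.util.Collections;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

@SpringBootApplication
public class ReactiveClient {

    // WebClient pointing at the reactive server.
    @Bean
    WebClient client() {
        return WebClient.create("http://localhost:8080");
    }

    // Subscribes to the event stream on startup and prints each Event.
    @Bean
    CommandLineRunner demo(WebClient client) {
        return args -> {
            client.get()
                    .uri("/events")
                    .accept(MediaType.TEXT_EVENT_STREAM)
                    .exchange()
                    .flatMapMany(cr -> cr.bodyToFlux(Event.class))
                    .subscribe(System.out::println);
        };
    }

    public static void main(String[] args) {
        // Run the client on port 8081 so it doesn't clash with the server on 8080.
        new SpringApplicationBuilder(ReactiveClient.class)
                .properties(Collections.singletonMap("server.port", "8081"))
                .run(args);
    }
}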

We're defining this client to run on port 8081, while pointing it to the already existing server on 8080. Afterwards, we use a CommandLineRunner and inject our WebClient into it.

Using the client, we first prepare an HTTP GET request to the server, and then declare that the client accepts a TEXT_EVENT_STREAM. Calling exchange() actually sends the request and provides us with a ClientResponse object, which we've named cr for brevity.

At this point, we need to tell the WebClient what kind of response we are expecting. Since we are expecting a flux of events, we extract the ClientResponse's body to a Flux of Event.

Now that we've converted the result to our desired format, we can consume it via a method reference, which completes the application.
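On newer Spring versions, exchange() is deprecated (since Spring Framework 5.3), and retrieve() is the usual shortcut when only the body is needed. The sketch below shows that variant under a few assumptions: the class RetrieveSketch and its events method are made up for this example, and the body is decoded to String instead of Event so the snippet stays self-contained. In the application above you would pass Event.class instead.

```java
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;

import reactor.core.publisher.Flux;

// Hypothetical class showing the retrieve() variant of the client call.
public class RetrieveSketch {

    // Builds the request pipeline; nothing is sent until the Flux is subscribed.
    static Flux<String> events(WebClient client) {
        return client.get()
                .uri("/events")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class);
    }

    public static void main(String[] args) {
        // Assumes the ReactiveServer from this article is running on port 8080.
        WebClient client = WebClient.create("http://localhost:8080");

        // blockLast() keeps this plain main method alive until three events arrive.
        events(client)
                .take(3)
                .doOnNext(System.out::println)
                .blockLast();
    }
}
```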

Result

To test out our application, run both the ReactiveServer and the ReactiveClient.

On Windows, open Command Prompt. On macOS and Linux, open a terminal.

We'll use curl as our command-line tool to send an HTTP request to our server:

C:\Users\User>curl localhost:8080/events

Running this command will in turn return something like this:

data:{"id":1528794287030,"date":"2018-06-12T09:04:47.030+0000"}

data:{"id":1528794288032,"date":"2018-06-12T09:04:48.032+0000"}

data:{"id":1528794289032,"date":"2018-06-12T09:04:49.032+0000"}

data:{"id":1528794290031,"date":"2018-06-12T09:04:50.031+0000"}

data:{"id":1528794291031,"date":"2018-06-12T09:04:51.031+0000"}

data:{"id":1528794292032,"date":"2018-06-12T09:04:52.032+0000"}

You can see that these events occurred one second apart, both in the date on the right and in the millisecond count on the left.
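The one-second spacing can also be checked with a few lines of plain Java. This is only a sketch for reading the output above: the class SseLineSketch and its helpers are made up for this example, the id is pulled out with a simple regular expression instead of a JSON parser, and trim() is a simplification of how SSE clients handle whitespace after the "data:" prefix.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting the "data:" lines printed by curl.
public class SseLineSketch {

    private static final String DATA_PREFIX = "data:";
    private static final Pattern ID = Pattern.compile("\"id\":(\\d+)");

    // Returns the payload of a "data:" line, or empty for any other line.
    static Optional<String> payload(String line) {
        if (!line.startsWith(DATA_PREFIX)) {
            return Optional.empty();
        }
        return Optional.of(line.substring(DATA_PREFIX.length()).trim());
    }

    // Extracts the numeric id field from the JSON payload.
    static long id(String json) {
        Matcher matcher = ID.matcher(json);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No id field in: " + json);
        }
        return Long.parseLong(matcher.group(1));
    }

    public static void main(String[] args) {
        // The first two lines of the sample output above.
        String first = "data:{\"id\":1528794287030,\"date\":\"2018-06-12T09:04:47.030+0000\"}";
        String second = "data:{\"id\":1528794288032,\"date\":\"2018-06-12T09:04:48.032+0000\"}";

        long gap = id(payload(second).get()) - id(payload(first).get());
        System.out.println(gap); // prints "1002", i.e. roughly one second in milliseconds
    }
}
```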

Spring WebClient

If you're not entirely familiar with the Spring WebClient, let's dedicate this short section to explaining it.

WebClient is an interface in the Spring Framework, and it's included in the spring-boot-starter-webflux dependency. Its main job is to act as the entry point for performing HTTP requests from your application.

It was introduced in Spring 5 as part of the Spring Web Reactive module, and it serves as a non-blocking alternative to RestTemplate.

Conclusion

This quick tutorial covered how to use Spring WebFlux to create a simple reactive server that sends a custom object to the client every second, along with a client that consumes these events and logs them in real time as they arrive.

If you'd like to take a look and play around with the source code, feel free to check out the repo on GitHub below:


