Reactive Programming basics in Java

Satyajit Nath Bhowmik
Javarevisited
Published in
4 min readFeb 21, 2021

--

When I started learning about reactive programming I found a lot of important concepts here and there, so thought of making notes, then later realized why not to publish it as a blog. So here you go,

What is reactive programming in java?

Reactive programming is a programming paradigm that promotes an asynchronous, non-blocking, event-driven approach to data processing.

The term, “reactive,” refers to programming models that are built around reacting to change — network components reacting to I/O events, UI controllers reacting to mouse events, and others.

In that sense, non-blocking is reactive, because, instead of being blocked, we are now in the mode of reacting to notifications as operations complete or data becomes available.

What are Mono and Flux?

Mono and Flux both are publishers. in java, both are implements from CorePublisher, and CorePublisher is extending from Publisher.

Flux implements CorePublisher
Core Publisher extends Publisher

Flux is a publisher that publishes zero or more values and Mono publishes zero or one value.

Let's see some examples now

Below is the example of Flux,

And to make the data flow you have to do subscribe, without subscription data never flows. By default stream is lazy and that means without you consume nothing is executed.

Output

And flux can be infinite. We can make use of functions like takeUntil or take to stop.

Usages of takeUntil

Flux.fromIterable(lists).takeUntil(s -> s.equalsIgnoreCase("End")).subscribe(System.out::println);

What is publishOn and subscribeOn ?

publishOn & subscribeOn are convenient methods in Project Reactor which accepts any of the Schedulers to change the task execution context for the operations in a reactive pipeline. While subscribeOn forces the source emission to use specific Schedulers, publishOn changes Schedulers for all the downstream operations.

publishOn Example :

Flux<Integer> flux = Flux.range(0, 2)
.publishOn(Schedulers.parallel())
.map(i -> {
System.out.println("First Block Thread : " + Thread.currentThread().getName());
return i;
}).map(i-> {
System.out.println("Second Block Thread : " + Thread.currentThread().getName());
return i;
});

flux.subscribe();
Thread.sleep(1000);

So, above example after publishOn all the operation runs on same thread until we see another publishOn, when we see publishOn method is encountered it changes the context and run in the separate thread (based on the schedulers mentioned).

SubscribeOn Example :

Flux<Integer> flux = Flux.range(0, 2)
.subscribeOn(Schedulers.boundedElastic())
.map(i -> {
System.out.println("First Block Thread : " + Thread.currentThread().getName());
return i;
}).
subscribeOn(Schedulers.single())
.map(i-> {
System.out.println("Second Block Thread : " + Thread.currentThread().getName());
return i;
});

flux.subscribe();
Thread.sleep(1000);

Above Example, Even though we have defined multiple subscribeOn methods ( one is boundedElastic and another is single scheduler) only the first subscribeOn method which is close to source takes precedence.

Note : We can have multiple publishOn methods which will keep switching the context. However, the subscribeOn method can not do like that. Only the very first subscribeOn method which is close to the source takes precedence.

What is schedulers and their types?

A Scheduler is an abstraction that gives the user control over threading. There are mainly four types of it or you can choose to write your own scheduler as well.

  1. Schedulers.single() — A single reusable thread for all the callers.
  2. Schedulers.immediate() — To keep the execution in the current thread.
  3. Schedulers.boundedElastic() — same as elastic, but its thread pool size (10* number of cpu cores), it is good choice for IO operations/ Non blocking call.
  4. Schedulers.parallel() — is good for CPU-intensive but short-lived tasks. It can execute N such tasks in parallel (by default N == number of CPUs)

Schedulers.elastic() — This is a thread pool with unlimited threads which is no longer preferred.

Important References :

  1. https://spring.io/blog/2016/06/07/notes-on-reactive-programming-part-i-the-reactive-landscape
  2. https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
  3. https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

--

--