Sincronismo asincronismo y algo de streams reactivos

Las llamadas asíncronas suponen una muy buena ventaja con respecto a realizar ejecuciones en el main thread alias (blocking call, blocking api, BlockingIO etc), nos permite de cierto modo ser más eficientes, hacer cosas en paralelo a la vez, sin esperar por esas operaciones que toman tiempo en finalizar, hasta la experiencia de usuario mejora drásticamente.

La jdk de java nos permite crear ambientes asíncronos, con clases como:

Algunas clases para acceso concurrente by Doug Lea

  • java.lang.Runnable

  • java.lang.Thread

  • java.util.concurrent.Future

  • java.util.concurrent.Callable

  • java.util.concurrent.CompletableFuture

Esta ultima clase es una de mis preferidas, es como un híbrido entre Runnable y Callable, pero mejor, y no Callable sino Supplier, sin descartar también el uso de CountDownLatch, CyclicBarrier, Phaser, para un modo más enficiente y no usar el vulgar join o get.


Una muy breve historia

Muchísimas aplicaciones hoy en dia son no bloqueantes, muchas, Netflix es una de ellas, que incluso inicialmente estuvo con el diseño del protocolo RSocket un protocolo binario que según fue diseñado para ofrecer un ambiente más reactivo, superior al http, viendo la necesidad de añadir mas asincronismo, sin descartar a Facebook que también lo usa, Salesforce que tiene una implementacion reactiva del framework gRPC para operar con otras tecnologías que soporten los streams reactivos, la Azure SDKs usa de manera nativa Project Reactor para crear clientes reactivos.


Especificación de los streams reactivos

Desde el año 2015 varias empresas se juntaron para lograr crear una especificación, empresas como Pivotal, Netflix, Lightbend(né “Typesafe”) y la Eclipse Foundation, para extraer interfaces comunes que representaran el ambiente asíncrono.

Dicha especificación define 4 interfaces que hacen la verdadera magia

  • Publisher<T> el que produce los datos, ella no produciria más datos a los que el suscriptor necesite.

  • Subscriber<T> el que consume esos datos, una de las mas importantes, esta usa el backpressure para solicitar más datos al productor, controla la velocidad de ese procesamiento.

  • Processor<T,R> esta es como un puente que extiende al Publisher y Subscriber, es decir es productor y consumidor.

  • Subscription esta nos da ese enlace entre productor y consumidor/suscriptor.


Backpressure

backpressure

Aquí es donde hablamos un poco sobre el push-based que estan por defecto en el iterator y Stream de la jdk, que en realidad seria un issue en operaciones tipo Input/Ouput, necesitamos ese push-back, como un consumidor de los datos producidos de manera asíncrona, no tenemos idea 🤔, como o cuanta cantidad de datos está en el pipeline. No sabemos si en el próximo callback leeremos un byte o un terabyte, cuando usamos un InputStream definimos un buffer con un tamaño dado, un limite, sabemos su tamaño para leer lo que podemos, en el mundo asíncrono necesitamos comunicarnos con el productor de dichos datos para saber que cantidad estaríamos dispuestos a manejar…

🤔 pero falta algo, necesitamos, queremos algo que comodamente, nos de ese push-back ese mecanismo o flow control, que en la programacíon reactiva es la habilidad que tiene el cliente de señalar, de emitir esa señal de cuanto trabajo puede manejar.

La suscripción permite al suscriptor solicitar más datos cuando esté listo para procesar esos datos, este consumo regulado de datos es control de flujo flow control, En el mundo de la programación reactiva, a veces el flow control se denomina, quizás tanto como una cuestión de marketing como cualquier otra cosa, backpressure.

Las colas, los distintos tipos de colas estilo

  • Unbounded queue

  • Bounded drop queue

  • Bounded blocking queue

Colas que estan basadas en push model puro y duro, eso trae como consecuencias muchos efectos secundarios, bloqueantes y más, muchas de ellas, una vez que el productor alcanza el limete de la cola, esta se bloqueara hasta que el consumidor drene un elemento y espacio libre en la cola para que este disponible, en caso de consumidores lentos el rendimiento se decrementa drasticamente, negando el comportamiento asíncrono, dichas tecnicas niegan el eficiente uso de recursos.

Ninguno de esos casos son aceptables en el manifiesto reactivo de ser resiliente, responsivo, elástico. Los métodos en project reactor como block, blockLast… se pueden incluir aquí como antipatrones, los cuales debemos evitar.

En terminos generales ese push model de esas colas, viola, el manifiesto reactivo, en donde menciona la importancia del mecanismo que permite a un sistema responder a la carga, en otras palabras la necesidad de un mecanismo como backpressure.

- Reactor Reference

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high.


Un simple 🎲 ? síncrono? o asíncrono? o reactivo ?

Usaremos aquí una simple app, que tira un 🎲 unas cuannntas veces, de ambas maneras tanto síncrona como asíncrona, y reactiva, donde se apreciará una sutil diferencia, y esa diferencia es tannn especial 🤙🏿 usaremos algunas de las clases antes nombradas, lanzandolo millones de veces y contaremos la frecuencia de repeticiones, repeticiones de cada cara, de la 1 hasta la 6.

Vamos al start.vaadin.com para descargarnos nuestra app base, la haremos con Vaadin Flow 14, este framework nos permite crear una app con java, sin tocar javascript, y de manera muy rápida, sin pasar por esa curva larga de front-end, a la misma vez, que añade esa capa de abstracción dura, al lado del servidor, generalmente estamos programando del lado del servidor server-side, podemos también hacerlo del lado del cliente client-side con typescript este seria otro tema.

Project reactor

En nuestro pom.xml necesitamos la dependencia de project reactor este es un projecto open-source idenpendiente de pivotal pero patrocinado principalmente por ellos, que nos permitira crear nuestras apis de manera reactiva por medio de un DSL bastante elegante y mas facil de leer, sin callbacks, sin Futures.

callbacksFutures

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.2</version>
</dependency>

Versión síncrona, valida también para ejecución asíncrona

@Service
public class SyncRandomNumbers {

    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
    private static final int RANDOM_NUMBER_ORIGIN = 1;
    private static final int RANDOM_NUMBER_BOUND = 7;

    /**
     *
     * @param size of stream
     * @return Map<Integer,Long>
     */
    public Map<Integer, Long> syncFrencuency(Long size) { (1)
        return SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
                .boxed()
                (2)
                .collect(Collectors.groupingBy(e -> e, Collectors.counting()));
    }

}
1 Al llamar a syncFrencuency bloquearemos de seguro el main thread.
2 Esta linea también podriamos hacerla digamos mas síncrona con este método, habilitando acceso atomico.
  • groupingByConcurrent ya que el método syncFrencuency estará dentro de un Thread

.collect(Collectors.groupingByConcurrent(e -> e, Collectors.counting()));

Versión reactiva

/**
* @author rubn
*/
@Service
public class ReactiveRandomNumbers {

  private static final SecureRandom SECURE_RANDOM = new SecureRandom();
  private static final int RANDOM_NUMBER_ORIGIN = 1;
  private static final int RANDOM_NUMBER_BOUND = 7;

  /**
   *
   * @param size of stream
   * @return Mono<Map<Integer,Long>>
   */
  public Mono<Map<Integer, Long>> monoFrecuency(Long size) {
                (1)
    return Mono.just(SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
            .boxed()
            .collect(Collectors.groupingBy(e -> e, Collectors.counting())));
  }

}
1 Se aprecia que el bloque de código bloqueante lo metemos dentro del Mono.just , un Mono.justOrEmpty o lo mísmo a decir un wrapper porque necesitamos un Publisher.

Con Vaadin necesitamos tener en nuestra MainView.class una instancia del la UI que la obtenemos con getUI() preferiblemente, a parte, necesitamos la anotación @Push para habilitar los websockets, es decir comunicación bidireccional, de cliente a servidor y viceversa.

@Override
protected void onAttach(AttachEvent attachEvent) { (1)
    super.onAttach(attachEvent);
    this.getUI().ifPresent(ui -> { (2)
        this.initReactiveFrecuency(ui);(3)
        this.initWithCompletableFuture(ui);
    });
}
1 Las llamadas que ejecutemos desde otro hilo lo correcto con Vaadin es sobreescribir el siguiente método, onAttach(AttachEvent attachEvent) para pasar la instancia de la UI a los métodos necesarios, este es heredado de la clase abstracta Component la mas alta de la jerarquía, en dicha vista que no es la MainView sino otra, extendemos del componente VerticalLayout hija de Component se encarga de añadir componentes verticalmente.
2 Para que dicha anotación sea usada correctamente desde un Thread, necesitamos invocar al método access(() -> )) este ayuda a evitar deadlocks entre otros…​ gracias a la referencia de la ui
3 Este método y el de abajo, invocaran al método access

Observando comportamientos bloqueantes y no bloqueantes

- Combo síncrono

sync

Este es el método que ejecuta el servicio síncrono, es una ejecución normal común y corriente, esta se ejecutara en el main-thread.

private void initSyncFrecuency() {
    syncComboBox.addValueChangeListener(event -> {
        this.execute(event.getValue().getSize(),
               e -> syncRandomNumbers.syncFrencuency(event.getValue().getSize()));
    });
}

log en consola

2022-04-04 01:41:58.086  INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync       : Result map: {1=1665157, 2=1667816, 3=1665566, 4=1666292, 5=1666392, 6=1668777}
2022-04-04 01:41:58.086  INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync       : Daemon: true
2022-04-04 01:41:58.086  INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync       : Thread name: http-nio-8088-exec-10
2022-04-04 01:41:58.086  INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync       : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
2022-04-04 01:41:58.086  INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync       : Thread state: RUNNABLE

Aquí se aprecia su comportamiento, con la barra azul en la parte superior (Otros frameworks indican eso de otras maneras, pero Vaadin lo hace así), que estamos bloqueando el Main Thread, por lo tanto, esa operación bloqueante debe terminar para usar la aplicación u otros componentes de la misma, funciona, pero se puede mejorar muchísimo mas.


- Combo asíncrono

asyncCompletablefuture

private void initWithCompletableFuture(final UI ui) {
    asyncComboBoxWithCompletableFuture.addValueChangeListener(event -> {
                            (1)
        CompletableFuture.supplyAsync(() -> this.syncRandomNumbers.syncFrencuency(event.getValue().getSize()))
                .whenCompleteAsync((map, error) -> { (2)
                    ui.access(() -> { (3)
                        log.info("Result map: {}", map); (4)
                        (5)
                        log.info("Daemon: {}", Thread.currentThread().isDaemon());
                        log.info("Thread name: {}", Thread.currentThread().getName());
                        (6)
                        log.info("ThreadGroup: {}", Thread.currentThread().getThreadGroup());
                        this.execute(event.getValue().getSize(), e -> map);
                    });
                });
    });
}
1 Aqui la situación mejora mucho, porque tenemos dicha ejecución en otro Thread no bloqueante, ForkJoinPool por defecto con el supplyAsync internamente el hilo engendrado aqui es de máxima prioridad, también un daemon, pero muchas veces no es asi, dando como resultado que el main-thread no espere que su ejecucion termine por eso muchos se ven en la necesidad de usar join() para obligar finalizar la operación.
2 Recalcando también que no usamos métodos bloqueantes como get o join. Sino whenCompleteAsync, existen mas variantes, como thenApply, thenApplyAsync, thenAccept etc.
3 El método access para notificar al cliente, obligatorio.
4 Hacemos loggin del resultado del map.
5 Mostramos cierta info del ForkJoinPool y el Thread en ese momento, el log de abajo muestra ciertas cosas.

log en consola

2022-04-04 01:11:38.262  INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync       : Result map: {1=1665475, 2=1667095, 3=1665609, 4=1667555, 5=1666059, 6=1668207}
2022-04-04 01:11:38.262  INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync       : Daemon: true
(1)
2022-04-04 01:11:38.262  INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync       : Thread name: ForkJoinPool.commonPool-worker-17
(2)
2022-04-04 01:11:38.262  INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync       : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
1 ForkJoinPool.commonPool-worker-17 el numero 17 nos servira para rastrearlo a la hora de hacer un thread-dump justo al darle click al combo, con la VisualVM
2 Este hilo tiene la prioridad máxima de 10 Thread.MAX_PRIORITY, lo que indica que el main-thread espera a que esta operación termine, dado que muchas veces no es así.

threadump combo completable future


- Combo reactivo

reactiveFrecuencyMono

private void initReactiveFrecuency(final UI ui) {
    reactiveComboBox.addValueChangeListener(event -> {
              (1)
        Mono.defer(() -> this.reactiveRandomNumbers.monoFrecuency(event.getValue().getSize()))
            .subscribeOn(Schedulers.boundedElastic()) (2)
            .subscribe(subscribeMap -> { (3)
                  ui.access(() -> { (4)
                      this.execute(event.getValue().getSize(), e -> subscribeMap);
                  });
            });
    });
}
1 El Operador defer necesario para hacer este código lazy y revaluar el contenido del lambda, cada vez que exista un nuevo subscriptor.
2 Permite aislar esta secuencia desde el inicio en un Scheduler proporcionado, creando un pool de hilos que está sometido a la demanda de datos, para ejecuciones largas y es una alternativa para tareas bloqueantes donde el número de ellas e hilos es limitada.
3 Nos suscribimos/consumimos esos datos.
4 El método access dentro del método subscribe.

Log en consola

2022-04-04 01:33:04.866  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Thread name doOnNext(): boundedElastic-1
2022-04-04 01:33:04.866  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Thread name doOnNext(): boundedElastic-1
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Result map: {1=1666276, 2=1667401, 3=1666503, 4=1665615, 5=1667927, 6=1666278}
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Daemon: true
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Thread name: boundedElastic-1
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Thread state: RUNNABLE
2022-04-04 01:33:04.867  INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync       : Thread name subscribe(): boundedElastic-1

Y aquí es con project reactor necesitamos invocar el método defer(() -> ) también un Scheduler para no bloquearnos destacando que dependiendo el operador/método que se llame se establece backpressure por defecto con un buffer con tamaño fixeado como por ejemplo:

reactor.util.concurrent.Queues.XS_BUFFER_SIZE
reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE

las VM properties siguientes se usan para incrementar el performance de la JVM junto con setear un garbage collector tipo -XX:+UseParallelGC

  • Dreactor.bufferSize.x=512 o 1024

  • Dreactor.bufferSize.small=2048

El resultado es muy parecido al GIF anterior con CompletableFuture, solo que aquí es un entorno controlado y más fácil, en un caso real, no podremos editar el código original del servicio ReactiveRandomNumbers, y tocaría convertir nuestra llamada bloqueante( o ese impostor reactivo) en un Publisher para introducirlo en nuestro defer del método initReactiveFrecuency.

Además de la ventaja de aplazar/diferir el publisher en otro Scheduler, es que si esa llamada por alguna razon contiene algun tipo de codigo bloqueante( un Thread.sleep() por ejemplo) la situacion podria volverse extraña si tenemos un endpoint que lo invoque, porque nos dara una excepción detectada por ejemplo en la especificación servlet 3.1

En casos así, seria muy útil usar a BlockHound un agente para detectar llamadas bloqueantes para los Schedulers de project reactor ( por defecto en el Schedulers.paralell() ) e internamente esta construida sobre byte-buddy.

Y cuál sera mejor de esos paradigmas ?

Por mi parte tengo muchas razaones para ir por un modelo reactivo sin problemas, mejor rendimiento a la larga, mejor administración del hardware por asi decirlo, en cierto punto es más facil hacer cosas complejas concurrentes, que reactor facilita, sin eso, tendríamos que usar Java un poco mas pura, aunque podria ser mas pura aún.

Usar un modelo síncrono a día de hoy? mmm para que ? ya esta apis concurrentes vienen trabajando desde hace muchos años dando un buen soporte y calidad validada, project reactor su primer release fue en 2011, quien se imagina cargando Instagram, Facebook, NetFlix jajaj de manera sincrona y todo bloqueado ? uffff que ineficiencia y perdedera de tiempo tan abismal…​

Un Tweet muy interesante de Paul Bakker donde comenta que aún tienen código síncrono en su arquitectura excepto Zuul.

paulBakker

PoC en heroku

A veces toca esperar para ver la app, heroku le hace idle(duerme) las apps cuando no se usan.

?app=poc sync async reactive&style=flat