Síncronismo asíncronismo y algo de streams reactivos.

Las llamadas asíncronas suponen una muy buena ventaja con respecto a realizar ejecuciones en el main thread alias (blocking call, blocking api, BlockingIO etc), nos permite de cierto modo ser más eficientes, hacer cosas en paralelo a la vez, sin esperar por esas operaciones que toman tiempo en finalizar, hasta la experiencia de usuario mejora drásticamente.

La jdk de java nos permite crear ambientes asíncronos, con clases como:

java.lang.Runnable
java.lang.Thread
java.util.concurrent.Future
java.util.concurrent.Callable
java.util.concurrent.CompletableFuture

esta ultima clase es una de mis preferidas, es como un híbrido entre Runnable y Callable, pero mejor, y no Callable sino Supplier


Una muy breve historia

Muchísimas aplicaciones hoy en dia son no bloqueantes, muchas, Netflix es una de ellas, que incluso inicialmente estuvo con el diseño del protocolo Rsocket un protocolo que según fue diseñado para ofrecer un ambiente más reactivo, superior al http, viendo la necesidad de añadir mas asincronismo, sin descartar a Facebook que también lo usa, Saleforce que tiene una implementacion reactiva del gRPC para operar con otras tecnologías que soporten los streams reactivos, la Azure SDKs usa de manera nativa Project Reactor para crear clientes reactivos.


Especificación de los streams reactivos

Desde el año 2015 varias empresas se juntaron para lograr crear una especificación, empresas como Pivotal, Netflix, Lightbend(né “Typesafe”) y la Eclipse Foundation, para extraer interfaces comunes que representaran el ambiente asíncrono.

Dicha especificación define 4 interfaces que hacen la verdadera magia

  • Publisher<T> el que produce los datos, ella no produciria más datos a los que el subscriptor necesite.
  • Subscriber<T> el que consume esos datos, una de las mas importantes, esta usa el backpressure para solicitar más datos al productor, controla la velocidad de ese procesamiento.
  • Processor<T,R> esta es como un puente que implementa al Publisher y Subscriber, es decir es productor y consumidor.
  • Subscription esta nos da ese enlace entre productor y consumidor/subscriptor.

Backpressure

image

Aquí es donde hablamos un poco sobre el push-based que estan por defecto en el iterator y Stream de la jdk, que en realidad seria un issue en operaciones tipo Input/Ouput, necesitamos ese push-back, como un consumidor de los datos producidos de manera asíncrona, no tenemos idea 🤔, como o cuanta cantidad de datos está en el pipeline. No sabemos si en el próximo callback leeremos un byte o un terabyte, cuando usamos un InputStream definimos un buffer con un tamaño dado, un limite, sabemos su tamaño para leer lo que podemos, en el mundo asíncrono necesitamos comunicarnos con el productor de dichos datos para saber que cantidad estaríamos dispuestos a manejar…

🤔 pero falta algo, necesitamos, queremos algo que comodamente, nos de ese push-back ese mecanismo o flow control, que en la programacíon reactiva es la habilidad que tiene el cliente de señalar, de emitir esa señal de cuanto trabajo puede manejar.

La suscripción permite al suscriptor solicitar más datos cuando esté listo para procesar esos datos, este consumo regulado de datos es control de flujo flow control, En el mundo de la programación reactiva, a veces el flow control se denomina, quizás tanto como una cuestión de marketing como cualquier otra cosa, backpressure.

Las colas, los distintos tipos de colas estilo

  • Unbounded queue
  • Bounded drop queue
  • Bounded blocking queue

Colas que estan basadas en push model puro y duro, eso trae como consecuencias muchos efectos secundarios, bloqueantes y más, muchas de ellas, una vez que el productor alcanza el limete de la cola, esta se bloqueara hasta que el consumidor drene un elemento y espacio libre en la cola para que este disponible, en caso de consumidores lentos el rendimiento se decrementa drasticamente, negando el comportamiento asíncrono, dichas tecnicas niegan el eficiente uso de recursos.

Ninguno de esos casos son aceptables en el manifiesto reactivo de ser resilente, responsivo, elástico. Los métodos en project reactor como block, blockLast… se pueden incluir aquí como antipatrones, los cuales debemos evitar.

En terminos generales ese push model de esas colas, viola, el manifiesto reactivo, en donde menciona la importancia del mecanismo que permite a un sistema responder a la carga, en otras palabras la necesidad de un mecanismo como backpressure.

  • Reactor Reference

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high.


Un simple 🎲 ? síncrono? o asíncrono? o reactivo ? cuál sera mejor?

Usaremos aquí una simple app, que tira un 🎲 unas cuannntas veces, de ambas maneras tanto síncrona como asíncrona, y reactiva, donde se apreciará una sutil diferencia, y esa diferencia es tannn especial 🤙🏿 usaremos algunas de las clases antes nombradas, lanzandolo millones de veces y contaremos la frecuencia de repeticiones, repeticiones de cada cara, de la 1 hasta la 6.

Vamos al start.vaadin.com para descargarnos nuestra app base, la haremos con Vaadin Flow 14, este framework nos permite crear una app con java, sin tocar javascript, y de manera muy rápida, sin pasar por esa curva larga de front-end, a la misma vez, que añade esa capa de abstracción dura, al lado del servidor, generalmente estamos programando del lado del servidor server-side, podemos también hacerlo del lado del cliente client-side con typescript este seria otro tema.

Project reactor

En nuestro pom.xml necesitamos la dependencia de project reactor este es un projecto open-source idenpendiente de pivotal pero patrocinado principalmente por ellos, que nos permitira crear nuestras apis de manera reactiva por medio de un DSL bastante elegante y mas facil de leer, sin callbacks, sin Futures.

image

 <!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.2</version>
</dependency>

Nuestra version síncrona del servicio es esta:

@Service
public class SyncRandomNumbers {

    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
    private static final int RANDOM_NUMBER_ORIGIN = 1;
    private static final int RANDOM_NUMBER_BOUND = 7;

    /**
     *
     * @param size of stream
     * @return Map<Integer,Long>
     */
    public Map<Integer, Long> syncFrencuency(Long size) {
        return SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
                .boxed()
                .collect(Collectors.groupingBy(e -> e, Collectors.counting()));
    }

}

Esta linea también podriamos hacerla digamos mas síncrona con

  • groupingByConcurrent

ya que el método syncFrencuency estará dentro de un Thread

.collect(Collectors.groupingByConcurrent(e -> e, Collectors.counting()));

Versión reactiva, se aprecia que el bloque de código bloqueante lo metemos dentro del Flux.just

/**
 *
 */
@Service
public class AsyncRandomNumbersWithFlux {

    private static final SecureRandom SECURE_RANDOM = new SecureRandom();
    private static final int RANDOM_NUMBER_ORIGIN = 1;
    private static final int RANDOM_NUMBER_BOUND = 7;

    /**
     *
     * @param size of stream
     * @return Flux<Map<Integer,Long>>
     */
    public Flux<Map<Integer, Long>> fluxFrecuency(Long size) {
        return Flux.just(SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
                .boxed()
                .collect(Collectors.groupingBy(e -> e, Collectors.counting())));
    }

}

Con Vaadin necesitamos tener en nuestra MainView.class una instancia del la UI que la obtenemos con getUI() preferiblemente, a parte, necesitamos la anotación @Push para habilitar los websockets, es decir comunicación bidireccional, de cliente a servidor y viceversa, para que dicha anotación sea usada correctamente desde un Thread, necesitamos invocar al método access(() -> )) este ayuda a evitar deadlocks entre otros…

Las llamadas que ejecutemos desde otro hilo lo correcto con Vaadin es sobreescribir el siguiente método, onAttach(AttachEvent attachEvent) para pasar la instancia de la UI a los métodos necesarios, este es heredado de la clase abstracta Component la mas alta de la jerarquía, en dicha vista que no es la MainView sino otra, extendemos del componente VerticalLayout hija de Component se encarga de añadir componentes verticalmente.

@Override
protected void onAttach(AttachEvent attachEvent) {
    super.onAttach(attachEvent);
    this.getUI().ifPresent(ui -> {
        this.initReactiveFrecuency(ui);
        this.initWithCompletableFuture(ui);
    });
}

Mirando comportamientos

Este es el método que ejecuta el servicio síncrono

private void initSyncFrecuency() {
    syncComboBox.addValueChangeListener(event -> {
        this.execute(event.getValue().getSize(),
               e -> syncRandomNumbers.syncFrencuency(event.getValue().getSize()));
    });
}

Aquí se aprecia su comportamiento, con la barra azul indica, que estamos bloqueados en ese Main Thread, por lo tanto se debe terminar para usar la aplicación u otros componentes de la misma, funciona, pero se puede mejorar muchísimo mas.

image

Este es asíncrono con CompletableFuture

private void initWithCompletableFuture(final UI ui) {
    asyncComboBoxWithCompletableFuture.addValueChangeListener(event -> {
           CompletableFuture.supplyAsync(() -> this.syncRandomNumbers.syncFrencuency(event.getValue().getSize()))
                   .whenCompleteAsync((map, error) -> {
                       ui.access(() -> {
                           this.execute(event.getValue().getSize(), e -> map);
                       });
                   });
    });
}

Aqui la situación mejora mucho, porque tenemos dicha ejecución en otro Thread no bloqueante que es la idea al menos.

image

Y aquí es con project reactor necesitamos invocar el método
defer(() -> ) para no bloquearnos destacando que no hacemos backpressure directamente aquí, para eso necesitamos lo que llaman un operador, como buffer(int maxSize)

private void initReactiveFrecuency(final UI ui) {
    reactiveComboBox.addValueChangeListener(event -> {
        Flux.defer(() -> this.reactiveRandomNumbers.fluxFrecuency(event.getValue().getSize())) //1
            .subscribeOn(Schedulers.boundedElastic())                                          //2 
            .subscribe(subscribeMap -> {                                                      //3
                  ui.access(() -> {                                                            //4
                          this.execute(event.getValue().getSize(), e -> subscribeMap);
                  });
            });
    });
}
  • //1 El Operador defer necesario para hacer este código lazy y revaluar el contenido del lambda, cada vez que exista un nuevo subscriptor.
  • //2 Permite aislar esta secuencia desde el inicio en un Scheduler proporcionado, creando un pool de hilos que está sometido a la demanda de datos, para ejecuciones largas y es una alternativa para tareas bloqueantes donde el número de ellas e hilos es limitada.
  • //3 Nos subscribimos/consumimos esos datos.
  • //4 el método access dentro del método subscribe.

Y el resultado es el siguiente muy parecido al gif anterior.

image


Comments