Las llamadas asíncronas suponen una muy buena ventaja con respecto a realizar ejecuciones en el main thread alias (blocking call, blocking api, BlockingIO etc), nos permite de cierto modo ser más eficientes, hacer cosas en paralelo a la vez, sin esperar por esas operaciones que toman tiempo en finalizar, hasta la experiencia de usuario mejora drásticamente.
La jdk de java nos permite crear ambientes asíncronos, con clases como:
Algunas clases para acceso concurrente by Doug Lea
|
Esta ultima clase es una de mis preferidas, es como un híbrido entre
Runnable y Callable, pero mejor, y no Callable sino Supplier, sin descartar también el uso de CountDownLatch, CyclicBarrier, Phaser
, para un modo más enficiente y no usar el vulgar join o get.
Una muy breve historia
Muchísimas aplicaciones hoy en dia son no bloqueantes, muchas, Netflix
es una de ellas, que incluso inicialmente estuvo con el diseño del
protocolo RSocket un protocolo
binario que según fue diseñado para ofrecer un ambiente más reactivo,
superior al http, viendo la necesidad de añadir mas asincronismo, sin
descartar a Facebook que también lo usa, Salesforce que tiene una
implementacion reactiva del framework
gRPC
para operar con otras tecnologías que soporten los streams reactivos, la
Azure SDKs
usa de manera nativa Project Reactor
para crear
clientes reactivos.
Especificación de los streams reactivos
Desde el año 2015 varias empresas se juntaron para lograr crear una especificación, empresas como Pivotal, Netflix, Lightbend(né “Typesafe”) y la Eclipse Foundation, para extraer interfaces comunes que representaran el ambiente asíncrono.
Dicha especificación define 4 interfaces que hacen la verdadera magia
-
Publisher<T>
el que produce los datos, ella no produciria más datos a los que el suscriptor necesite. -
Subscriber<T>
el que consume esos datos, una de las mas importantes, esta usa el backpressure para solicitar más datos al productor, controla la velocidad de ese procesamiento. -
Processor<T,R>
esta es como un puente que extiende al Publisher y Subscriber, es decir es productor y consumidor. -
Subscription
esta nos da ese enlace entre productor y consumidor/suscriptor.
Backpressure
Aquí es donde hablamos un poco sobre el push-based
que estan por
defecto en el iterator y Stream de la jdk, que en realidad seria un
issue en operaciones tipo Input/Ouput, necesitamos ese push-back
,
como un consumidor de los datos producidos de manera asíncrona, no
tenemos idea 🤔, como o cuanta cantidad de datos está en el pipeline. No
sabemos si en el próximo callback leeremos un byte o un terabyte, cuando
usamos un InputStream definimos un buffer con un tamaño dado, un limite,
sabemos su tamaño para leer lo que podemos, en el mundo asíncrono
necesitamos comunicarnos con el productor de dichos datos para saber que
cantidad estaríamos dispuestos a manejar…
🤔 pero falta algo, necesitamos, queremos algo que comodamente, nos de
ese push-back
ese mecanismo o flow control, que en la programacíon
reactiva es la habilidad que tiene el cliente de señalar, de emitir esa
señal de cuanto trabajo puede manejar.
La suscripción permite al suscriptor solicitar más datos cuando esté listo para procesar esos datos, este consumo regulado de datos es control de flujo flow control, En el mundo de la programación reactiva, a veces el flow control se denomina, quizás tanto como una cuestión de marketing como cualquier otra cosa, backpressure.
Las colas, los distintos tipos de colas estilo
-
Unbounded queue
-
Bounded drop queue
-
Bounded blocking queue
Colas que estan basadas en push model puro y duro, eso trae como
consecuencias muchos efectos secundarios, bloqueantes y más, muchas de
ellas, una vez que el productor alcanza el limete de la cola, esta se
bloqueara hasta que el consumidor drene un elemento y espacio libre en
la cola para que este disponible, en caso de consumidores lentos el
rendimiento se decrementa drasticamente, negando el comportamiento
asíncrono
, dichas tecnicas niegan el eficiente uso de recursos.
Ninguno de esos casos son aceptables en el manifiesto reactivo de ser resiliente, responsivo, elástico. Los métodos en project reactor como block, blockLast… se pueden incluir aquí como antipatrones, los cuales debemos evitar. |
En terminos generales ese push model de esas colas, viola, el manifiesto reactivo, en donde menciona la importancia del mecanismo que permite a un sistema responder a la carga, en otras palabras la necesidad de un mecanismo como backpressure.
- Reactor Reference
Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high.
Un simple 🎲 ? síncrono? o asíncrono? o reactivo ?
Usaremos aquí una simple app, que tira un 🎲 unas cuannntas veces, de
ambas maneras tanto síncrona como asíncrona, y reactiva, donde se
apreciará una sutil diferencia, y esa diferencia es tannn especial 🤙🏿
usaremos algunas de las clases antes nombradas, lanzandolo millones de
veces y contaremos la frecuencia
de repeticiones, repeticiones de
cada cara, de la 1 hasta la 6.
Vamos al start.vaadin.com para
descargarnos nuestra app base, la haremos con Vaadin Flow 14, este
framework nos permite crear una app con java, sin tocar javascript, y de
manera muy rápida, sin pasar por esa curva larga de front-end, a la
misma vez, que añade esa capa de abstracción dura, al lado del servidor,
generalmente estamos programando del lado del servidor server-side
,
podemos también hacerlo del lado del cliente client-side
con
typescript este seria otro tema.
Project reactor
En nuestro pom.xml necesitamos la dependencia de project reactor
este es un projecto open-source idenpendiente de pivotal pero
patrocinado principalmente por ellos, que nos permitira crear nuestras
apis de manera reactiva por medio de un DSL bastante elegante y mas
facil de leer, sin callbacks, sin Futures.
<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.2</version>
</dependency>
Versión síncrona, valida también para ejecución asíncrona
@Service
public class SyncRandomNumbers {
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
private static final int RANDOM_NUMBER_ORIGIN = 1;
private static final int RANDOM_NUMBER_BOUND = 7;
/**
*
* @param size of stream
* @return Map<Integer,Long>
*/
public Map<Integer, Long> syncFrencuency(Long size) { (1)
return SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
.boxed()
(2)
.collect(Collectors.groupingBy(e -> e, Collectors.counting()));
}
}
1 | Al llamar a syncFrencuency bloquearemos de seguro el main thread. |
2 | Esta linea también podriamos hacerla digamos mas síncrona con este
método, habilitando acceso atomico.
|
.collect(Collectors.groupingByConcurrent(e -> e, Collectors.counting()));
Versión reactiva
/**
* @author rubn
*/
@Service
public class ReactiveRandomNumbers {
private static final SecureRandom SECURE_RANDOM = new SecureRandom();
private static final int RANDOM_NUMBER_ORIGIN = 1;
private static final int RANDOM_NUMBER_BOUND = 7;
/**
*
* @param size of stream
* @return Mono<Map<Integer,Long>>
*/
public Mono<Map<Integer, Long>> monoFrecuency(Long size) {
(1)
return Mono.just(SECURE_RANDOM.ints(size, RANDOM_NUMBER_ORIGIN, RANDOM_NUMBER_BOUND)
.boxed()
.collect(Collectors.groupingBy(e -> e, Collectors.counting())));
}
}
1 | Se aprecia que el bloque de código bloqueante lo metemos dentro del
Mono.just , un Mono.justOrEmpty o lo mísmo a decir un wrapper porque necesitamos un
Publisher. |
Con Vaadin necesitamos tener en nuestra MainView.class
una
instancia del la UI que la obtenemos con getUI()
preferiblemente,
a parte, necesitamos la anotación @Push
para habilitar los
websockets, es decir comunicación bidireccional, de cliente a servidor y
viceversa.
@Override
protected void onAttach(AttachEvent attachEvent) { (1)
super.onAttach(attachEvent);
this.getUI().ifPresent(ui -> { (2)
this.initReactiveFrecuency(ui);(3)
this.initWithCompletableFuture(ui);
});
}
1 | Las llamadas que ejecutemos desde otro hilo lo correcto con Vaadin es
sobreescribir el siguiente método, onAttach(AttachEvent attachEvent)
para pasar la instancia de la UI a los métodos necesarios, este es
heredado de la clase abstracta Component la mas alta de la
jerarquía, en dicha vista que no es la MainView sino otra, extendemos
del componente VerticalLayout hija de Component se encarga de
añadir componentes verticalmente. |
2 | Para que dicha anotación sea usada correctamente desde un
Thread , necesitamos invocar al método access(() -> )) este ayuda
a evitar deadlocks entre otros… gracias a la referencia de la ui |
3 | Este método y el de abajo, invocaran al método access |
Observando comportamientos bloqueantes y no bloqueantes
- Combo síncrono
Este es el método que ejecuta el servicio síncrono, es una ejecución normal común y corriente, esta se ejecutara en el main-thread.
private void initSyncFrecuency() {
syncComboBox.addValueChangeListener(event -> {
this.execute(event.getValue().getSize(),
e -> syncRandomNumbers.syncFrencuency(event.getValue().getSize()));
});
}
log en consola
2022-04-04 01:41:58.086 INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync : Result map: {1=1665157, 2=1667816, 3=1665566, 4=1666292, 5=1666392, 6=1668777}
2022-04-04 01:41:58.086 INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync : Daemon: true
2022-04-04 01:41:58.086 INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync : Thread name: http-nio-8088-exec-10
2022-04-04 01:41:58.086 INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
2022-04-04 01:41:58.086 INFO 89645 --- [io-8088-exec-10] c.s.s.views.helloworld.SyncVsAsync : Thread state: RUNNABLE
Aquí se aprecia su comportamiento, con la barra azul en la parte superior (Otros frameworks indican eso de otras maneras, pero Vaadin lo hace así), que estamos bloqueando el Main Thread
, por lo tanto, esa operación bloqueante debe terminar para usar la aplicación u otros componentes de la misma, funciona, pero se puede mejorar muchísimo mas.
- Combo asíncrono
private void initWithCompletableFuture(final UI ui) {
asyncComboBoxWithCompletableFuture.addValueChangeListener(event -> {
(1)
CompletableFuture.supplyAsync(() -> this.syncRandomNumbers.syncFrencuency(event.getValue().getSize()))
.whenCompleteAsync((map, error) -> { (2)
ui.access(() -> { (3)
log.info("Result map: {}", map); (4)
(5)
log.info("Daemon: {}", Thread.currentThread().isDaemon());
log.info("Thread name: {}", Thread.currentThread().getName());
(6)
log.info("ThreadGroup: {}", Thread.currentThread().getThreadGroup());
this.execute(event.getValue().getSize(), e -> map);
});
});
});
}
1 | Aqui la situación mejora mucho, porque tenemos dicha ejecución en otro
Thread no bloqueante, ForkJoinPool por defecto con el supplyAsync internamente el hilo engendrado aqui es de máxima prioridad, también un daemon , pero muchas veces no es asi, dando como resultado que el main-thread no espere que su ejecucion termine por eso muchos se ven en la necesidad de usar join() para obligar finalizar la operación. |
2 | Recalcando también que no usamos métodos bloqueantes como get o join. Sino whenCompleteAsync, existen mas variantes, como thenApply, thenApplyAsync, thenAccept etc. |
3 | El método access para notificar al cliente, obligatorio. |
4 | Hacemos loggin del resultado del map. |
5 | Mostramos cierta info del ForkJoinPool y el Thread en ese momento, el log de abajo muestra ciertas cosas. |
log en consola
2022-04-04 01:11:38.262 INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync : Result map: {1=1665475, 2=1667095, 3=1665609, 4=1667555, 5=1666059, 6=1668207}
2022-04-04 01:11:38.262 INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync : Daemon: true
(1)
2022-04-04 01:11:38.262 INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync : Thread name: ForkJoinPool.commonPool-worker-17
(2)
2022-04-04 01:11:38.262 INFO 82217 --- [nPool-worker-17] c.s.s.views.helloworld.SyncVsAsync : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
1 | ForkJoinPool.commonPool-worker-17 el numero 17 nos servira para rastrearlo a la hora de hacer un thread-dump justo al darle click al combo, con la VisualVM |
2 | Este hilo tiene la prioridad máxima de 10 Thread.MAX_PRIORITY, lo que indica que el main-thread espera a que esta operación termine, dado que muchas veces no es así. |
- Combo reactivo
private void initReactiveFrecuency(final UI ui) {
reactiveComboBox.addValueChangeListener(event -> {
(1)
Mono.defer(() -> this.reactiveRandomNumbers.monoFrecuency(event.getValue().getSize()))
.subscribeOn(Schedulers.boundedElastic()) (2)
.subscribe(subscribeMap -> { (3)
ui.access(() -> { (4)
this.execute(event.getValue().getSize(), e -> subscribeMap);
});
});
});
}
1 | El Operador defer necesario para hacer este código lazy y revaluar el contenido del lambda, cada vez que exista un nuevo subscriptor. |
2 | Permite aislar esta secuencia desde el inicio en un Scheduler proporcionado, creando un pool de hilos que está sometido a la demanda de datos, para ejecuciones largas y es una alternativa para tareas bloqueantes donde el número de ellas e hilos es limitada. |
3 | Nos suscribimos/consumimos esos datos. |
4 | El método access dentro del método subscribe. |
Log en consola
2022-04-04 01:33:04.866 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Thread name doOnNext(): boundedElastic-1
2022-04-04 01:33:04.866 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Thread name doOnNext(): boundedElastic-1
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Result map: {1=1666276, 2=1667401, 3=1666503, 4=1665615, 5=1667927, 6=1666278}
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Daemon: true
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Thread name: boundedElastic-1
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : ThreadGroup: java.lang.ThreadGroup[name=main,maxpri=10]
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Thread state: RUNNABLE
2022-04-04 01:33:04.867 INFO 89645 --- [oundedElastic-1] c.s.s.views.helloworld.SyncVsAsync : Thread name subscribe(): boundedElastic-1
Y aquí es con project reactor
necesitamos invocar el método
defer(() -> )
también un Scheduler para no bloquearnos destacando
que dependiendo el operador/método que se llame se establece
backpressure por defecto con un buffer con tamaño fixeado como por
ejemplo:
reactor.util.concurrent.Queues.XS_BUFFER_SIZE
reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE
las VM properties siguientes se usan para incrementar el performance de
la JVM junto con setear un garbage collector tipo -XX:+UseParallelGC
|
El resultado es muy parecido al GIF anterior con
CompletableFuture
, solo que aquí es un entorno controlado y más fácil,
en un caso real, no podremos editar el código original del servicio
ReactiveRandomNumbers, y tocaría convertir nuestra llamada bloqueante(
o ese impostor reactivo) en un Publisher para introducirlo en nuestro
defer del método initReactiveFrecuency.
Además de la ventaja de aplazar/diferir el publisher en otro Scheduler,
es que si esa llamada por alguna razon contiene algun tipo de codigo
bloqueante( un Thread.sleep() por ejemplo
) la situacion podria
volverse extraña si tenemos un endpoint que lo invoque, porque nos dara una excepción
detectada por ejemplo en la especificación servlet 3.1
En casos así, seria muy útil usar a BlockHound
un agente para detectar llamadas bloqueantes para los Schedulers de project reactor ( por defecto en el Schedulers.paralell() ) e internamente esta construida sobre byte-buddy.
Y cuál sera mejor de esos paradigmas ?
Por mi parte tengo muchas razaones para ir por un modelo reactivo sin problemas, mejor rendimiento a la larga, mejor administración del hardware por asi decirlo, en cierto punto es más facil hacer cosas complejas concurrentes, que reactor facilita, sin eso, tendríamos que usar Java un poco mas pura, aunque podria ser mas pura aún.
Usar un modelo síncrono a día de hoy? mmm para que ? ya esta apis concurrentes vienen trabajando desde hace muchos años dando un buen soporte y calidad validada, project reactor su primer release fue en 2011, quien se imagina cargando Instagram, Facebook, NetFlix jajaj de manera sincrona y todo bloqueado ? uffff que ineficiencia y perdedera de tiempo tan abismal…
Paul Bakker
Donde comenta que aún tienen código síncrono en su arquitectura excepto Zuul.
We’ve moved away from most RX code actually. There are exceptions (e.g Zuul), but most of Netflix runs on blocking style APIs. That doesn’t mean it doesn’t work, but it does mean its not necessarily the best tool for every job.
— Paul Bakker (@pbakker) May 8, 2022
PoC en Oracle adios heroku
A veces toca esperar para ver la app, heroku le hace idle(duerme) las apps cuando no se usan.
Ahora usando Oracle cloud ☠ |