Vamos a ver como usar redisson como estrategia de lock distribuido para varias instancias que tienen un sección crítca, como escritura en un fichero, además alguno que otro tip para evitar bloquear el main-thread (el origen del mal)…
Lock
Con java tenemos muchas clases para aplicar algún tipo de lock, incluso la más simple e infame synchronize
típo
synchonize(this) {
//sección critica
}
Yo prefiero algo que desbloque el main thread más rápido sobre todo para escritura el StampeLock de esta manera tampoco es complicado, a diferencia de usar un AtomicReference, aunque muchos dicen lo inverso, también ReentrantReadWriteLock tiene su parecido al Stamped.
Esta versión de codigo es vieja, ya finalizando se hace uso del método onSpinWait |
El código actual y que hace
Este proyecto stream-function-samples-consumer-producer inspirando en un trabajo previo sobre dlq de Oleg Zhurakousky
Tenemos un productor que con configuración de ThreadPoolTaskExecutor via configuración se ajustaran el número de threads para el productor (ProducerTaskExecutor-N, la N es el numero del Thread) , al momento de enviar con el método send
de spring cloud stream añadimos el timestamp en ese momento con System.currentTimeMillis
dicho mensaje llegara a una cola llamada performance-queue, luego el mensaje se procesara por la cantidad de consumidores que tengamos, estos calcularán el tiempo total de envío, y lo escribiran en formado HH:mm;latency en un fichero que esta en el NAS.
private void sendMessage(final String input, final long delay, final long docCount, String message) {
log.info("Enviando mensaje con delay de {} ms para docCount {}", delay, docCount);
MessageDto messageDto = new MessageDto();
messageDto.setMessage(message);
for (int index = 0; index < docCount; index++) {
System.out.println("Uppercasing " + input + " " + Thread.currentThread().getName());
// var stamped = STAMPED_LOCK.writeLock();
//
// try {
Message<MessageDto> messageToSend = MessageBuilder.withPayload(messageDto)
.setHeader("timeStamp", System.currentTimeMillis())
.build();
this.streamBridge.send(PERFORMANCE_QUEUE, messageToSend);
if (input.equals("fail")) {
System.out.println("throwing exception");
throw new RuntimeException("Itentional");
}
// } finally {
// STAMPED_LOCK.unlockWrite(stamped); (1)
// }
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
1 | Hay que tener precaución aquí, al hacer send y aplicar un lock, en la versión 3 de spring cloud stream, que es la que tengo en el ejemplo,
si hacemos locking evitara crear channels en el productor, el comportamiendo ideal, es si tenemos un número de Threads en el productor se creara un channel por Thread. |
justamente aqui tenemos por prueba una instancia, un solo canal/channel, un solo Thread por parte del producer. |
Aquí ya se aprecia que cuando aumentamos los Threads se creas mas conexiones. |
El consumidor no se le aplica un Thread/ThreadPool explicito como tal, sino que por medio de la configuración del SCS(spring cloud stream) aumentamos la concurrencia, por ejemplo:
spring:
cloud:
function:
definition: consumer
stream:
default-binder: rabbit
rabbit:
bindings:
consumer-in-0:
consumer:
prefetch: 1
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: ${URL_BROKER}
port: 5672
username: ${USER}
password: ${PASSWORD}
bindings:
consumer-in-0: # Canal de entrada (Consumidor)
destination: performance-queue
group: my-consumer-group
consumer:
concurrency: 5 # Número de consumidores concurrentes (1)
prefetch: 1
1 | Aquí incrementamos la concurrencia como tal, y en la UI de Rabbit se verán la cantidad de consumidores, claramente tenemos 5, pero si tenemos 3 réplicas, 15 que se van a crear, como en la imagen siguiente: |
Para crear las instancias con este Bastillefile será un pelin más rápido, para un entorno de de developers con java.
PKG -y bash zip unzip curl git nano tmux
# jails.conf
CONFIG set allow.mlock;
CONFIG set allow.raw_sockets;
SYSRC defaultrouter="192.168.1.1"
# Install zsh
PKG zsh
CMD chsh -s /usr/local/bin/zsh root
PKG ohmyzsh
CMD cp /usr/local/share/ohmyzsh/templates/zshrc.zsh-template ~/.zshrc
# zsh-autosuggestions
PKG powerline-fonts zsh-autosuggestions
CMD echo 'source /usr/local/share/zsh-autosuggestions/zsh-autosuggestions.zsh' >> ~/.zshrc
# Install openjdk
PKG openjdk11 openjdk17 openjdk21
# Install sdkman
CMD curl -s "https://get.sdkman.io" | bash
CMD /usr/local/bin/zsh -c "echo 'export SDKMAN_DIR="$HOME/.sdkman"' >> ~/.zshrc"
CMD /usr/local/bin/zsh -c "echo '[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"' >> ~/.zshrc"
# Copy available openjdk to candidates and then remove it
CMD mkdir -p ~/.sdkman/candidates/java
CMD cp -r /usr/local/openjdk11 ~/.sdkman/candidates/java/11
CMD cp -r /usr/local/openjdk17 ~/.sdkman/candidates/java/17
CMD cp -r /usr/local/openjdk21 ~/.sdkman/candidates/java/21
# Create symbolic links before removing packages
CMD ln -s ~/.sdkman/candidates/java/11/bin/java /usr/local/openjdk11/java
CMD ln -s ~/.sdkman/candidates/java/11/bin/javac /usr/local/openjdk11/javac
CMD ln -s ~/.sdkman/candidates/java/17/bin/java /usr/local/openjdk17/java
CMD ln -s ~/.sdkman/candidates/java/17/bin/javac /usr/local/openjdk17/javac
CMD ln -s ~/.sdkman/candidates/java/21/bin/java /usr/local/openjdk21/java
CMD ln -s ~/.sdkman/candidates/java/21/bin/javac /usr/local/openjdk21/javac
CMD pkg remove -y openjdk11 openjdk17 openjdk21
CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk install maven"
CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk install jbang"
CMD /usr/local/bin/zsh -c "source \$HOME/.zshrc"
# Set default java version to 21
CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk default java 21"
CMD /usr/local/bin/zsh -c "rehash"
tmux en nuetra ayuda por usar muchas ventanas
|
El problema
3 instancias escribiendo en un fichero una latencia(puede ser cualquier cosa), por ejemplo, pero cuando se realiza la escritura, algo pasa entre líneas como:
Estas fueron las latencias iniciales, ya no 😂 |
Por más que cada instancia tenga su lock propio, igualmente podrían interfieren en cierto punto como los casos de arriba, que no se si considerarla un race-condition (no creo este caso), pero igualmente se produce ese side efect, el método write del BuffereWriter contienen internamente en la jdk11 un synchronize propio como este en la clase Writer:
public void write(String str, int off, int len) throws IOException {
synchronized (lock) {
char cbuf[];
if (len <= WRITE_BUFFER_SIZE) {
if (writeBuffer == null) {
writeBuffer = new char[WRITE_BUFFER_SIZE];
}
cbuf = writeBuffer;
} else { // Don't permanently allocate very large buffers.
cbuf = new char[len];
}
str.getChars(off, (off + len), cbuf, 0);
write(cbuf, 0, len);
}
}
Instalando Servidor Redis
Redis tiene un cliente llamado Redisson, para manipular/abstraer todas las estructuras relacionadas con redis, una de ellas es el uso del RLock que nos servira para probar en este caso y poder remediar nuestra issue de concurrencia a la hora de escribir
Con este Bastillefile, lo instalamos
# Install redis
PKG redis
SYSRC redis_enable="YES"
CONFIG set allow.mlock;
CONFIG set allow.raw_sockets;
SYSRC defaultrouter="192.168.1.1"
CMD redis-cli ping (1)
1 | Esto debe responder PONG en la consola |
Vamos editar el fichero redis.conf
que esta en la ruta /usr/local/etc/redis.conf
bind * -::*
requirepass ultrapassword
service redis restart (1)
1 | Si que fue necesario reiniciar. |
Config para Redisson
la config que vamos usar en el consumer es la siguiente:
@Configuration
public class ReddisonClientConfiguration {
@Bean
public RedissonClient redissonClient(RedissonCredentialsConfiguration credentialsConfiguration) {
Config config = new Config();
config.useSingleServer().setAddress(credentialsConfiguration.getUrl());
config.useSingleServer().setPassword(credentialsConfiguration.getPassword());
return Redisson.create(config);
}
}
Ahora con el RLock para la sección critica practicamente la mísma historia que con el StampedLock pinta un poco mejor la cosa: |
private void writeLine(BufferedWriter writer, long latency) throws IOException {
final RLock lock = redissonClient.getLock("logFileLock");
lock.lock();
try {
//HH:mm;latency
final String line = FORMATER.format(LocalTime.now()) + ";" + latency;
writer.write(line);
writer.newLine();
} finally {
lock.unlock();
}
}
Ese video fue enviando 100 m/s mensajes por segundo
|
{
"values": [
{
"dato": 1,
"doc_count": 6000
},
{
"dato": 2,
"doc_count": 6000
},
{
"dato": 3,
"doc_count": 6000
},
{
"dato": 4,
"doc_count": 6000
},
{
"dato": 5,
"doc_count": 6000
},
{
"dato": 6,
"doc_count": 6000
},
{
"dato": 7,
"doc_count": 6000
},
{
"dato": 8,
"doc_count": 6000
},
{
"dato": 9,
"doc_count": 6000
},
{
"dato": 10,
"doc_count": 6000
}
]
}
No se aprecia solape, o líneas raras en el ficherito .txt nada mal creo yo, también que si la cosa se complica mucho, cada instancia puede escribir en su propio fichero.
Evitando contencíon en el main-thread
Bien, esta parte también es lo más dura de la prueba, al introducir threads y de paso que se nos ocurra añadir un Thread.sleep(), y luego peor aún, esperar buen o regular rendimiento y más en una app low latency
regulera.
El código anterior, teniamos Thread.sleep() justo en el hilo principal para ajustar la tasa de mensajes, cada thread pausado por eso, colocandolo en estado de sleeping a running y viceversa, estos estados como park, running, wait, monitor, podemos verlos más comodamente con la JVisualVM, la Java-Mision-Control muy recomendada igualmente.
La idea es ajustar nuestra app, visualizando su comportamiento en pocas palabras en tiempo real, y mirar como se comportan los threads, el sleep() es bloqueante y simplemten añade contención, reduciendo el rendimiento throughput a la larga por ejemplo:
sendMessage con Thread.onSpinWait()
for (int index = 0; index < totalDocCountToProcess; index++) {
// Intentamos obtener el siguiente slot de tiempo disponible
// mantiene ritmo global entre Threads, sin context switching
long currentSlot;
long nextSlot;
do {
currentSlot = lastGlobalSendTime.get();
nextSlot = currentSlot + globalDelayPerMessage;
} while (!lastGlobalSendTime.compareAndSet(currentSlot, nextSlot)); //CAS
// Esperamos hasta que sea nuestro turno
while (System.nanoTime() < nextSlot) {
Thread.onSpinWait();
}
Message<MessageDto> messageToSend = MessageBuilder.withPayload(messageDto)
.setHeader("timestamp_ms", System.currentTimeMillis())
.build();
this.streamBridge.send(PERFORMANCE_QUEUE, messageToSend);
COUNTER.incrementAndGet();
}
Aquí se aprecian 3 Threads llamados RabbitProducerTaskExecutor-N
, y siempre estarán en running es lo ideal, los pequeños
delay son aplicados con onSpinWait y un AtomicLong, aumentando el rendimiento considerablemente, y haciendo que se envie
una tasa de mensajes/message rate, mas estable y precisa
sendMessage con Thread.sleep()
for (int index = 0; index < totalDocCountToProcess; index++) {
Message<MessageDto> messageToSend = MessageBuilder.withPayload(messageDto)
.setHeader("timestamp_ms", System.currentTimeMillis())
.build();
this.streamBridge.send(PERFORMANCE_QUEUE, messageToSend);
COUNTER.incrementAndGet();
try {
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(globalDelayPerMessage)); (1)
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
1 | Los nanos los pasamos a millis que es lo que necesita el sleep() |
Se aprecia que ahora los Threads estan más en sleeping que en running