Some locks

Let's see how to use Redisson as a distributed locking strategy for several instances that share a critical section, such as writing to a file, plus some tips to avoid blocking the main thread (the source of all evil).

Lock

Java offers many classes for applying some kind of lock, starting with the simplest and most infamous option, the synchronized keyword.

synchronized (this) {
    // critical section
}

I prefer something that releases the main thread faster, especially for writes. StampedLock is also uncomplicated, unlike using an AtomicReference, although many say the opposite. ReentrantReadWriteLock also resembles StampedLock.
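As a minimal sketch of the StampedLock API mentioned above: the class LatencyHolder and its field are hypothetical names used only for illustration, not part of the project. It shows an exclusive write and an optimistic read that falls back to a regular read lock if a writer intervened.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical holder class, used only to illustrate the StampedLock API.
class LatencyHolder {
    private final StampedLock lock = new StampedLock();
    private long lastLatency;

    // Exclusive write: the stamp returned by writeLock() must be handed back to unlockWrite().
    public void update(long latency) {
        long stamp = lock.writeLock();
        try {
            lastLatency = latency;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Optimistic read: does not block unless a write happened in between.
    public long read() {
        long stamp = lock.tryOptimisticRead();
        long value = lastLatency;
        if (!lock.validate(stamp)) {
            // A writer intervened; fall back to a regular read lock.
            stamp = lock.readLock();
            try {
                value = lastLatency;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return value;
    }
}
```

The optimistic path is what makes reads cheap when writes are rare. Note that StampedLock is not reentrant, so a thread must not try to acquire it again while holding a stamp.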

The actual code and what it does

This project, stream-function-samples-consumer-producer, is inspired by previous work on DLQ by Oleg Zhurakousky.

environment rabbit test

We have a producer whose number of threads is adjusted through a ThreadPoolTaskExecutor configuration (the threads are named ProducerTaskExecutor-N, where N is the thread number). When sending with Spring Cloud Stream, we add the current timestamp from System.currentTimeMillis to the message. The message arrives at a queue called performance-queue and is then processed by however many consumers we have. They calculate the total sending time and write it in HH:mm;latency format to a file on the NAS.
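The consumer-side calculation described above can be sketched in plain Java. The class LatencyLine and its method names are assumptions made for illustration; the real consumer reads the timestamp from the message header. The sketch also assumes the producer and consumer clocks are synchronized, since the latency is the difference between two System.currentTimeMillis values taken on different hosts.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper showing how a HH:mm;latency line could be built.
class LatencyLine {
    private static final DateTimeFormatter HH_MM = DateTimeFormatter.ofPattern("HH:mm");

    // sentAtMillis: the value the producer stored in the "timeStamp" header.
    // receivedAtMillis: System.currentTimeMillis() taken by the consumer on arrival.
    static long latencyMillis(long sentAtMillis, long receivedAtMillis) {
        return receivedAtMillis - sentAtMillis;
    }

    // Builds the line in HH:mm;latency format.
    static String format(LocalTime time, long latencyMillis) {
        return HH_MM.format(time) + ";" + latencyMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long receivedAt = System.currentTimeMillis();
        System.out.println(format(LocalTime.now(), latencyMillis(sentAt, receivedAt)));
    }
}
```

For example, a message sent at 1000 ms and received at 1042 ms, logged at 09:05, would produce the line 09:05;42.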

This is an old version of the code, and at the end of the code we use the method onSpinWait
private void sendMessage(final String input, final long delay, final long docCount, String message) {

    log.info("Sending message with a delay of {} ms for docCount {}", delay, docCount);

    MessageDto messageDto = new MessageDto();
    messageDto.setMessage(message);

    for (int index = 0; index < docCount; index++) {
        System.out.println("Uppercasing " + input + " " + Thread.currentThread().getName());

//     var stamped = STAMPED_LOCK.writeLock();
//
//     try {
         Message<MessageDto> messageToSend = MessageBuilder.withPayload(messageDto)
                 .setHeader("timeStamp", System.currentTimeMillis())
                 .build();

         this.streamBridge.send(PERFORMANCE_QUEUE, messageToSend);

         if (input.equals("fail")) {
             System.out.println("throwing exception");
             throw new RuntimeException("Intentional");
         }
//     } finally {
//         STAMPED_LOCK.unlockWrite(stamped); (1)
//     }


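        // Pause between messages to control the sending rate
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
    }
}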
1 Caution is needed here when locking around send. In version 3 of Spring Cloud Stream, which is the one used in this example, holding a lock prevents the producer from creating channels. The ideal behavior is that the producer creates one channel per thread.
Here we have just one instance, one single channel, one single thread on the producer’s side.

a single producer, one instance

Here we can already see that when we increase the number of threads, more connections are created.

9 channels, 9 producer threads

The consumer has no explicit thread pool applied to it, but we can increase its concurrency through the Spring Cloud Stream (SCS) configuration, for example:

spring:
  cloud:
    function:
      definition: consumer
    stream:
      default-binder: rabbit
      rabbit:
        bindings:
          consumer-in-0:
            consumer:
              prefetch: 1
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: ${URL_BROKER}
                port: 5672
                username: ${USER}
                password: ${PASSWORD}
      bindings:
        consumer-in-0:  # Input channel (consumer)
          destination: performance-queue
          group: my-consumer-group
          consumer:
            concurrency: 5 # Number of concurrent consumers (1)
            prefetch: 1
1 This is where we increase the concurrency. The Rabbit UI shows the number of consumers: 5 for a single instance, and 15 in total with 3 replicas, as in the following image:

total consumers

Creating the instances with this Bastillefile is a little faster for a Java development environment.

PKG -y bash zip unzip curl git nano tmux

# jails.conf
CONFIG set allow.mlock;
CONFIG set allow.raw_sockets;
SYSRC defaultrouter="192.168.1.1"

# Install zsh
PKG zsh
CMD chsh -s /usr/local/bin/zsh root
PKG ohmyzsh
CMD cp /usr/local/share/ohmyzsh/templates/zshrc.zsh-template ~/.zshrc

# zsh-autosuggestions
PKG powerline-fonts zsh-autosuggestions
CMD echo 'source /usr/local/share/zsh-autosuggestions/zsh-autosuggestions.zsh' >> ~/.zshrc

# Install openjdk
PKG openjdk11 openjdk17 openjdk21

# Install sdkman
CMD curl -s "https://get.sdkman.io" | bash
CMD /usr/local/bin/zsh -c "echo 'export SDKMAN_DIR="$HOME/.sdkman"' >> ~/.zshrc"
CMD /usr/local/bin/zsh -c "echo '[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"' >> ~/.zshrc"

# Copy available openjdk to candidates and then remove it
CMD mkdir -p ~/.sdkman/candidates/java

CMD cp -r /usr/local/openjdk11 ~/.sdkman/candidates/java/11
CMD cp -r /usr/local/openjdk17 ~/.sdkman/candidates/java/17
CMD cp -r /usr/local/openjdk21 ~/.sdkman/candidates/java/21

# Create symbolic links before removing packages
CMD ln -s ~/.sdkman/candidates/java/11/bin/java /usr/local/openjdk11/java
CMD ln -s ~/.sdkman/candidates/java/11/bin/javac /usr/local/openjdk11/javac
CMD ln -s ~/.sdkman/candidates/java/17/bin/java /usr/local/openjdk17/java
CMD ln -s ~/.sdkman/candidates/java/17/bin/javac /usr/local/openjdk17/javac
CMD ln -s ~/.sdkman/candidates/java/21/bin/java /usr/local/openjdk21/java
CMD ln -s ~/.sdkman/candidates/java/21/bin/javac /usr/local/openjdk21/javac

CMD pkg remove -y openjdk11 openjdk17 openjdk21

CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk install maven"
CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk install jbang"

CMD /usr/local/bin/zsh -c "source \$HOME/.zshrc"

# Set default java version to 21
CMD /usr/local/bin/zsh -c "source \$HOME/.sdkman/bin/sdkman-init.sh && sdk default java 21"
CMD /usr/local/bin/zsh -c "rehash"
tmux helps us work with many windows
  • Create a file in the home directory called tmux.conf

  • Add this line to it: set -g mouse on

  • Then run tmux source-file tmux.conf

  • After that, scrolling works in each window.

The problem

For example, 3 instances write a latency value (it could be anything) to a file, but when the write is performed, lines get mixed up like this:

These were the initial latencies; not anymore 😂
race 1

race1

race 2

race2

race 3

race3

race 4

race4

Even if each instance has its own lock, the instances can still interfere with each other at some point, as in the cases above. I am not sure whether to call this a race condition (I do not think it is), but the side effect occurs anyway. In JDK 11, the write method used by BufferedWriter already contains a synchronized block internally, as shown here in the Writer class:

public void write(String str, int off, int len) throws IOException {
    synchronized (lock) {
        char cbuf[];
        if (len <= WRITE_BUFFER_SIZE) {
            if (writeBuffer == null) {
                writeBuffer = new char[WRITE_BUFFER_SIZE];
            }
            cbuf = writeBuffer;
        } else {    // Don't permanently allocate very large buffers.
            cbuf = new char[len];
        }
        str.getChars(off, (off + len), cbuf, 0);
        write(cbuf, 0, len);
    }
}
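To illustrate why that internal synchronized block does not help across writers, here is a small sketch. The class SeparateBuffers is a hypothetical name, and two writers in one JVM stand in for two application instances. Each BufferedWriter has its own lock object and its own private buffer, so the order in the file depends on when each buffer is flushed, not on when write was called.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical demo: two writers appending to the same file.
class SeparateBuffers {
    static List<String> demo() {
        try {
            Path file = Files.createTempFile("latency", ".txt");
            try (BufferedWriter a = Files.newBufferedWriter(file, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
                 BufferedWriter b = Files.newBufferedWriter(file, StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                a.write("from-a");   // stays in a's private buffer
                a.newLine();
                b.write("from-b");   // stays in b's private buffer
                b.newLine();
                b.flush();           // b reaches the file first
                a.flush();           // a reaches the file second, although it wrote first
            }
            List<String> lines = Files.readAllLines(file);
            Files.delete(file);
            return lines;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [from-b, from-a]
    }
}
```

With larger volumes, a buffer can also be flushed in the middle of a line when it fills up, which is one plausible way to end up with the mixed lines shown earlier.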

Installing Redis Server


Redisson is a Redis client for Java that abstracts the data structures Redis offers. One of them is RLock, which we will use here to remedy our concurrency issue at write time.

We install it with this Bastillefile

# Install redis
PKG redis
SYSRC redis_enable="YES"
CONFIG set allow.mlock;
CONFIG set allow.raw_sockets;
SYSRC defaultrouter="192.168.1.1"
CMD redis-cli ping (1)
1 This should answer PONG on the console

We are going to edit the redis.conf file that is in the path /usr/local/etc/redis.conf

bind * -::*
requirepass ultrapassword
service redis restart (1)
1 Yes, a restart was necessary.

Redisson configuration


The configuration we are going to use in the consumer is the following:

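import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ReddisonClientConfiguration {

    @Bean
    public RedissonClient redissonClient(RedissonCredentialsConfiguration credentialsConfiguration) {
        Config config = new Config();
        // The address is expected to include the scheme, e.g. redis://host:port
        config.useSingleServer().setAddress(credentialsConfiguration.getUrl());
        config.useSingleServer().setPassword(credentialsConfiguration.getPassword());
        return Redisson.create(config);
    }

}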
Now with the RLock for the critical section, practically the same story as with the StampedLock, things are looking a little better:
private void writeLine(BufferedWriter writer, long latency) throws IOException {
    final RLock lock = redissonClient.getLock("logFileLock");
    lock.lock();
    try {
        //HH:mm;latency
        final String line = FORMATER.format(LocalTime.now()) + ";" + latency;
        writer.write(line);
        writer.newLine();
    } finally {
        lock.unlock();
    }

}
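A possible variation, sketched here under assumptions: since RLock extends java.util.concurrent.locks.Lock, the critical section can be written against the Lock interface, which also lets it run locally with a ReentrantLock as a stand-in. The class GuardedWriter, the waitMillis parameter, and the choice to flush before unlocking are illustrative and not taken from the project. Flushing inside the lock matters because a BufferedWriter may otherwise hold the line in memory and write it to the file after the lock has been released.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: writes one line while holding the given lock.
class GuardedWriter {
    // Returns true if the line was written, false if the lock was not obtained in time.
    static boolean writeLine(Lock lock, BufferedWriter writer, String line, long waitMillis) {
        boolean acquired;
        try {
            acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
        if (!acquired) {
            return false;
        }
        try {
            writer.write(line);
            writer.newLine();
            writer.flush(); // push the buffered line out before releasing the lock
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter();
        // ReentrantLock is a local stand-in; an RLock could be passed the same way.
        boolean ok = writeLine(new ReentrantLock(), new BufferedWriter(sink), "12:30;42", 500);
        System.out.println(ok + " -> " + sink);
    }
}
```

Using tryLock with a timeout instead of lock() means a consumer thread does not wait indefinitely if another instance holds the lock for too long; what to do with the skipped line is left to the caller.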
In this video, messages were sent at a rate of 100 per second
{
  "values": [
    {
      "dato": 1,
      "doc_count": 6000
    },
    {
      "dato": 2,
      "doc_count": 6000
    },
    {
      "dato": 3,
      "doc_count": 6000
    },
    {
      "dato": 4,
      "doc_count": 6000
    },
    {
      "dato": 5,
      "doc_count": 6000
    },
    {
      "dato": 6,
      "doc_count": 6000
    },
    {
      "dato": 7,
      "doc_count": 6000
    },
    {
      "dato": 8,
      "doc_count": 6000
    },
    {
      "dato": 9,
      "doc_count": 6000
    },
    {
      "dato": 10,
      "doc_count": 6000
    }

  ]
}

There are no overlapping or strange lines in the little .txt file. Not bad, I think. If things get too complicated, each instance can also write to its own file.

Avoiding contention in the main-thread

Well, this is also the hardest part of the test: introducing threads, adding a Thread.sleep(), and then, even worse, expecting good or even regular performance, all the more so in a low-latency app.

In the previous code, we had Thread.sleep() right in the main thread to adjust the message rate. Each thread was paused for that purpose, moving from running to sleeping and back. States such as park, running, wait, and monitor can be viewed more comfortably with JVisualVM; Java Mission Control is highly recommended as well.
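The same state can also be observed programmatically. As a small sketch (the class SleepState is a hypothetical name), a thread that is inside Thread.sleep() is reported by the JVM as TIMED_WAITING, which is what the profilers display as sleeping.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo: reports the JVM state of a thread while it sleeps.
class SleepState {
    static Thread.State stateWhileSleeping() {
        CountDownLatch started = new CountDownLatch(1);
        Thread sleeper = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sleeper");
        sleeper.setDaemon(true);
        sleeper.start();

        Thread.State state = Thread.State.RUNNABLE;
        try {
            started.await();
            // Poll until the thread has actually entered sleep().
            while ((state = sleeper.getState()) == Thread.State.RUNNABLE) {
                Thread.onSpinWait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        sleeper.interrupt(); // wake it up so the demo ends quickly
        return state;
    }

    public static void main(String[] args) {
        System.out.println(stateWhileSleeping()); // prints TIMED_WAITING
    }
}
```

While a thread is in that state it does no useful work, so every sleeping producer thread is capacity that is not sending messages.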

The idea is to tune our app by visualizing its behavior in near real time and watching how the threads behave. sleep() is blocking and simply adds contention, reducing throughput in the long run, for example: