Actualizando filas de grid con push y broadcaster singleton

En mongo existe una manera de suscribirnos a una colección, por lo tanto un cambio se nos refleja a nosotros, sin escanearla con un Scheduler como el que se mostrará este post.

reactiveMongoTemplate.changeStream(Book.class) (1)
    .watchCollection("coding-books") (2)
    .filter(where("bookName").is("clean code")) (3)
    .resumeAt(Instant.now()) (4)
    .listen(); (5)
1 Nuestra entidad
2 Nustra colección
3 Nuestro criterio, el campo bookName que sea clean code
4 Tiempo actual
5 Luego del operador listen, ya tendríamos nuestro publisher para usar.

Este es un pequeño ejemplo de como actualizar las filas de un grid haciendo push por medio de un broadcaster, este tiene el fin de actualizar todas las UI, en este caso me base en la versión Vaadin 8 de un proyecto y la migre a la versión 14, nada fuera de lo común, pero si es una idea de como actualizar unicamente un componente Vaadin sin recargar la pagina completa, puede ser el que queramos 🥰

La magia esta en esta clase, le hize refactorin para entenderla de una manera mas fácil añadiendo el Registration que nos ofrece Vaadin para estos casos, ella propaga los datos del grid en todas las sesiones de los usuarios, es decir visualizaran siempre el mismo grid aun sabiendo que si actualizamos la pestaña los datos se mantienen como estaban.

@Log4j2
public class Broadcaster {
    (1)
    private static final List<SerializableConsumer<List<String>>> listeners = new CopyOnWriteArrayList<>();
    (2)
    private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
    (3)
    public static void broadcast(List<String> updatedTransactionIds) {
        listeners.forEach(listener -> {
            executorService.execute(() -> listener.accept(updatedTransactionIds));
        });
        log.debug("Notified {} broadcast listeners", listeners.size());
    }
    (4)
    public static Registration register(final SerializableConsumer<List<String>> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

}
1 La lista necesaria y concurrente para actualizar las UI cada unas de ellas es agregada aqui al abrir una tab/pestaña del navegador web.
2 Encargado de ejecutar las actualizaciones en un unico Thread distinto al principal.
3 Este método se ejecuta con el botón Init transactions! en la vista, GridPushOnRows.java parte de la magia, el recorre toda la lista y actualiza todas las filas.
4 En este método se añaden la lista de strings(datos de cada fila en realidad) a nuestra lista concurrente y remueve a los elementos si estan presentes.

Esta parte también hace de esa magia, no podemos olvidarla

@Override
protected void onAttach(AttachEvent attachEvent) {
    getUI().ifPresent(ui -> hour.initHour(ui,labelHour));
    getUI().ifPresent(ui -> memoryConsumtion.showMemory(ui,labelMemory));
    if (attachEvent.isInitialAttach()) {
        buttonStart.addClickListener(e -> {
            Broadcaster.broadcastMessage("Init transactions!");
            refreshDataTask.initUpdateGrid();
        });
        buttonStop.addClickListener(e -> {
            Broadcaster.broadcastMessage("Stop transactions!");
            refreshDataTask.stopUpdateGrid();
        });
        (1)
        this.broadcasterRegistration = Broadcaster.register(updatedTransactionIds -> {
            getUI().ifPresent(ui -> {
                ui.access(() -> { (2)
                    updatedTransactionIds.forEach(updateId -> {
                        final Transaction transaction = TransactionRepository.getInstance().find(updateId);
                        if (!transactionList.contains(transaction)) { (3)
                            transactionList.add(transaction);
                            grid.getDataProvider().refreshAll(); (4)
                            log.info("Update caption {}",transactionList.size());
                            preciousStones.setText("Precious stones: " + transactionList.size());
                        } else {
                            transactionListDataProvider.refreshItem(transaction); (5)
                        }
                    });
                });
            });
        });
        //Notifications for all UIs
        this.broadcasterMessage = Broadcaster.registerMessage(message -> { (6)
            getUI().ifPresent(ui -> {
                ui.access(() -> {
                    Notification.show(message);
                });
            });
        });
    }
}
1 invocamos el método register, por medio de una expresion lambda con el SerializableConsumer<List<String>> tenemos la funcionalidad como parametro, esta clase es propia de Vaadin que extiende a la interface Consumer y Serializable.
2 El método access(() -> )) dentro de el por estar en presencia de un background thread recorremos la lista updatedTransactionId.
3 Añadimos items sino estan al transactionList.
4 Refrescamos el grid completo con refreshAll() para que se vean los cambios pusheados por el server.
5 Si las transacciones/items existen se actualizan con el refreshItem().
6 Notificamos a todas las UI.

Una breve animación donde se aprecian dos pestañas, cada una de ellas puede representar a usuarios diferentes, compartiendo cada 2 segundos las mismas actualizaciones del grid sin recargar la pagina completa (solo el grid), y en caso de hacerlo no nos afectaria porque se mantienen los datos, un caso parecido hize aqui Arduino led web with Vaadin 8.

gridpushcells2


Versión con mongodb reactivo

He creado un nueva rama llamada mongo-push en la cual hacemos lo mismo pero usando mongodb versión reactiva, lo de reactiva no tiene mucho que ver, porque la ciencia es casi la misma, pero si que hay ajustar un par de cosas, desde el broadcaster, el onAttach() de la vista nueva etc.

El Broadcaster tiene nuevas cosas como el siguiente método que pasa al register una lista de libros, que usaremos en la vista nueva.

public static void broadcastForGridReactiveBooks(List<Book> books) {
    listenerBooks.forEach(book -> {
        executorService.execute(() -> book.accept(books));
    });
    log.debug("Notified {} broadcast listeners", listenerBooks.size());
}
@Override
protected void onAttach(AttachEvent attachEvent) { (1)
    //required for show time and memmory consumption
    super.onAttach(attachEvent);
    if (attachEvent.isInitialAttach()) {
        this.initData(attachEvent.getUI());
        this.buttonStart.addClickListener(e -> {
            refreshReactiveDataTask.initUpdateGrid("Init refresh items from database");
        });
        this.buttonStop.addClickListener(e -> {
            refreshReactiveDataTask.stopUpdateGrid("Stop refresh items from database");
        });
        /**
        * Cada vez que se abra una tab nueva, se obtienen los libros disponibles en DB
        * pero se refleja en todas las UI
        */
        this.registration = Broadcaster.registerReactiveBooks(booksList -> {
            attachEvent.getUI().access(() -> {
                labelGridCaption.setText("Documents: " + booksList.size());
                reactiveBookGrid.setItems(booksList); (2)
            });
            }, message -> {
            attachEvent.getUI().access(() -> {
                Notification.show(message);
            });
        });
    }
}
1 Nuestro onAttach
2 Ahora nuestro onAttach necesita el método setItems del grid de la siguiente manera esta con menos lineas que el anterior.

Ejecución de Test

Tenemos los siguientes datos simples BookReactiveRepoTest.

Database

Collection

test

books

Para el testing de stream reactivos con project reactor es mucha ayuda el uso de StepVerifier es compatible con otras implementaciones de la especificación del Manifiesto Reactivo, RxJava, AkkaStreams etc, en el método create, recibe un publisher(Flux o Mono estas implementan la interface Publisher) en realidad por eso su compatibilidad.

  • También logra internamente hacer la subscripción por nosotros y mucho más que eso.

  • Como testear los streams/cadenas reactivas que tengan un delay alto, para que terminen de inmediato, gracias a que si seteamos un tiempo virtual, hacemos que el clock del Scheduler se resetee a cero, bastante bien para los test de integración etc.

Test con tiempo virtual

@Test
@DisplayName("Cada item se emite con 700 milisegundos, unos 2800 milisegundos aprox.")
void virtualTime() {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4)
            .delayElements(Duration.ofMillis(700)); (1)

    Flux<Integer> fluxDefer = Flux.defer(() -> Flux.just(1, 2, 3, 4)
            .delayElements(Duration.ofMillis(700))); (2)

    StepVerifier.withVirtualTime(() -> flux)
            .thenAwait(Duration.ofMillis(4 * 700)) (3)
            //.expectNextCount(4)
            .expectNext(1, 2, 3, 4)
            .verifyComplete();
}
1 Sin diferir, por lo tanto, no se va a reevaluar en el método withVirtualTime, y que pasaria? pues, que el test dure todo el tiempo que lo indique su delay.
2 Con defer, aqui el test terminaria de inmediato.
3 thenAwait método necesario para añadir el tiempo total, si añadimos menos tiempo, el test nunca terminara, lo mejor es el tiempo total, los 2800milisegundos en este caso.
void virtualTime2() {
    StepVerifier.withVirtualTime(() -> Flux.just(1, 2, 3, 4)
            .delayElements(Duration.ofMillis(700)))
            .thenAwait(Duration.ofMillis(4 * 700)) (1)
            //.expectNextCount(4)
            .expectNext(1, 2, 3, 4)
            .verifyComplete();
}
1 Lo que va dentro del supplier podriamos hacerlo también y es valido así, esto hace que también se reevalue la cadena reactiva, y termine de inmediato el test.

BookReactiveRepoTest

@Test
@DisplayName("Save some books of programming")
void save() {

    StepVerifier.create(reactiveBookService
            .saveAll(Flux.fromIterable(Arrays.asList(
                    new Book("", "Clean Code", "Rober C. Martin"),
                    new Book("", "BDD IN ACTION", "John Smart"),
                    new Book("", "El feliz abrazo de una madre mocha", "El cuñao"),
                    new Book("", "ITEXT IN ACTION", "Bruno Lowagie"),
                    new Book("", "SPRING BOOT IN ACTION", "Craig Walls"),
                    new Book("", "Kubernetes in Action", "Marko Luksa"),
                    new Book("", "La fuga de los caballos paraliticos", "El cuñao"),
                    new Book("", "Clean Architecture", "Rober C. Martin"))
                    )
            )
            .delayElements(Duration.ofSeconds(1)))
            .expectNextCount(8)
            .verifyComplete();

}
@Log4j2
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@DisplayName("<= Book Reactive Test with Mockito =>")
(1)
@TestPropertySource(properties = "spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.mongo.embedded.EmbeddedMongoAutoConfiguration")
class BookReactiveMockTest {

    @MockBean (2)
    private ReactiveBookService service;

    private Book book = new Book("1","Ruben de noche","Ruben0x52");
    private Book book2 = new Book("2","Diamantes","No c");

    @BeforeEach
    void setup() {
        Mono<Book> monoJustOrEmpty = Mono.just(book);
        Mockito.when(service.findById(anyString())).thenReturn(Mono.just(monoJustOrEmpty));
        Mockito.when(service.findByTitle("Ruben de noche")).thenReturn(Mono.just(book));
        Mockito.when(service.findByAuthor("Ruben0x52")).thenReturn(Mono.just(book));
        Mockito.when(service.findAll()).thenReturn(Flux.just(Arrays.asList(book, book2)));
    }

    @Test
    @DisplayName("Testing findById")
    void findById() {

        final Predicate<Book> predicate = autor -> autor.getId().equalsIgnoreCase("1");

        Mono<Book> mono = this.service.findById("1")
                .flatMap(bookMono -> Mono.just(book));

        StepVerifier.create(mono)
                .expectNextMatches(predicate)
                .verifyComplete();
   }

   @Test
   @DisplayName("Testing find by title")
   void findByTitle() {
       final Predicate<Book> predicate = autor -> autor.getTitle().equalsIgnoreCase("Ruben de noche");

       Mono<Book> mono = this.service.findByTitle("Ruben de noche");

       StepVerifier.create(mono)
               .expectNextMatches(predicate)
               .verifyComplete();
   }

    @Test
    @DisplayName("Testing find by author")
    void findByAuthor() {
        final Predicate<Book> predicate = autor -> autor.getAuthor().equalsIgnoreCase("Ruben0x52");

        Mono<Book> mono = this.service.findByAuthor("Ruben0x52");

        StepVerifier.create(mono)
                .expectNextMatches(predicate)
                .verifyComplete();
    }

    @Test
    @DisplayName("Testing find all books")
    void findByAll() {

        StepVerifier.create(this.service.findAll())
                .expectNextCount(1L)
                .verifyComplete();
    }

}
1 La anotación TestPropertySource inhabilita a la configuración de mongo en memoria y poder realizar la ejecución.
2 El uso de la anotación MockBean para nuestro servicio.

Para los test de integración me viene util bajar la version de flapdoodle a 2.2.0

Actualizando documento

dicha imagen muestra que si editamos en compass un documento sera reflejado en nuestro grid.

UpdateItemFromCompass

Nuevo documento

Aqui es casi lo mismo, pero duplicamos el documento, lo renombramos, y se verá nuestro grid con el nuevo item/row.

InsertNewBook