Logo

dev-resources.site

for different kinds of informations.

How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

Published at
7/26/2024
Categories
java
webflux
springboot
asynchronous
Author
subrata71
Author
9 person written this
subrata71
open
How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

I'm trying to execute a method asynchronously within an existing reactive chain in my Project Reactor-based application. The method doUpdateLayoutInAsync is intended to perform a heavy background task, but it seems like my approach isn't working as expected. Here's my current implementation:

public Mono<Boolean> publishPackage(String branchedPackageId) {
    PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
    publishingMetaDTO.setPublishEvent(true);

    return packageRepository
            .findById(branchedPackageId, packagePermission.getPublishPermission())
            .switchIfEmpty(Mono.error(new AppsmithException(
                    AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
            .flatMap(originalPackage -> {
                String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

                Package packageToBePublished = constructPackageToBePublished(originalPackage);

                originalPackage.setVersion(nextVersion);
                originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
                publishingMetaDTO.setOriginPackageId(branchedPackageId);
                publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

                Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
                Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
                Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);

                return unsetCurrentLatestMono
                        .then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
                        .flatMap(tuple2 -> {
                            Package publishedPackage = tuple2.getT2();
                            publishingMetaDTO.setPublishedPackage(publishedPackage);

                            return modulePackagePublishableService
                                    .publishEntities(publishingMetaDTO)
                                    .flatMap(publishedModules -> {
                                        if (publishedModules.isEmpty()) {
                                            return Mono.error(new AppsmithException(
                                                    AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
                                                    originalPackage.getUnpublishedPackage().getName()));
                                        }
                                        return moduleInstancePackagePublishableService
                                                .publishEntities(publishingMetaDTO)
                                                .then(Mono.defer(() ->
                                                        newActionPackagePublishableService.publishEntities(publishingMetaDTO))
                                                        .then(Mono.defer(() ->
                                                                actionCollectionPackagePublishableService
                                                                        .publishEntities(publishingMetaDTO))));
                                    })
                                    .then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
                        })
                        .as(transactionalOperator::transactional)
                        .then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
            });
}

private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
    Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
            .flatMap(pageId -> updateLayoutService
                    .updatePageLayoutsByPageId(pageId)
                    .onErrorResume(throwable -> {
                        log.warn("Update layout failed for pageId: {} with error: {}", pageId, throwable.getMessage());
                        return Mono.just(pageId);
                    }))
            .collectList();

    // Running the updateLayoutsMono task asynchronously
    updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

    return Mono.just(Boolean.TRUE);
}
Enter fullscreen mode Exit fullscreen mode

Issue: I want doUpdateLayoutInAsync to run in the background while the rest of the reactive chain completes. However, the method seems to execute synchronously, and the reactive chain does not continue as expected.

Question: How can I ensure that doUpdateLayoutInAsync runs asynchronously and does not block the continuation of the reactive chain?

webflux Article's
24 articles in total
Favicon
Introducing Java Library for Backend Microservice Webflux (Reactor-core)
Favicon
Making reactive applications with a Kitten Care Example
Favicon
Reactive Programming applied to Legacy Services β€” A WebFlux example
Favicon
Getting Started with Spring WebFlux
Favicon
Implementing Soft Delete in Spring WebFlux with R2DBC
Favicon
Java library for developing backend with reactive programming
Favicon
How to Run an Asynchronous Task in Spring WebFlux Without Blocking the Main Response?
Favicon
How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?
Favicon
Spring WebFlux Retry Mechanism
Favicon
Implementing Context Propagation in Reactive Programming Projects πŸ”„
Favicon
Ability to unlearn in Software: Reactive Programming
Favicon
Create DTO using get results from repository returns duplicate values in Spring Boot Reactive WebFlux
Favicon
Spring R2DBC, MariaDB, and JSON columns
Favicon
SpringBoot WebFlux Annotation-based RestAPIs
Favicon
SpringBoot WebFlux Functional RestAPIs
Favicon
Spring Webflux testing with Mockito
Favicon
A Short Example of Real-Time Event Streaming Using Spring WebFlux
Favicon
Global Error Handling In Webflux (Reactive World)
Favicon
Spring Webflux - Reactive Java Applications - Part 2
Favicon
Building an URL Shortening API with Spring WebFlux (and a lot of supporting cast)
Favicon
Spring Webflux - Reactive Java Applications - Part 1
Favicon
Spring Webflux - Aplicaçáes reativas em Java - Parte 1
Favicon
KVision v2.0.0 - with Bootstrap 4, Spring Webflux and lots of other improvements
Favicon
Sending Multipart Form Data Using Spring WebTestClient

Featured ones: