diff --git a/blibli-backend-framework-reactor/README.md b/blibli-backend-framework-reactor/README.md index 7c6df13..0c4682d 100644 --- a/blibli-backend-framework-reactor/README.md +++ b/blibli-backend-framework-reactor/README.md @@ -111,4 +111,50 @@ blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.maximum-pool-si blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-type=linked blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-size=100 blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true +``` + +## Reactive Wrapper + +Sometimes we want to need to use non-reactive library in our application, like Spring Data, or SDK Client. +This module product Reactive Wrapper, it's library to split scheduler usage between reactive code and non-reactive code. + +```properties +blibli.backend.reactor.wrapper.configs.WRAPPER_NAME.subscriber=NEW_PARALLEL +blibli.backend.reactor.wrapper.configs.WRAPPER_NAME.publisher=NEW_SINGLE +``` + +You also can set default scheduler for reactive wrapper, so if for example no scheduler is found, it will use default scheduler + +```properties +blibli.backend.reactor.wrapper.default-subscriber=IMMEDIATE +blibli.backend.reactor.wrapper.default-publisher=IMMEDIATE +``` + +To use reactive wrapper, you can use `ReactiveWrapper` interface. + +```java +class ReactiveWrapperTest { + + @Autowired + private SchedulerHelper schedulerHelper; + + @Autowired + private ReactiveWrapperHelper reactiveWrapperHelper; + + private ReactiveWrapper reactiveWrapperRepository; + + private ReactiveWrapper reactiveWrapperNotFound; + + @BeforeEach + void setUp() { + reactiveWrapperRepository = reactiveWrapperHelper.of("REPOSITORY"); + reactiveWrapperNotFound = reactiveWrapperHelper.of("WRAPPER_NAME"); + } + + @Test + void testMono() { + Mono.fromCallable(() -> "Eko") + .flatMap(value -> reactiveWrapperRepository.mono(value::toUpperCase)) // reactive wrapper will executed in different scheduler + .subscribeOn(schedulerHelper.of("YOUR_SCHEDULER")); + } ``` \ No newline at end of file diff --git a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/ReactorAutoConfiguration.java b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/ReactorAutoConfiguration.java index 4a1fe13..d9b66be 100644 --- a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/ReactorAutoConfiguration.java +++ b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/ReactorAutoConfiguration.java @@ -1,14 +1,18 @@ package com.blibli.oss.backend.reactor; +import com.blibli.oss.backend.reactor.factory.ReactiveWrapperHelperFactoryBean; import com.blibli.oss.backend.reactor.factory.SchedulerHelperFactoryBean; +import com.blibli.oss.backend.reactor.properties.ReactiveWrapperProperties; import com.blibli.oss.backend.reactor.properties.SchedulerProperties; +import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableConfigurationProperties({ - SchedulerProperties.class + SchedulerProperties.class, + ReactiveWrapperProperties.class }) public class ReactorAutoConfiguration { @@ -19,4 +23,13 @@ public SchedulerHelperFactoryBean schedulerHelper(SchedulerProperties schedulerP return factoryBean; } + @Bean + public ReactiveWrapperHelperFactoryBean reactiveWrapperHelper(SchedulerHelper schedulerHelper, + ReactiveWrapperProperties reactiveWrapperProperties) { + ReactiveWrapperHelperFactoryBean factoryBean = new ReactiveWrapperHelperFactoryBean(); + factoryBean.setSchedulerHelper(schedulerHelper); + factoryBean.setProperties(reactiveWrapperProperties); + return factoryBean; + } + } diff --git a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/factory/ReactiveWrapperHelperFactoryBean.java b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/factory/ReactiveWrapperHelperFactoryBean.java new file mode 100644 index 0000000..19acfb1 --- /dev/null +++ b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/factory/ReactiveWrapperHelperFactoryBean.java @@ -0,0 +1,80 @@ +package com.blibli.oss.backend.reactor.factory; + +import com.blibli.oss.backend.reactor.properties.ReactiveWrapperProperties; +import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper; +import com.blibli.oss.backend.reactor.wrapper.ReactiveWrapper; +import com.blibli.oss.backend.reactor.wrapper.ReactiveWrapperHelper; +import lombok.AllArgsConstructor; +import lombok.Setter; +import org.springframework.beans.factory.FactoryBean; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import java.util.stream.Stream; + +public class ReactiveWrapperHelperFactoryBean implements FactoryBean { + + @Setter + private SchedulerHelper schedulerHelper; + + @Setter + private ReactiveWrapperProperties properties; + + @Override + public ReactiveWrapperHelper getObject() throws Exception { + Map map = new HashMap<>(); + properties.getConfigs().forEach((key, item) -> { + map.put(key, new ReactiveWrapperImpl(schedulerHelper, item.getSubscriber(), item.getPublisher())); + }); + + ReactiveWrapperImpl defaultReactiveWrapper = new ReactiveWrapperImpl(schedulerHelper, properties.getDefaultSubscriber(), properties.getDefaultPublisher()); + return new ReactiveWrapperHelperImpl(map, defaultReactiveWrapper); + } + + @Override + public Class getObjectType() { + return ReactiveWrapperHelper.class; + } + + @AllArgsConstructor + private static class ReactiveWrapperHelperImpl implements ReactiveWrapperHelper { + + private Map map; + + private ReactiveWrapper defaultWrapper; + + @Override + public ReactiveWrapper of(String name) { + return map.getOrDefault(name, defaultWrapper); + } + } + + @AllArgsConstructor + private static class ReactiveWrapperImpl implements ReactiveWrapper { + + private SchedulerHelper schedulerHelper; + + private String subscriber; + + private String publisher; + + @Override + public Mono mono(Supplier supplier) { + return Mono.fromSupplier(supplier) + .doOnNext(t -> System.out.println("THREAD " + Thread.currentThread())) + .subscribeOn(schedulerHelper.of(subscriber)) + .publishOn(schedulerHelper.of(publisher)); + } + + @Override + public Flux flux(Supplier> supplier) { + return Flux.fromStream(supplier) + .doOnNext(t -> System.out.println("THREAD " + Thread.currentThread())) + .subscribeOn(schedulerHelper.of(subscriber)) + .publishOn(schedulerHelper.of(publisher)); + } + } +} diff --git a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/properties/ReactiveWrapperProperties.java b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/properties/ReactiveWrapperProperties.java new file mode 100644 index 0000000..3856e74 --- /dev/null +++ b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/properties/ReactiveWrapperProperties.java @@ -0,0 +1,28 @@ +package com.blibli.oss.backend.reactor.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +import java.util.HashMap; +import java.util.Map; + +@Data +@ConfigurationProperties("blibli.backend.reactor.wrapper") +public class ReactiveWrapperProperties { + + private String defaultSubscriber; + + private String defaultPublisher; + + private Map configs = new HashMap<>(); + + @Data + public static class ReactiveWrapperItemProperties { + + private String subscriber; + + private String publisher; + + } + +} diff --git a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapper.java b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapper.java new file mode 100644 index 0000000..dab3f17 --- /dev/null +++ b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapper.java @@ -0,0 +1,15 @@ +package com.blibli.oss.backend.reactor.wrapper; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Supplier; +import java.util.stream.Stream; + +public interface ReactiveWrapper { + + Mono mono(Supplier supplier); + + Flux flux(Supplier> supplier); + +} diff --git a/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperHelper.java b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperHelper.java new file mode 100644 index 0000000..68724cd --- /dev/null +++ b/blibli-backend-framework-reactor/src/main/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperHelper.java @@ -0,0 +1,7 @@ +package com.blibli.oss.backend.reactor.wrapper; + +public interface ReactiveWrapperHelper { + + ReactiveWrapper of(String name); + +} diff --git a/blibli-backend-framework-reactor/src/test/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperTest.java b/blibli-backend-framework-reactor/src/test/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperTest.java new file mode 100644 index 0000000..dbd0ce1 --- /dev/null +++ b/blibli-backend-framework-reactor/src/test/java/com/blibli/oss/backend/reactor/wrapper/ReactiveWrapperTest.java @@ -0,0 +1,68 @@ +package com.blibli.oss.backend.reactor.wrapper; + +import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper; +import com.blibli.oss.backend.reactor.scheduler.SchedulerTest; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.stream.Stream; + +@ExtendWith(SpringExtension.class) +@SpringBootTest(classes = SchedulerTest.Application.class) +class ReactiveWrapperTest { + + @Autowired + private SchedulerHelper schedulerHelper; + + @Autowired + private ReactiveWrapperHelper reactiveWrapperHelper; + + private ReactiveWrapper reactiveWrapperRepository; + + private ReactiveWrapper reactiveWrapperNotFound; + + @BeforeEach + void setUp() { + reactiveWrapperRepository = reactiveWrapperHelper.of("REPOSITORY"); + reactiveWrapperNotFound = reactiveWrapperHelper.of("NOTFOUND"); + } + + @Test + void testMono() { + Mono.fromCallable(() -> "Eko") + .doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread())) + .flatMap(value -> reactiveWrapperRepository.mono(value::toUpperCase)) + .doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread())) + .subscribeOn(schedulerHelper.of("NEW_SINGLE")) + .block(); + } + + @Test + void testFlux() { + Flux.just("Eko", "Kurniawan", "Khannedy") + .doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread())) + .flatMap(value -> reactiveWrapperRepository.flux(() -> Stream.of(value.toUpperCase()))) + .doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread())) + .subscribeOn(schedulerHelper.of("NEW_SINGLE")) + .collectList() + .block(); + } + + @Test + void testFluxSchedulerNotFound() { + Flux.just("Eko", "Kurniawan", "Khannedy") + .doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread())) + .flatMap(value -> reactiveWrapperNotFound.flux(() -> Stream.of(value.toUpperCase()))) + .doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread())) + .subscribeOn(schedulerHelper.of("NOTFOUND")) + .collectList() + .block(); + } + +} \ No newline at end of file diff --git a/blibli-backend-framework-reactor/src/test/resources/application.properties b/blibli-backend-framework-reactor/src/test/resources/application.properties index a39d38c..289c29a 100644 --- a/blibli-backend-framework-reactor/src/test/resources/application.properties +++ b/blibli-backend-framework-reactor/src/test/resources/application.properties @@ -49,4 +49,9 @@ blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.core-pool-size= blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.maximum-pool-size=1000 blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-type=linked blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-size=100 -blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true \ No newline at end of file +blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true + +blibli.backend.reactor.wrapper.default-subscriber=IMMEDIATE +blibli.backend.reactor.wrapper.default-publisher=IMMEDIATE +blibli.backend.reactor.wrapper.configs.REPOSITORY.subscriber=NEW_PARALLEL +blibli.backend.reactor.wrapper.configs.REPOSITORY.publisher=NEW_SINGLE \ No newline at end of file