Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions blibli-backend-framework-reactor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,50 @@ blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.maximum-pool-si
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-type=linked
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-size=100
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true
```

## Reactive Wrapper

Sometimes we want to need to use non-reactive library in our application, like Spring Data, or SDK Client.
This module product Reactive Wrapper, it's library to split scheduler usage between reactive code and non-reactive code.

```properties
blibli.backend.reactor.wrapper.configs.WRAPPER_NAME.subscriber=NEW_PARALLEL
blibli.backend.reactor.wrapper.configs.WRAPPER_NAME.publisher=NEW_SINGLE
```

You also can set default scheduler for reactive wrapper, so if for example no scheduler is found, it will use default scheduler

```properties
blibli.backend.reactor.wrapper.default-subscriber=IMMEDIATE
blibli.backend.reactor.wrapper.default-publisher=IMMEDIATE
```

To use reactive wrapper, you can use `ReactiveWrapper` interface.

```java
class ReactiveWrapperTest {

@Autowired
private SchedulerHelper schedulerHelper;

@Autowired
private ReactiveWrapperHelper reactiveWrapperHelper;

private ReactiveWrapper reactiveWrapperRepository;

private ReactiveWrapper reactiveWrapperNotFound;

@BeforeEach
void setUp() {
reactiveWrapperRepository = reactiveWrapperHelper.of("REPOSITORY");
reactiveWrapperNotFound = reactiveWrapperHelper.of("WRAPPER_NAME");
}

@Test
void testMono() {
Mono.fromCallable(() -> "Eko")
.flatMap(value -> reactiveWrapperRepository.mono(value::toUpperCase)) // reactive wrapper will executed in different scheduler
.subscribeOn(schedulerHelper.of("YOUR_SCHEDULER"));
}
```
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package com.blibli.oss.backend.reactor;

import com.blibli.oss.backend.reactor.factory.ReactiveWrapperHelperFactoryBean;
import com.blibli.oss.backend.reactor.factory.SchedulerHelperFactoryBean;
import com.blibli.oss.backend.reactor.properties.ReactiveWrapperProperties;
import com.blibli.oss.backend.reactor.properties.SchedulerProperties;
import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties({
SchedulerProperties.class
SchedulerProperties.class,
ReactiveWrapperProperties.class
})
public class ReactorAutoConfiguration {

Expand All @@ -19,4 +23,13 @@ public SchedulerHelperFactoryBean schedulerHelper(SchedulerProperties schedulerP
return factoryBean;
}

@Bean
public ReactiveWrapperHelperFactoryBean reactiveWrapperHelper(SchedulerHelper schedulerHelper,
ReactiveWrapperProperties reactiveWrapperProperties) {
ReactiveWrapperHelperFactoryBean factoryBean = new ReactiveWrapperHelperFactoryBean();
factoryBean.setSchedulerHelper(schedulerHelper);
factoryBean.setProperties(reactiveWrapperProperties);
return factoryBean;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.blibli.oss.backend.reactor.factory;

import com.blibli.oss.backend.reactor.properties.ReactiveWrapperProperties;
import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper;
import com.blibli.oss.backend.reactor.wrapper.ReactiveWrapper;
import com.blibli.oss.backend.reactor.wrapper.ReactiveWrapperHelper;
import lombok.AllArgsConstructor;
import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ReactiveWrapperHelperFactoryBean implements FactoryBean<ReactiveWrapperHelper> {

@Setter
private SchedulerHelper schedulerHelper;

@Setter
private ReactiveWrapperProperties properties;

@Override
public ReactiveWrapperHelper getObject() throws Exception {
Map<String, ReactiveWrapper> map = new HashMap<>();
properties.getConfigs().forEach((key, item) -> {
map.put(key, new ReactiveWrapperImpl(schedulerHelper, item.getSubscriber(), item.getPublisher()));
});

ReactiveWrapperImpl defaultReactiveWrapper = new ReactiveWrapperImpl(schedulerHelper, properties.getDefaultSubscriber(), properties.getDefaultPublisher());
return new ReactiveWrapperHelperImpl(map, defaultReactiveWrapper);
}

@Override
public Class<?> getObjectType() {
return ReactiveWrapperHelper.class;
}

@AllArgsConstructor
private static class ReactiveWrapperHelperImpl implements ReactiveWrapperHelper {

private Map<String, ReactiveWrapper> map;

private ReactiveWrapper defaultWrapper;

@Override
public ReactiveWrapper of(String name) {
return map.getOrDefault(name, defaultWrapper);
}
}

@AllArgsConstructor
private static class ReactiveWrapperImpl implements ReactiveWrapper {

private SchedulerHelper schedulerHelper;

private String subscriber;

private String publisher;

@Override
public <T> Mono<T> mono(Supplier<T> supplier) {
return Mono.fromSupplier(supplier)
.doOnNext(t -> System.out.println("THREAD " + Thread.currentThread()))
.subscribeOn(schedulerHelper.of(subscriber))
.publishOn(schedulerHelper.of(publisher));
}

@Override
public <T> Flux<T> flux(Supplier<Stream<? extends T>> supplier) {
return Flux.fromStream(supplier)
.doOnNext(t -> System.out.println("THREAD " + Thread.currentThread()))
.subscribeOn(schedulerHelper.of(subscriber))
.publishOn(schedulerHelper.of(publisher));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.blibli.oss.backend.reactor.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

import java.util.HashMap;
import java.util.Map;

@Data
@ConfigurationProperties("blibli.backend.reactor.wrapper")
public class ReactiveWrapperProperties {

private String defaultSubscriber;

private String defaultPublisher;

private Map<String, ReactiveWrapperItemProperties> configs = new HashMap<>();

@Data
public static class ReactiveWrapperItemProperties {

private String subscriber;

private String publisher;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.blibli.oss.backend.reactor.wrapper;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.function.Supplier;
import java.util.stream.Stream;

public interface ReactiveWrapper {

<T> Mono<T> mono(Supplier<T> supplier);

<T> Flux<T> flux(Supplier<Stream<? extends T>> supplier);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.blibli.oss.backend.reactor.wrapper;

public interface ReactiveWrapperHelper {

ReactiveWrapper of(String name);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.blibli.oss.backend.reactor.wrapper;

import com.blibli.oss.backend.reactor.scheduler.SchedulerHelper;
import com.blibli.oss.backend.reactor.scheduler.SchedulerTest;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.stream.Stream;

@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = SchedulerTest.Application.class)
class ReactiveWrapperTest {

@Autowired
private SchedulerHelper schedulerHelper;

@Autowired
private ReactiveWrapperHelper reactiveWrapperHelper;

private ReactiveWrapper reactiveWrapperRepository;

private ReactiveWrapper reactiveWrapperNotFound;

@BeforeEach
void setUp() {
reactiveWrapperRepository = reactiveWrapperHelper.of("REPOSITORY");
reactiveWrapperNotFound = reactiveWrapperHelper.of("NOTFOUND");
}

@Test
void testMono() {
Mono.fromCallable(() -> "Eko")
.doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread()))
.flatMap(value -> reactiveWrapperRepository.mono(value::toUpperCase))
.doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread()))
.subscribeOn(schedulerHelper.of("NEW_SINGLE"))
.block();
}

@Test
void testFlux() {
Flux.just("Eko", "Kurniawan", "Khannedy")
.doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread()))
.flatMap(value -> reactiveWrapperRepository.flux(() -> Stream.of(value.toUpperCase())))
.doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread()))
.subscribeOn(schedulerHelper.of("NEW_SINGLE"))
.collectList()
.block();
}

@Test
void testFluxSchedulerNotFound() {
Flux.just("Eko", "Kurniawan", "Khannedy")
.doOnNext(value -> System.out.println(value + "FIRST:" + Thread.currentThread()))
.flatMap(value -> reactiveWrapperNotFound.flux(() -> Stream.of(value.toUpperCase())))
.doOnNext(value -> System.out.println(value + "THIRD:" + Thread.currentThread()))
.subscribeOn(schedulerHelper.of("NOTFOUND"))
.collectList()
.block();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.core-pool-size=
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.maximum-pool-size=1000
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-type=linked
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.queue-size=100
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true
blibli.backend.reactor.scheduler.configs.THREAD_POOL.thread-pool.allow-core-thread-time-out=true

blibli.backend.reactor.wrapper.default-subscriber=IMMEDIATE
blibli.backend.reactor.wrapper.default-publisher=IMMEDIATE
blibli.backend.reactor.wrapper.configs.REPOSITORY.subscriber=NEW_PARALLEL
blibli.backend.reactor.wrapper.configs.REPOSITORY.publisher=NEW_SINGLE