Question about where I/O and subscribe are run #1793
Closed
hishamnajemrel
started this conversation in
General
Replies: 1 comment
-
|
See https://quarkus.io/guides/quarkus-reactive-architecture (and also, please format your markdown, especially code blocks) |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
This is all within the context of a Quarkus application.
Say I have a synchronous DB call:
public List getEnabledChecks(String orgId) {
return list("organizationReference.organizationId = ?1 and isEnabled = true", orgId).stream()
.map(CheckConfigurationEntity::getName)
.toList();
}
I make it async by doing:
public Uni<List> getEnabledChecksAsync(String orgId) {
var future = CompletableFuture.supplyAsync(() -> getEnabledChecks(orgId), managedExecutor);
return Uni.createFrom().completionStage(future);
}
@Inject
@ManagedExecutorConfig(propagated = ThreadContext.CDI)
ManagedExecutor managedExecutor;
Now, in a different CDI controlled class that is @ApplicationScoped, I consume Messages and depending on the message, I do some functionality: both checkConfigurationService.getEnabledChecksAsync and handleDefaultStatesForNewCar are async and run on a managedexecutor
private void processCarEvent(PortalEvent event) {
switch (event.eventType()) {
case CREATED -> {
checkConfigurationService
.getEnabledChecksAsync(event.organizationId())
.onItem().transformToUni(checks -> {
List<Uni> saveOperations = checks.stream()
.map(checkName ->
checkStateManager.handleDefaultStatesForNewCar(
event.organizationId(),
checkName,
CheckEnabledStateTransition.DISABLED_TO_ENABLED)
)
.toList();
return Uni.combine().all().unis(saveOperations).discardItems();
})
.emitOn(managedExecutor)
.subscribe().with(
result -> {
LOG.debug("Car default states added for CAR CREATED event");
},
failure -> {
// Log or handle error
LOG.error("Failed to process CAR CREATED event: {}", failure.getMessage());
}
);
}
Beta Was this translation helpful? Give feedback.
All reactions