Skip to content

Commit a737fbb

Browse files
committed
Fix query jooq jdbc
1 parent 11b9eb6 commit a737fbb

File tree

2 files changed

+84
-53
lines changed

2 files changed

+84
-53
lines changed

thoth-jooq/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ dependencies {
3131
testImplementation("org.testcontainers:kafka:1.15.0")
3232
testImplementation("org.slf4j:slf4j-api:1.7.30")
3333
testImplementation("org.slf4j:slf4j-simple:1.7.30")
34+
35+
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
3436
}
3537

3638
test {

thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java

Lines changed: 82 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
import io.vavr.control.Either;
2020
import io.vavr.control.Option;
2121
import io.vavr.control.Try;
22-
import org.jooq.Condition;
23-
import org.jooq.DSLContext;
24-
import org.jooq.Field;
25-
import org.jooq.SQLDialect;
22+
import org.jooq.*;
2623
import org.jooq.impl.DSL;
2724
import org.reactivestreams.Publisher;
2825
import org.slf4j.LoggerFactory;
@@ -78,23 +75,22 @@ public class PostgresEventStore<E extends Event, Meta, Context> implements Event
7875
private final JacksonSimpleFormat<Meta> metaFormat;
7976
private final JacksonSimpleFormat<Context> contextFormat;
8077
private final ObjectMapper objectMapper;
81-
private final static String SELECT_CLAUSE =
82-
"SELECT " +
83-
" id," +
84-
" entity_id," +
85-
" sequence_num," +
86-
" event_type," +
87-
" version," +
88-
" transaction_id," +
89-
" event," +
90-
" metadata," +
91-
" emission_date," +
92-
" user_id," +
93-
" system_id," +
94-
" total_message_in_transaction," +
95-
" num_message_in_transaction," +
96-
" context," +
97-
" published ";
78+
private final static String SELECT_FIELDS = " id," +
79+
" entity_id," +
80+
" sequence_num," +
81+
" event_type," +
82+
" version," +
83+
" transaction_id," +
84+
" event," +
85+
" metadata," +
86+
" emission_date," +
87+
" user_id," +
88+
" system_id," +
89+
" total_message_in_transaction," +
90+
" num_message_in_transaction," +
91+
" context," +
92+
" published ";
93+
private final static String SELECT_CLAUSE = "SELECT " + SELECT_FIELDS;
9894

9995
public PostgresEventStore(EventPublisher<E, Meta, Context> eventPublisher, DataSource dataSource, Executor executor, TableNames tableNames, JacksonEventFormat<?, E> eventFormat, JacksonSimpleFormat<Meta> metaFormat, JacksonSimpleFormat<Context> contextFormat) {
10096
this.dataSource = dataSource;
@@ -280,26 +276,43 @@ public CompletionStage<Long> lastPublishedSequence() {
280276

281277
@Override
282278
public Publisher<EventEnvelope<E, Meta, Context>> loadEventsUnpublished(Connection c, ConcurrentReplayStrategy concurrentReplayStrategy) {
283-
String tmpQuery = SELECT_CLAUSE +
284-
" FROM " + this.tableNames.tableName +
285-
" WHERE published = false " +
286-
" order by sequence_num ";
287-
String query;
279+
SelectSeekStep1<Record15<UUID, String, Long, String, Long, String, String, String, String, Integer, Integer, String, String, Timestamp, Boolean>, Long> tmpQuery = DSL.using(c)
280+
.select(
281+
ID,
282+
ENTITY_ID,
283+
SEQUENCE_NUM,
284+
EVENT_TYPE,
285+
VERSION,
286+
TRANSACTION_ID,
287+
EVENT,
288+
METADATA,
289+
CONTEXT,
290+
TOTAL_MESSAGE_IN_TRANSACTION,
291+
NUM_MESSAGE_IN_TRANSACTION,
292+
USER_ID,
293+
SYSTEM_ID,
294+
EMISSION_DATE,
295+
PUBLISHED
296+
)
297+
.from(this.tableNames.tableName)
298+
.where(PUBLISHED.eq(false))
299+
.orderBy(SEQUENCE_NUM);
300+
301+
SelectForStep<Record15<UUID, String, Long, String, Long, String, String, String, String, Integer, Integer, String, String, Timestamp, Boolean>> query;
288302
switch (concurrentReplayStrategy) {
289303
case WAIT:
290-
query = tmpQuery + " for update of " + this.tableNames.tableName;
304+
query = tmpQuery.forUpdate().of(table(this.tableNames.tableName));
291305
break;
292306
case SKIP:
293-
query = tmpQuery + " for update of " + this.tableNames.tableName + " skip locked ";
307+
query = tmpQuery.forUpdate().of(table(this.tableNames.tableName)).skipLocked();
294308
break;
295309
default:
296310
query = tmpQuery;
297311
}
298312

299-
return Flux.fromStream(() -> DSL.using(c)
300-
.resultQuery(query)
313+
return Flux.fromStream(() -> query
301314
.stream()
302-
.map(r -> rsToEnvelope(r.intoResultSet()))
315+
.map(r -> rsToEnvelope(r))
303316
);
304317
}
305318

@@ -324,14 +337,30 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQueryWithOptions(C
324337
).flatMap(identity());
325338

326339
var tmpJooqQuery = DSL.using(tx)
327-
.selectFrom(SELECT_CLAUSE + " FROM " + this.tableNames.tableName)
340+
.select(
341+
ID,
342+
ENTITY_ID,
343+
SEQUENCE_NUM,
344+
EVENT_TYPE,
345+
VERSION,
346+
TRANSACTION_ID,
347+
EVENT,
348+
METADATA,
349+
CONTEXT,
350+
TOTAL_MESSAGE_IN_TRANSACTION,
351+
NUM_MESSAGE_IN_TRANSACTION,
352+
USER_ID,
353+
SYSTEM_ID,
354+
EMISSION_DATE,
355+
PUBLISHED
356+
)
357+
.from(this.tableNames.tableName)
328358
.where(clauses.toJavaList())
329-
.orderBy(field("sequence_num").asc())
330-
;
359+
.orderBy(field("sequence_num").asc());
331360
var jooqQuery = Objects.nonNull(query.size) ? tmpJooqQuery.limit(query.size) : tmpJooqQuery;
332361

333362
LOGGER.debug("{}", jooqQuery);
334-
return Flux.fromStream(() -> jooqQuery.stream().map(r -> rsToEnvelope(r.intoResultSet())))
363+
return Flux.fromStream(() -> jooqQuery.stream().map(r -> rsToEnvelope(r)))
335364
.doFinally(any -> {
336365
if (autoClose) {
337366
try {
@@ -351,34 +380,34 @@ public Publisher<EventEnvelope<E, Meta, Context>> loadEventsByQuery(Query query)
351380
);
352381
}
353382

354-
private EventEnvelope<E, Meta, Context> rsToEnvelope(ResultSet rs) {
383+
private EventEnvelope<E, Meta, Context> rsToEnvelope(Record15<UUID, String, Long, String, Long, String, String, String, String, Integer, Integer, String, String, Timestamp, Boolean> rs) {
355384
return Try
356385
.of(() -> {
357-
String event_type = rs.getString("event_type");
358-
long version = rs.getLong("version");
359-
JsonNode event = readValue(rs.getString("event")).getOrElse(NullNode.getInstance());
386+
String event_type = rs.get(EVENT_TYPE);
387+
long version = rs.get(VERSION);
388+
JsonNode event = readValue(rs.get(EVENT)).getOrElse(NullNode.getInstance());
360389
Either<?, E> eventRead = eventFormat.read(event_type, version, event);
361390
eventRead.swap().forEach(err -> {
362391
LOGGER.error("Error reading event {} : {}", event, err);
363392
});
364393
EventEnvelope.Builder<E, Meta, Context> builder = EventEnvelope.<E, Meta, Context>builder()
365-
.withId(UUID.fromString(rs.getString("id")))
366-
.withEntityId(rs.getString("entity_id"))
367-
.withSequenceNum(rs.getLong("sequence_num"))
394+
.withId(rs.get(ID))
395+
.withEntityId(rs.get(ENTITY_ID))
396+
.withSequenceNum(rs.get(SEQUENCE_NUM))
368397
.withEventType(event_type)
369398
.withVersion(version)
370-
.withTransactionId(rs.getString("transaction_id"))
399+
.withTransactionId(rs.get(TRANSACTION_ID))
371400
.withEvent(eventRead.get())
372-
.withEmissionDate(rs.getTimestamp("emission_date").toLocalDateTime())
373-
.withPublished(rs.getBoolean("published"))
374-
.withSystemId(rs.getString("system_id"))
375-
.withUserId(rs.getString("user_id"))
376-
.withPublished(rs.getBoolean("published"))
377-
.withNumMessageInTransaction(rs.getInt("num_message_in_transaction"))
378-
.withTotalMessageInTransaction(rs.getInt("total_message_in_transaction"));
379-
380-
metaFormat.read(readValue(rs.getString("metadata"))).forEach(builder::withMetadata);
381-
contextFormat.read(readValue(rs.getString("context"))).forEach(builder::withContext);
401+
.withEmissionDate(rs.get(EMISSION_DATE).toLocalDateTime())
402+
.withPublished(rs.get(PUBLISHED))
403+
.withSystemId(rs.get(SYSTEM_ID))
404+
.withUserId(rs.get(USER_ID))
405+
.withPublished(rs.get(PUBLISHED))
406+
.withNumMessageInTransaction(rs.get(NUM_MESSAGE_IN_TRANSACTION))
407+
.withTotalMessageInTransaction(rs.get(TOTAL_MESSAGE_IN_TRANSACTION));
408+
409+
metaFormat.read(readValue(rs.get(METADATA))).forEach(builder::withMetadata);
410+
contextFormat.read(readValue(rs.get(CONTEXT))).forEach(builder::withContext);
382411
return builder.build();
383412
})
384413
.getOrElseThrow(e ->

0 commit comments

Comments
 (0)