Skip to content

Commit 3ddaad9

Browse files
authored
GH-3578: Fix JdbcMessageStore.getMessageGroup() (#3579)
Fixes #3578 The `JdbcTemplate.queryForMap()` extract values for columns to the closer target driver types. For example H2 and Derby return `Long` for `BIGINT`. Oracle for its `NUMBER(19,0)` returns `BigInteger`. This makes the code in the `JdbcMessageStore.getMessageGroup()` not platform independent. * Fix `JdbcMessageStore` to map `ResultSet` to the `MessageGroupMetadata` directly. Mostly reinstating the previous behavior * For that reason expose a default ctor for `MessageGroupMetadata` and extract some setters to make code in the `JdbcMessageStore` more cleaner. * This opens for us a possibility to implement a `MessageGroupStore.getGroupMetadata(groupId)` for `JdbcMessageStore` * Fix deprecation for `Flux.limitRequest()`
1 parent 703ddc8 commit 3ddaad9

File tree

3 files changed

+46
-30
lines changed

3 files changed

+46
-30
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,10 @@ private Flux<Message<?>> createFluxGenerator() {
391391
fluxSink.complete();
392392
}
393393
})
394-
.limitRequest(
395-
this.maxMessagesPerPoll < 0
394+
.take(this.maxMessagesPerPoll < 0
396395
? Long.MAX_VALUE
397-
: this.maxMessagesPerPoll);
396+
: this.maxMessagesPerPoll,
397+
true);
398398
}
399399
})
400400
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@ public class MessageGroupMetadata implements Serializable {
4040

4141
private static final long serialVersionUID = 1L;
4242

43-
private List<UUID> messageIds = new LinkedList<>();
43+
private final List<UUID> messageIds = new LinkedList<>();
4444

4545
private long timestamp;
4646

@@ -52,8 +52,7 @@ public class MessageGroupMetadata implements Serializable {
5252

5353
private volatile String condition;
5454

55-
private MessageGroupMetadata() {
56-
//For Jackson deserialization
55+
public MessageGroupMetadata() {
5756
}
5857

5958
public MessageGroupMetadata(MessageGroup messageGroup) {
@@ -79,7 +78,7 @@ boolean add(UUID messageId) {
7978
return !this.messageIds.contains(messageId) && this.messageIds.add(messageId);
8079
}
8180

82-
void setLastModified(long lastModified) {
81+
public void setLastModified(long lastModified) {
8382
this.lastModified = lastModified;
8483
}
8584

@@ -107,7 +106,7 @@ public List<UUID> getMessageIds() {
107106
return new LinkedList<UUID>(this.messageIds);
108107
}
109108

110-
void complete() {
109+
public void complete() {
111110
this.complete = true;
112111
}
113112

@@ -123,11 +122,15 @@ public long getTimestamp() {
123122
return this.timestamp;
124123
}
125124

125+
public void setTimestamp(long timestamp) {
126+
this.timestamp = timestamp;
127+
}
128+
126129
public int getLastReleasedMessageSequenceNumber() {
127130
return this.lastReleasedMessageSequenceNumber;
128131
}
129132

130-
void setLastReleasedMessageSequenceNumber(int lastReleasedMessageSequenceNumber) {
133+
public void setLastReleasedMessageSequenceNumber(int lastReleasedMessageSequenceNumber) {
131134
this.lastReleasedMessageSequenceNumber = lastReleasedMessageSequenceNumber;
132135
}
133136

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.sql.Timestamp;
2222
import java.util.Arrays;
2323
import java.util.Collection;
24-
import java.util.Collections;
2524
import java.util.HashMap;
2625
import java.util.Iterator;
2726
import java.util.List;
@@ -38,6 +37,7 @@
3837
import org.springframework.dao.IncorrectResultSizeDataAccessException;
3938
import org.springframework.integration.store.AbstractMessageGroupStore;
4039
import org.springframework.integration.store.MessageGroup;
40+
import org.springframework.integration.store.MessageGroupMetadata;
4141
import org.springframework.integration.store.MessageMetadata;
4242
import org.springframework.integration.store.MessageStore;
4343
import org.springframework.integration.store.SimpleMessageGroup;
@@ -334,11 +334,13 @@ public <T> Message<T> addMessage(final Message<T> message) {
334334
@Override
335335
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
336336
String groupKey = getKey(groupId);
337-
Map<String, Object> groupInfo = getGroupMetadata(groupKey);
337+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupKey);
338338

339-
Timestamp updatedDate = new Timestamp(System.currentTimeMillis());
340-
boolean groupNotExist = groupInfo.isEmpty();
341-
Timestamp createdDate = groupNotExist ? updatedDate : (Timestamp) groupInfo.get("CREATED_DATE");
339+
boolean groupNotExist = groupMetadata == null;
340+
Timestamp createdDate =
341+
groupNotExist
342+
? new Timestamp(System.currentTimeMillis())
343+
: new Timestamp(groupMetadata.getTimestamp());
342344

343345
for (Message<?> message : messages) {
344346
addMessage(message);
@@ -397,28 +399,39 @@ public int messageGroupSize(Object groupId) {
397399

398400
@Override
399401
public MessageGroup getMessageGroup(Object groupId) {
400-
String key = getKey(groupId);
401-
Map<String, Object> groupInfo = getGroupMetadata(key);
402-
403-
if (groupInfo.isEmpty()) {
402+
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
403+
if (groupMetadata != null) {
404+
MessageGroup messageGroup =
405+
getMessageGroupFactory()
406+
.create(this, groupId, groupMetadata.getTimestamp(), groupMetadata.isComplete());
407+
messageGroup.setLastModified(groupMetadata.getLastModified());
408+
messageGroup.setLastReleasedMessageSequenceNumber(groupMetadata.getLastReleasedMessageSequenceNumber());
409+
messageGroup.setCondition(groupMetadata.getCondition());
410+
return messageGroup;
411+
}
412+
else {
404413
return new SimpleMessageGroup(groupId);
405414
}
406-
407-
MessageGroup messageGroup = getMessageGroupFactory()
408-
.create(this, groupId, ((Timestamp) groupInfo.get("CREATED_DATE")).getTime(),
409-
((Long) groupInfo.get("COMPLETE")) > 0);
410-
messageGroup.setLastModified(((Timestamp) groupInfo.get("UPDATED_DATE")).getTime());
411-
messageGroup.setLastReleasedMessageSequenceNumber(((Long) groupInfo.get("LAST_RELEASED_SEQUENCE")).intValue());
412-
messageGroup.setCondition((String) groupInfo.get("CONDITION"));
413-
return messageGroup;
414415
}
415416

416-
private Map<String, Object> getGroupMetadata(String groupKey) {
417+
@Override
418+
public MessageGroupMetadata getGroupMetadata(Object groupId) {
419+
String key = getKey(groupId);
417420
try {
418-
return this.jdbcTemplate.queryForMap(getQuery(Query.GET_GROUP_INFO), groupKey, this.region);
421+
return this.jdbcTemplate.queryForObject(getQuery(Query.GET_GROUP_INFO), (rs, rowNum) -> {
422+
MessageGroupMetadata groupMetadata = new MessageGroupMetadata();
423+
if (rs.getInt("COMPLETE") > 0) {
424+
groupMetadata.complete();
425+
}
426+
groupMetadata.setTimestamp(rs.getTimestamp("CREATED_DATE").getTime());
427+
groupMetadata.setLastModified(rs.getTimestamp("UPDATED_DATE").getTime());
428+
groupMetadata.setLastReleasedMessageSequenceNumber(rs.getInt("LAST_RELEASED_SEQUENCE"));
429+
groupMetadata.setCondition(rs.getString("CONDITION"));
430+
return groupMetadata;
431+
}, key, this.region);
419432
}
420433
catch (IncorrectResultSizeDataAccessException ex) {
421-
return Collections.emptyMap();
434+
return null;
422435
}
423436
}
424437

0 commit comments

Comments
 (0)