Skip to content

Commit 98dd021

Browse files
authored
Merge pull request #1100 from the-thing/jdbc-store-sequence-bug
`JdbcStore` concurrency bug fix
2 parents 0b05c8d + ae30291 commit 98dd021

File tree

5 files changed

+78
-38
lines changed

5 files changed

+78
-38
lines changed

quickfixj-core/src/main/java/quickfix/JdbcStore.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ class JdbcStore implements MessageStore {
4343
private final String defaultSessionIdPropertyValue;
4444
private final boolean persistMessages;
4545

46-
private String SQL_UPDATE_SEQNUMS;
46+
private String SQL_UPDATE_INCOMING_SEQNUM;
47+
private String SQL_UPDATE_OUTGOING_SEQNUM;
4748
private String SQL_INSERT_SESSION;
4849
private String SQL_GET_SEQNUMS;
4950
private String SQL_UPDATE_MESSAGE;
@@ -96,8 +97,12 @@ public static String getMessageTableName(SessionSettings settings, SessionID ses
9697
}
9798
}
9899

99-
public static String getUpdateSequenceNumsSql(String sessionTableName, String idWhereClause) {
100-
return "UPDATE " + sessionTableName + " SET incoming_seqnum=?, " + "outgoing_seqnum=? WHERE " + idWhereClause;
100+
public static String getUpdateIncomingSequenceNumberSql(String sessionTableName, String idWhereClause) {
101+
return "UPDATE " + sessionTableName + " SET incoming_seqnum=? WHERE " + idWhereClause;
102+
}
103+
104+
public static String getUpdateOutgoingSequenceNumberSql(String sessionTableName, String idWhereClause) {
105+
return "UPDATE " + sessionTableName + " SET outgoing_seqnum=? WHERE " + idWhereClause;
101106
}
102107

103108
public static String getInsertSessionSql(String sessionTableName, String idColumns, String idPlaceholders) {
@@ -133,7 +138,8 @@ private void setSqlStrings() {
133138
String idColumns = JdbcUtil.getIDColumns(extendedSessionIdSupported);
134139
String idPlaceholders = JdbcUtil.getIDPlaceholders(extendedSessionIdSupported);
135140

136-
SQL_UPDATE_SEQNUMS = getUpdateSequenceNumsSql(sessionTableName, idWhereClause);
141+
SQL_UPDATE_INCOMING_SEQNUM = getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause);
142+
SQL_UPDATE_OUTGOING_SEQNUM = getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause);
137143
SQL_INSERT_SESSION = getInsertSessionSql(sessionTableName, idColumns, idPlaceholders);
138144
SQL_GET_SEQNUMS = getSequenceNumsSql(sessionTableName, idWhereClause);
139145
SQL_UPDATE_MESSAGE = getUpdateMessageSql(messageTableName, idWhereClause);
@@ -293,23 +299,22 @@ public boolean set(int sequence, String message) throws IOException {
293299

294300
public void setNextSenderMsgSeqNum(int next) throws IOException {
295301
cache.setNextSenderMsgSeqNum(next);
296-
storeSequenceNumbers();
302+
storeSequenceNumber(SQL_UPDATE_OUTGOING_SEQNUM, next);
297303
}
298304

299305
public void setNextTargetMsgSeqNum(int next) throws IOException {
300306
cache.setNextTargetMsgSeqNum(next);
301-
storeSequenceNumbers();
307+
storeSequenceNumber(SQL_UPDATE_INCOMING_SEQNUM, next);
302308
}
303309

304-
private void storeSequenceNumbers() throws IOException {
310+
private void storeSequenceNumber(String sequenceUpdateSql, int sequence) throws IOException {
305311
Connection connection = null;
306312
PreparedStatement update = null;
307313
try {
308314
connection = dataSource.getConnection();
309-
update = connection.prepareStatement(SQL_UPDATE_SEQNUMS);
310-
update.setInt(1, cache.getNextTargetMsgSeqNum());
311-
update.setInt(2, cache.getNextSenderMsgSeqNum());
312-
setSessionIdParameters(update, 3);
315+
update = connection.prepareStatement(sequenceUpdateSql);
316+
update.setInt(1, sequence);
317+
setSessionIdParameters(update, 2);
313318
update.execute();
314319
} catch (SQLException e) {
315320
throw new IOException(e.getMessage(), e);

quickfixj-core/src/main/java/quickfix/MemoryStore.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,15 @@ public Calendar getCreationTimeCalendar() throws IOException {
7474
return creationTime;
7575
}
7676

77-
/* package */void setCreationTime(Calendar creationTime) {
77+
void setCreationTime(Calendar creationTime) {
7878
this.creationTime = creationTime;
7979
}
8080

81-
public int getNextSenderMsgSeqNum() throws IOException {
81+
public int getNextSenderMsgSeqNum() {
8282
return nextSenderMsgSeqNum;
8383
}
8484

85-
public int getNextTargetMsgSeqNum() throws IOException {
85+
public int getNextTargetMsgSeqNum() {
8686
return nextTargetMsgSeqNum;
8787
}
8888

quickfixj-core/src/test/java/quickfix/JdbcStoreTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,36 @@ public void testSequenceNumbersWithCustomSessionsTableName() throws Exception {
104104
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());
105105
}
106106

107+
public void testIncrementSequences() throws ConfigError, SQLException, IOException {
108+
initializeTableDefinitions("xsessions", "messages");
109+
JdbcStore store = (JdbcStore) getMessageStoreFactory("xsessions", "messages").create(
110+
getSessionID());
111+
store.reset();
112+
113+
assertEquals("wrong value", 1, store.getNextSenderMsgSeqNum());
114+
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());
115+
116+
store.incrNextSenderMsgSeqNum();
117+
118+
assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
119+
assertEquals("wrong value", 1, store.getNextTargetMsgSeqNum());
120+
121+
store.incrNextTargetMsgSeqNum();
122+
123+
assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
124+
assertEquals("wrong value", 2, store.getNextTargetMsgSeqNum());
125+
126+
store.incrNextTargetMsgSeqNum();
127+
128+
assertEquals("wrong value", 2, store.getNextSenderMsgSeqNum());
129+
assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum());
130+
131+
store.incrNextSenderMsgSeqNum();
132+
133+
assertEquals("wrong value", 3, store.getNextSenderMsgSeqNum());
134+
assertEquals("wrong value", 3, store.getNextTargetMsgSeqNum());
135+
}
136+
107137
public void testMessageStorageMessagesWithCustomMessagesTableName() throws Exception {
108138
initializeTableDefinitions("sessions", "xmessages");
109139
JdbcStore store = (JdbcStore) getMessageStoreFactory("sessions", "xmessages").create(

quickfixj-stress-test/src/main/java/quickfix/JdbcStoreStressTest.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Calendar;
3535
import java.util.concurrent.locks.Lock;
3636
import java.util.concurrent.locks.ReentrantLock;
37+
import java.util.function.IntConsumer;
3738

3839
import static org.mockito.Mockito.doAnswer;
3940
import static org.mockito.Mockito.doReturn;
@@ -85,8 +86,15 @@ private static JdbcStoreWrapper createWrapper() throws Exception {
8586
// UPDATE SEQUENCE NUMS
8687
Database database = new Database();
8788

88-
String updateSequenceNumsSql = JdbcStore.getUpdateSequenceNumsSql(sessionTableName, idWhereClause);
89-
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database)).when(connection).prepareStatement(updateSequenceNumsSql);
89+
String updateIncomingSequenceNumberSql = JdbcStore.getUpdateIncomingSequenceNumberSql(sessionTableName, idWhereClause);
90+
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateTargetSequence))
91+
.when(connection)
92+
.prepareStatement(updateIncomingSequenceNumberSql);
93+
94+
String updateOutgoingSequenceNumberSql = JdbcStore.getUpdateOutgoingSequenceNumberSql(sessionTableName, idWhereClause);
95+
doAnswer(invocationOnMock -> new UpdateSequenceStatement(database::updateSenderSequence))
96+
.when(connection)
97+
.prepareStatement(updateOutgoingSequenceNumberSql);
9098

9199
JdbcStore jdbcStore = new JdbcStore(settings, SESSION_ID, dataSource);
92100

@@ -280,11 +288,20 @@ public Database() {
280288
this.targetSequence = -1;
281289
}
282290

283-
public void update(int senderSequence, int targetSequence) {
291+
public void updateSenderSequence(int senderSequence) {
284292
lock.lock();
285293

286294
try {
287295
this.senderSequence = senderSequence;
296+
} finally {
297+
lock.unlock();
298+
}
299+
}
300+
301+
public void updateTargetSequence(int targetSequence) {
302+
lock.lock();
303+
304+
try {
288305
this.targetSequence = targetSequence;
289306
} finally {
290307
lock.unlock();
@@ -294,14 +311,12 @@ public void update(int senderSequence, int targetSequence) {
294311

295312
private static final class UpdateSequenceStatement implements PreparedStatement {
296313

297-
private final Database database;
298-
private int senderSequence;
299-
private int targetSequence;
314+
private final IntConsumer dbUpdater;
315+
private int sequence;
300316

301-
public UpdateSequenceStatement(Database database) {
302-
this.database = database;
303-
this.senderSequence = -1;
304-
this.targetSequence = -1;
317+
public UpdateSequenceStatement(IntConsumer dbUpdater) {
318+
this.dbUpdater = dbUpdater;
319+
this.sequence = -1;
305320
}
306321

307322
@Override
@@ -337,9 +352,7 @@ public void setShort(int parameterIndex, short x) {
337352
@Override
338353
public void setInt(int parameterIndex, int x) {
339354
if (parameterIndex == 1) {
340-
targetSequence = x;
341-
} else if (parameterIndex == 2) {
342-
senderSequence = x;
355+
sequence = x;
343356
}
344357
}
345358

@@ -419,7 +432,7 @@ public void setObject(int parameterIndex, Object x) {
419432

420433
@Override
421434
public boolean execute() {
422-
database.update(senderSequence, targetSequence);
435+
dbUpdater.accept(sequence);
423436
return true;
424437
}
425438

quickfixj-stress-test/src/main/java/quickfix/MemoryStoreStressTest.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -126,19 +126,11 @@ public void incrementTargetSequence() {
126126
}
127127

128128
public int getSenderSequence() {
129-
try {
130-
return store.getNextSenderMsgSeqNum();
131-
} catch (IOException e) {
132-
throw new RuntimeException(e);
133-
}
129+
return store.getNextSenderMsgSeqNum();
134130
}
135131

136132
public int getTargetSequence() {
137-
try {
138-
return store.getNextTargetMsgSeqNum();
139-
} catch (IOException e) {
140-
throw new RuntimeException(e);
141-
}
133+
return store.getNextTargetMsgSeqNum();
142134
}
143135
}
144136
}

0 commit comments

Comments
 (0)