Skip to content

Commit a3f539c

Browse files
committed
RTSDK-10072 RTSDK-10184 Fix exceptions when multiple Consumer threads are started in the same application
1 parent e9c377a commit a3f539c

File tree

3 files changed

+100
-58
lines changed

3 files changed

+100
-58
lines changed

Java/Eta/Converter/src/main/java/com/refinitiv/eta/json/util/ByteArrayPool.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
*| This source code is provided under the Apache 2.0 license
33
*| and is provided AS IS with no warranty or guarantee of fit for purpose.
44
*| See the project's LICENSE.md for details.
5-
*| Copyright (C) 2021-2022,2024 LSEG. All rights reserved.
5+
*| Copyright (C) 2021-2022,2024-2025 LSEG. All rights reserved.
66
*|-----------------------------------------------------------------------------
77
*/
88

@@ -12,6 +12,7 @@
1212
import java.util.LinkedList;
1313
import java.util.List;
1414
import java.util.Map;
15+
import java.util.concurrent.locks.ReentrantLock;
1516

1617
public class ByteArrayPool {
1718

@@ -21,6 +22,8 @@ public class ByteArrayPool {
2122
private int DEFAULT_ARRAY_SIZE = 4096;
2223
private int DEFAULT_NUM_OF_POOLS = 8;
2324

25+
private ReentrantLock _lock = new ReentrantLock();
26+
2427

2528
public ByteArrayPool() {
2629
arrayPools = new HashMap<>();
@@ -39,23 +42,41 @@ public ByteArrayPool() {
3942

4043
public byte[] poll(int length) {
4144

42-
int n = length / DEFAULT_ARRAY_SIZE + 1;
43-
int newLen = DEFAULT_ARRAY_SIZE * n;
44-
ObjectPool<byte[]> pool = arrayPools.get(newLen);
45-
if (pool == null) {
46-
pool = new ObjectPool<>(true, () -> new byte[newLen]);
47-
arrayPools.put(newLen, pool);
48-
}
45+
try
46+
{
47+
_lock.lock();
4948

50-
return pool.get();
49+
int n = length / DEFAULT_ARRAY_SIZE + 1;
50+
int newLen = DEFAULT_ARRAY_SIZE * n;
51+
ObjectPool<byte[]> pool = arrayPools.get(newLen);
52+
if (pool == null) {
53+
pool = new ObjectPool<>(true, () -> new byte[newLen]);
54+
arrayPools.put(newLen, pool);
55+
}
56+
57+
return pool.get();
58+
}
59+
finally
60+
{
61+
_lock.unlock();
62+
}
5163
}
5264

5365
public void putBack(byte[] arr) {
5466

55-
if (arr != null) {
56-
ObjectPool<byte[]> pool = arrayPools.get(arr.length);
57-
if (pool != null)
58-
pool.release(arr);
67+
try
68+
{
69+
_lock.lock();
70+
71+
if (arr != null) {
72+
ObjectPool<byte[]> pool = arrayPools.get(arr.length);
73+
if (pool != null)
74+
pool.release(arr);
75+
}
76+
}
77+
finally
78+
{
79+
_lock.unlock();
5980
}
6081
}
6182

Java/Eta/Converter/src/main/java/com/refinitiv/eta/json/util/JsonFactory.java

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
*| This source code is provided under the Apache 2.0 license
33
*| and is provided AS IS with no warranty or guarantee of fit for purpose.
44
*| See the project's LICENSE.md for details.
5-
*| Copyright (C) 2021-2022,2024 LSEG. All rights reserved.
5+
*| Copyright (C) 2021-2022,2024-2025 LSEG. All rights reserved.
66
*|-----------------------------------------------------------------------------
77
*/
88

@@ -14,6 +14,7 @@
1414
import com.refinitiv.eta.codec.*;
1515

1616
import java.nio.ByteBuffer;
17+
import java.util.concurrent.locks.ReentrantLock;
1718

1819
public class JsonFactory {
1920
private static ObjectPool<Int> intPool = new ObjectPool<>(true, CodecFactory::createInt);
@@ -56,6 +57,8 @@ public class JsonFactory {
5657
private static boolean isInitialized = false;
5758
public static final int JSON_CONVERTER_DEFAULT_POOLS_SIZE = 10;
5859

60+
private static final ReentrantLock _initLock = new ReentrantLock();
61+
5962
private JsonFactory() {
6063
throw new AssertionError();
6164
}
@@ -335,45 +338,53 @@ public static void releaseByteArray(byte[] array) {
335338
byteArrayPool.putBack(array);
336339
}
337340

338-
339341
public static void initPools(int numOfObjects)
340342
{
341-
if(!isInitialized && numOfObjects > 0)
343+
_initLock.lock();
344+
345+
try
346+
{
347+
if(!isInitialized && numOfObjects > 0)
348+
{
349+
intPool.growPool(numOfObjects);
350+
uintPool.growPool(numOfObjects);
351+
elementListPool.growPool(numOfObjects);
352+
elementEntryPool.growPool(numOfObjects);
353+
bufferPool.growPool(numOfObjects);
354+
fieldListPool.growPool(numOfObjects);
355+
fieldEntryPool.growPool(numOfObjects);
356+
vectorPool.growPool(numOfObjects);
357+
vectorEntryPool.growPool(numOfObjects);
358+
seriesPool.growPool(numOfObjects);
359+
seriesEntryPool.growPool(numOfObjects);
360+
filterListPool.growPool(numOfObjects);
361+
filterEntryPool.growPool(numOfObjects);
362+
realPool.growPool(numOfObjects);
363+
doublePool.growPool(numOfObjects);
364+
floatPool.growPool(numOfObjects);
365+
statePool.growPool(numOfObjects);
366+
qosPool.growPool(numOfObjects);
367+
mapPool.growPool(numOfObjects);
368+
mapEntryPool.growPool(numOfObjects);
369+
enumPool.growPool(numOfObjects);
370+
timePool.growPool(numOfObjects);
371+
datePool.growPool(numOfObjects);
372+
dateTimePool.growPool(numOfObjects);
373+
arrayPool.growPool(numOfObjects);
374+
arrayEntryPool.growPool(numOfObjects);
375+
msgPool.growPool(numOfObjects);
376+
fieldSetDefDbPool.growPool(numOfObjects);
377+
elementSetDefDbPool.growPool(numOfObjects);
378+
decodeIterPool.growPool(numOfObjects);
379+
encodeIteratorPool.growPool(numOfObjects);
380+
byteBufferPool.growPool(numOfObjects);
381+
382+
isInitialized = true;
383+
}
384+
}
385+
finally
342386
{
343-
intPool.growPool(numOfObjects);
344-
uintPool.growPool(numOfObjects);
345-
elementListPool.growPool(numOfObjects);
346-
elementEntryPool.growPool(numOfObjects);
347-
bufferPool.growPool(numOfObjects);
348-
fieldListPool.growPool(numOfObjects);
349-
fieldEntryPool.growPool(numOfObjects);
350-
vectorPool.growPool(numOfObjects);
351-
vectorEntryPool.growPool(numOfObjects);
352-
seriesPool.growPool(numOfObjects);
353-
seriesEntryPool.growPool(numOfObjects);
354-
filterListPool.growPool(numOfObjects);
355-
filterEntryPool.growPool(numOfObjects);
356-
realPool.growPool(numOfObjects);
357-
doublePool.growPool(numOfObjects);
358-
floatPool.growPool(numOfObjects);
359-
statePool.growPool(numOfObjects);
360-
qosPool.growPool(numOfObjects);
361-
mapPool.growPool(numOfObjects);
362-
mapEntryPool.growPool(numOfObjects);
363-
enumPool.growPool(numOfObjects);
364-
timePool.growPool(numOfObjects);
365-
datePool.growPool(numOfObjects);
366-
dateTimePool.growPool(numOfObjects);
367-
arrayPool.growPool(numOfObjects);
368-
arrayEntryPool.growPool(numOfObjects);
369-
msgPool.growPool(numOfObjects);
370-
fieldSetDefDbPool.growPool(numOfObjects);
371-
elementSetDefDbPool.growPool(numOfObjects);
372-
decodeIterPool.growPool(numOfObjects);
373-
encodeIteratorPool.growPool(numOfObjects);
374-
byteBufferPool.growPool(numOfObjects);
375-
376-
isInitialized = true;
387+
_initLock.unlock();
377388
}
378389
}
379390
}

Java/Eta/Core/src/main/java/com/refinitiv/eta/transport/HeaderWebSocketSessionKeyHandler.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424
class HeaderWebSocketSessionKeyHandler implements HeaderWebSocketHandler {
2525

26+
private static final ReentrantLock _encodeLock = new ReentrantLock();
27+
2628
private static final String SHA_1 = "SHA-1";
2729

2830
private static final String WEB_SOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
@@ -31,10 +33,6 @@ class HeaderWebSocketSessionKeyHandler implements HeaderWebSocketHandler {
3133

3234
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
3335

34-
private final Buffer internalKeyBuffer;
35-
36-
private final ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
37-
3836
private final boolean response;
3937

4038
static {
@@ -48,7 +46,6 @@ class HeaderWebSocketSessionKeyHandler implements HeaderWebSocketHandler {
4846

4947
public HeaderWebSocketSessionKeyHandler(boolean response) {
5048
this.response = response;
51-
this.internalKeyBuffer = CodecFactory.createBuffer();
5249
}
5350

5451
@Override
@@ -70,6 +67,7 @@ public int decodeWebSocketHeader(WebSocketSession session, HttpMessageHandler pa
7067
wsKey = wsKeyHeaders.get(0);
7168
}
7269

70+
Buffer internalKeyBuffer = CodecFactory.createBuffer();
7371
final Buffer keyBuffer = response ? internalKeyBuffer : session.getWsSessionKey();
7472
keyBuffer.clear();
7573
keyBuffer.data(wsKey);
@@ -91,8 +89,19 @@ public int encodeWebSocketHeader(WebSocketSession session, HttpHeaders httpHeade
9189
generateWebSocketSessionKey(session.getWsSessionKey());
9290
}
9391
byte[] concatenatedWebSocketAcceptKey = (session.getWsSessionKey() + WEB_SOCKET_GUID).getBytes(StandardCharsets.US_ASCII);
94-
SHA_MSG_DIGEST.update(concatenatedWebSocketAcceptKey);
95-
final byte[] wsKeyAccepted = BASE64_ENCODER.encode(SHA_MSG_DIGEST.digest());
92+
byte[] wsKeyAccepted = null;
93+
94+
try
95+
{
96+
_encodeLock.lock();
97+
SHA_MSG_DIGEST.update(concatenatedWebSocketAcceptKey);
98+
wsKeyAccepted = BASE64_ENCODER.encode(SHA_MSG_DIGEST.digest());
99+
}
100+
finally
101+
{
102+
_encodeLock.unlock();
103+
}
104+
96105
session.getWsSessionAcceptKey().data(new String(wsKeyAccepted));
97106
final String wsKeyHeader = response ? session.getWsSessionAcceptKey().toString() : session.getWsSessionKey().toString();
98107
httpHeaders.addHeader(headerName, HttpHeaderLineInfo.valueOf(wsKeyHeader));
@@ -101,7 +110,8 @@ public int encodeWebSocketHeader(WebSocketSession session, HttpHeaders httpHeade
101110

102111
private void generateWebSocketSessionKey(Buffer webSocketSessionKeyBuffer) {
103112
final UUID uuid = UUID.randomUUID();
104-
uuidBuffer.clear();
113+
ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
114+
105115
uuidBuffer
106116
.putLong(uuid.getMostSignificantBits())
107117
.putLong(uuid.getLeastSignificantBits());

0 commit comments

Comments
 (0)