Skip to content

Commit ed079d8

Browse files
authored
fix(java): enforce immediate timeouts (#5264)
--------- Signed-off-by: Avi Fenesh <aviarchi1994@gmail.com>
1 parent 7a6992f commit ed079d8

File tree

14 files changed

+815
-196
lines changed

14 files changed

+815
-196
lines changed

java/client/src/main/java/glide/api/BaseClient.java

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,9 @@ protected static CommandManager buildCommandManager(ConnectionManager connection
519519
// We'll update this once the connection provides the native handle
520520
GlideCoreClient core =
521521
new GlideCoreClient(
522-
connectionManager.getNativeClientHandle(), connectionManager.getMaxInflightRequests());
522+
connectionManager.getNativeClientHandle(),
523+
connectionManager.getMaxInflightRequests(),
524+
connectionManager.getRequestTimeoutMs());
523525
// Register for PubSub push delivery
524526
try {
525527
GlideCoreClient.registerClient(connectionManager.getNativeClientHandle(), null);
@@ -2703,13 +2705,14 @@ public CompletableFuture<Map<GlideString, Double>> zpopmin(@NonNull GlideString
27032705
@Override
27042706
public CompletableFuture<Object[]> bzpopmin(@NonNull String[] keys, double timeout) {
27052707
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
2706-
return commandManager.submitNewCommand(BZPopMin, arguments, this::handleArrayOrNullResponse);
2708+
return commandManager.submitBlockingCommand(
2709+
BZPopMin, arguments, this::handleArrayOrNullResponse);
27072710
}
27082711

27092712
@Override
27102713
public CompletableFuture<Object[]> bzpopmin(@NonNull GlideString[] keys, double timeout) {
27112714
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
2712-
return commandManager.submitNewCommand(
2715+
return commandManager.submitBlockingCommand(
27132716
BZPopMin, arguments, this::handleArrayOrNullResponseBinary);
27142717
}
27152718

@@ -2741,13 +2744,14 @@ public CompletableFuture<Map<GlideString, Double>> zpopmax(@NonNull GlideString
27412744
@Override
27422745
public CompletableFuture<Object[]> bzpopmax(@NonNull String[] keys, double timeout) {
27432746
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
2744-
return commandManager.submitNewCommand(BZPopMax, arguments, this::handleArrayOrNullResponse);
2747+
return commandManager.submitBlockingCommand(
2748+
BZPopMax, arguments, this::handleArrayOrNullResponse);
27452749
}
27462750

27472751
@Override
27482752
public CompletableFuture<Object[]> bzpopmax(@NonNull GlideString[] keys, double timeout) {
27492753
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
2750-
return commandManager.submitNewCommand(
2754+
return commandManager.submitBlockingCommand(
27512755
BZPopMax, arguments, this::handleArrayOrNullResponseBinary);
27522756
}
27532757

@@ -3374,13 +3378,20 @@ public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xr
33743378
public CompletableFuture<Map<String, Map<String, String[][]>>> xread(
33753379
@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
33763380
String[] arguments = options.toArgs(keysAndIds);
3381+
if (options.isBlocking()) {
3382+
return commandManager.submitBlockingCommand(XRead, arguments, this::handleXReadResponse);
3383+
}
33773384
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponse);
33783385
}
33793386

33803387
@Override
33813388
public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xreadBinary(
33823389
@NonNull Map<GlideString, GlideString> keysAndIds, @NonNull StreamReadOptions options) {
33833390
GlideString[] arguments = options.toArgsBinary(keysAndIds);
3391+
if (options.isBlocking()) {
3392+
return commandManager.submitBlockingCommand(
3393+
XRead, arguments, this::handleXReadResponseBinary);
3394+
}
33843395
return commandManager.submitNewCommand(XRead, arguments, this::handleXReadResponseBinary);
33853396
}
33863397

@@ -3650,6 +3661,9 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
36503661
@NonNull String consumer,
36513662
@NonNull StreamReadGroupOptions options) {
36523663
String[] arguments = options.toArgs(group, consumer, keysAndIds);
3664+
if (options.isBlocking()) {
3665+
return commandManager.submitBlockingCommand(XReadGroup, arguments, this::handleXReadResponse);
3666+
}
36533667
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
36543668
}
36553669

@@ -3660,6 +3674,10 @@ public CompletableFuture<Map<GlideString, Map<GlideString, GlideString[][]>>> xr
36603674
@NonNull GlideString consumer,
36613675
@NonNull StreamReadGroupOptions options) {
36623676
GlideString[] arguments = options.toArgsBinary(group, consumer, keysAndIds);
3677+
if (options.isBlocking()) {
3678+
return commandManager.submitBlockingCommand(
3679+
XReadGroup, arguments, this::handleXReadResponseBinary);
3680+
}
36633681
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponseBinary);
36643682
}
36653683

@@ -4142,14 +4160,14 @@ public CompletableFuture<Long> linsert(
41424160
@Override
41434161
public CompletableFuture<String[]> blpop(@NonNull String[] keys, double timeout) {
41444162
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
4145-
return commandManager.submitNewCommand(
4163+
return commandManager.submitBlockingCommand(
41464164
BLPop, arguments, response -> castArray(handleArrayOrNullResponse(response), String.class));
41474165
}
41484166

41494167
@Override
41504168
public CompletableFuture<GlideString[]> blpop(@NonNull GlideString[] keys, double timeout) {
41514169
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
4152-
return commandManager.submitNewCommand(
4170+
return commandManager.submitBlockingCommand(
41534171
BLPop,
41544172
arguments,
41554173
response -> castArray(handleArrayOrNullResponseBinary(response), GlideString.class));
@@ -4158,14 +4176,14 @@ public CompletableFuture<GlideString[]> blpop(@NonNull GlideString[] keys, doubl
41584176
@Override
41594177
public CompletableFuture<String[]> brpop(@NonNull String[] keys, double timeout) {
41604178
String[] arguments = ArrayUtils.add(keys, Double.toString(timeout));
4161-
return commandManager.submitNewCommand(
4179+
return commandManager.submitBlockingCommand(
41624180
BRPop, arguments, response -> castArray(handleArrayOrNullResponse(response), String.class));
41634181
}
41644182

41654183
@Override
41664184
public CompletableFuture<GlideString[]> brpop(@NonNull GlideString[] keys, double timeout) {
41674185
GlideString[] arguments = ArrayUtils.add(keys, gs(Double.toString(timeout)));
4168-
return commandManager.submitNewCommand(
4186+
return commandManager.submitBlockingCommand(
41694187
BRPop,
41704188
arguments,
41714189
response -> castArray(handleArrayOrNullResponseBinary(response), GlideString.class));
@@ -4324,7 +4342,7 @@ public CompletableFuture<Map<String, Object>> bzmpop(
43244342
new String[] {Double.toString(timeout), Integer.toString(keys.length)},
43254343
keys,
43264344
new String[] {modifier.toString()});
4327-
return commandManager.submitNewCommand(
4345+
return commandManager.submitBlockingCommand(
43284346
BZMPop,
43294347
arguments,
43304348
response -> convertKeyValueArrayToMap(handleArrayOrNullResponse(response), Double.class));
@@ -4338,7 +4356,7 @@ public CompletableFuture<Map<GlideString, Object>> bzmpop(
43384356
new GlideString[] {gs(Double.toString(timeout)), gs(Integer.toString(keys.length))},
43394357
keys,
43404358
new GlideString[] {gs(modifier.toString())});
4341-
return commandManager.submitNewCommand(
4359+
return commandManager.submitBlockingCommand(
43424360
BZMPop,
43434361
arguments,
43444362
response ->
@@ -4354,7 +4372,7 @@ public CompletableFuture<Map<String, Object>> bzmpop(
43544372
new String[] {Double.toString(timeout), Integer.toString(keys.length)},
43554373
keys,
43564374
new String[] {modifier.toString(), COUNT_VALKEY_API, Long.toString(count)});
4357-
return commandManager.submitNewCommand(
4375+
return commandManager.submitBlockingCommand(
43584376
BZMPop,
43594377
arguments,
43604378
response -> convertKeyValueArrayToMap(handleArrayOrNullResponse(response), Double.class));
@@ -4370,7 +4388,7 @@ public CompletableFuture<Map<GlideString, Object>> bzmpop(
43704388
new GlideString[] {
43714389
gs(modifier.toString()), gs(COUNT_VALKEY_API), gs(Long.toString(count))
43724390
});
4373-
return commandManager.submitNewCommand(
4391+
return commandManager.submitBlockingCommand(
43744392
BZMPop,
43754393
arguments,
43764394
response ->
@@ -4768,7 +4786,7 @@ public CompletableFuture<Map<String, String[]>> blmpop(
47684786
new String[] {Double.toString(timeout), Long.toString(keys.length)},
47694787
keys,
47704788
new String[] {direction.toString(), COUNT_FOR_LIST_VALKEY_API, Long.toString(count)});
4771-
return commandManager.submitNewCommand(
4789+
return commandManager.submitBlockingCommand(
47724790
BLMPop,
47734791
arguments,
47744792
response -> castMapOfArrays(handleMapOrNullResponse(response), String.class));
@@ -4784,7 +4802,7 @@ public CompletableFuture<Map<GlideString, GlideString[]>> blmpop(
47844802
new GlideString[] {
47854803
gs(direction.toString()), gs(COUNT_FOR_LIST_VALKEY_API), gs(Long.toString(count))
47864804
});
4787-
return commandManager.submitNewCommand(
4805+
return commandManager.submitBlockingCommand(
47884806
BLMPop,
47894807
arguments,
47904808
response ->
@@ -4800,7 +4818,7 @@ public CompletableFuture<Map<String, String[]>> blmpop(
48004818
new String[] {Double.toString(timeout), Long.toString(keys.length)},
48014819
keys,
48024820
new String[] {direction.toString()});
4803-
return commandManager.submitNewCommand(
4821+
return commandManager.submitBlockingCommand(
48044822
BLMPop,
48054823
arguments,
48064824
response -> castMapOfArrays(handleMapOrNullResponse(response), String.class));
@@ -4814,7 +4832,7 @@ public CompletableFuture<Map<GlideString, GlideString[]>> blmpop(
48144832
new GlideString[] {gs(Double.toString(timeout)), gs(Long.toString(keys.length))},
48154833
keys,
48164834
new GlideString[] {gs(direction.toString())});
4817-
return commandManager.submitNewCommand(
4835+
return commandManager.submitBlockingCommand(
48184836
BLMPop,
48194837
arguments,
48204838
response ->
@@ -4868,7 +4886,8 @@ public CompletableFuture<String> blmove(
48684886
new String[] {
48694887
source, destination, wherefrom.toString(), whereto.toString(), Double.toString(timeout)
48704888
};
4871-
return commandManager.submitNewCommand(BLMove, arguments, this::handleStringOrNullResponse);
4889+
return commandManager.submitBlockingCommand(
4890+
BLMove, arguments, this::handleStringOrNullResponse);
48724891
}
48734892

48744893
@Override
@@ -4886,7 +4905,7 @@ public CompletableFuture<GlideString> blmove(
48864905
gs(whereto.toString()),
48874906
gs(Double.toString(timeout))
48884907
};
4889-
return commandManager.submitNewCommand(
4908+
return commandManager.submitBlockingCommand(
48904909
BLMove, arguments, this::handleGlideStringOrNullResponse);
48914910
}
48924911

@@ -6055,15 +6074,15 @@ public CompletableFuture<Object[]> hscan(
60556074

60566075
@Override
60576076
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
6058-
return commandManager.submitNewCommand(
6077+
return commandManager.submitBlockingCommand(
60596078
Wait,
60606079
new String[] {Long.toString(numreplicas), Long.toString(timeout)},
60616080
this::handleLongResponse);
60626081
}
60636082

60646083
@Override
60656084
public CompletableFuture<Long[]> waitaof(long numlocal, long numreplicas, long timeout) {
6066-
return commandManager.submitNewCommand(
6085+
return commandManager.submitBlockingCommand(
60676086
WaitAof,
60686087
new String[] {Long.toString(numlocal), Long.toString(numreplicas), Long.toString(timeout)},
60696088
response -> castArray(handleArrayResponse(response), Long.class));

java/client/src/main/java/glide/api/GlideClient.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static command_request.CommandRequestOuterClass.RequestType.ConfigResetStat;
88
import static command_request.CommandRequestOuterClass.RequestType.ConfigRewrite;
99
import static command_request.CommandRequestOuterClass.RequestType.ConfigSet;
10-
import static command_request.CommandRequestOuterClass.RequestType.CustomCommand;
1110
import static command_request.CommandRequestOuterClass.RequestType.DBSize;
1211
import static command_request.CommandRequestOuterClass.RequestType.Echo;
1312
import static command_request.CommandRequestOuterClass.RequestType.FlushAll;
@@ -136,13 +135,12 @@ public static CompletableFuture<GlideClient> createClient(
136135

137136
@Override
138137
public CompletableFuture<Object> customCommand(@NonNull String[] args) {
139-
return commandManager.submitNewCommand(CustomCommand, args, this::handleObjectOrNullResponse);
138+
return commandManager.submitCustomCommand(args, this::handleObjectOrNullResponse);
140139
}
141140

142141
@Override
143142
public CompletableFuture<Object> customCommand(@NonNull GlideString[] args) {
144-
return commandManager.submitNewCommand(
145-
CustomCommand, args, this::handleBinaryObjectOrNullResponse);
143+
return commandManager.submitCustomCommand(args, this::handleBinaryObjectOrNullResponse);
146144
}
147145

148146
@Deprecated

java/client/src/main/java/glide/api/GlideClusterClient.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static command_request.CommandRequestOuterClass.RequestType.ConfigResetStat;
88
import static command_request.CommandRequestOuterClass.RequestType.ConfigRewrite;
99
import static command_request.CommandRequestOuterClass.RequestType.ConfigSet;
10-
import static command_request.CommandRequestOuterClass.RequestType.CustomCommand;
1110
import static command_request.CommandRequestOuterClass.RequestType.DBSize;
1211
import static command_request.CommandRequestOuterClass.RequestType.Echo;
1312
import static command_request.CommandRequestOuterClass.RequestType.FCall;
@@ -167,31 +166,29 @@ public static CompletableFuture<GlideClusterClient> createClient(
167166
@Override
168167
public CompletableFuture<ClusterValue<Object>> customCommand(@NonNull String[] args) {
169168
// TODO if a command returns a map as a single value, ClusterValue misleads user
170-
return commandManager.submitNewCommand(
171-
CustomCommand, args, response -> ClusterValue.of(handleObjectOrNullResponse(response)));
169+
return commandManager.submitCustomCommand(
170+
args, response -> ClusterValue.of(handleObjectOrNullResponse(response)));
172171
}
173172

174173
@Override
175174
public CompletableFuture<ClusterValue<Object>> customCommand(@NonNull GlideString[] args) {
176175
// TODO if a command returns a map as a single value, ClusterValue misleads user
177-
return commandManager.submitNewCommand(
178-
CustomCommand,
179-
args,
180-
response -> ClusterValue.of(handleBinaryObjectOrNullResponse(response)));
176+
return commandManager.submitCustomCommand(
177+
args, response -> ClusterValue.of(handleBinaryObjectOrNullResponse(response)));
181178
}
182179

183180
@Override
184181
public CompletableFuture<ClusterValue<Object>> customCommand(
185182
@NonNull String[] args, @NonNull Route route) {
186-
return commandManager.submitNewCommand(
187-
CustomCommand, args, route, response -> handleCustomCommandResponse(route, response));
183+
return commandManager.submitCustomCommand(
184+
args, route, response -> handleCustomCommandResponse(route, response));
188185
}
189186

190187
@Override
191188
public CompletableFuture<ClusterValue<Object>> customCommand(
192189
@NonNull GlideString[] args, @NonNull Route route) {
193-
return commandManager.submitNewCommand(
194-
CustomCommand, args, route, response -> handleCustomCommandBinaryResponse(route, response));
190+
return commandManager.submitCustomCommand(
191+
args, route, response -> handleCustomCommandBinaryResponse(route, response));
195192
}
196193

197194
@SuppressWarnings("unchecked")
@@ -1350,7 +1347,7 @@ public CompletableFuture<ClusterValue<GlideString[]>> keys(
13501347
@Override
13511348
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
13521349
String[] arguments = new String[] {Long.toString(numreplicas), Long.toString(timeout)};
1353-
return commandManager.submitNewCommand(
1350+
return commandManager.submitBlockingCommand(
13541351
Wait, arguments, SimpleSingleNodeRoute.RANDOM, this::handleLongResponse);
13551352
}
13561353

@@ -1359,7 +1356,7 @@ public CompletableFuture<Long[]> waitaof(
13591356
long numlocal, long numreplicas, long timeout, @NonNull Route route) {
13601357
String[] arguments =
13611358
new String[] {Long.toString(numlocal), Long.toString(numreplicas), Long.toString(timeout)};
1362-
return commandManager.submitNewCommand(
1359+
return commandManager.submitBlockingCommand(
13631360
WaitAof,
13641361
arguments,
13651362
route,

java/client/src/main/java/glide/api/models/commands/stream/StreamReadOptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ public class StreamReadOptions {
3636
*/
3737
protected Long count;
3838

39+
/**
40+
* Returns true if this options object specifies a BLOCK timeout, making the command a blocking
41+
* command.
42+
*
43+
* <p>Note: We check {@code block != null} rather than {@code block != null && block != 0} because
44+
* {@code BLOCK 0} means "block indefinitely" in Valkey/Redis, which is still a blocking command
45+
* that should skip Java-side timeout enforcement.
46+
*
47+
* @return true if BLOCK option is set (including BLOCK 0 for indefinite blocking)
48+
*/
49+
public boolean isBlocking() {
50+
return this.block != null;
51+
}
52+
3953
/**
4054
* Converts options and the key-to-id input for {@link StreamBaseCommands#xread(Map,
4155
* StreamReadOptions)} into a String[].

java/client/src/main/java/glide/ffi/resolvers/NativeUtils.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class NativeUtils {
2929
public static final String NATIVE_FOLDER_PATH_PREFIX = "nativeutils";
3030

3131
/** Temporary directory which will contain the dynamic library files. */
32-
private static File temporaryDir;
32+
private static volatile File temporaryDir;
3333

3434
/** Track if the Glide library has already been loaded */
3535
private static volatile boolean glideLibLoaded = false;
@@ -95,12 +95,17 @@ public static void loadLibraryFromJar(String path) throws IOException {
9595
}
9696

9797
// Prepare temporary file
98-
if (temporaryDir == null) {
99-
temporaryDir = createTempDirectory(NATIVE_FOLDER_PATH_PREFIX);
100-
temporaryDir.deleteOnExit();
98+
File localTempDir;
99+
synchronized (NativeUtils.class) {
100+
if (temporaryDir == null) {
101+
File createdDir = createTempDirectory(NATIVE_FOLDER_PATH_PREFIX);
102+
createdDir.deleteOnExit();
103+
temporaryDir = createdDir;
104+
}
105+
localTempDir = temporaryDir;
101106
}
102107

103-
File temp = new File(temporaryDir, filename);
108+
File temp = new File(localTempDir, filename);
104109

105110
try (InputStream is = NativeUtils.class.getResourceAsStream(path)) {
106111
if (is == null) {

0 commit comments

Comments
 (0)