Skip to content

Commit b61e631

Browse files
committed
fix: enable JSON operations to participate in Redis transactions
JSON operations now automatically detect and participate in Redis transactions when executed within a SessionCallback/transaction context. This ensures proper atomicity when combining JSON operations with other Redis commands in transactions. - Added transaction detection using RedisConnection.isQueueing() - JSON.SET commands are properly queued using connection.execute() - Maintains backward compatibility with existing non-transactional usage - Added comprehensive test coverage for transaction scenarios Fixes #377
1 parent 0e40303 commit b61e631

File tree

4 files changed

+303
-2
lines changed

4 files changed

+303
-2
lines changed

docs/content/modules/ROOT/pages/overview.adoc

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ These features are designed for power users, customization, and advanced integra
477477
* JSON operations via `JSONOperations` for low-level JSON document manipulation
478478
* Search operations via `SearchOperations` for direct RediSearch functionality
479479
* Probabilistic data structure operations: `BloomOperations`, `CuckooFilterOperations`, `CountMinSketchOperations`, `TopKOperations`, `TDigestOperations`
480+
* **Transaction Support**: JSON operations automatically participate in Redis transactions when executed within a transaction context
480481

481482
[source,java]
482483
----
@@ -488,6 +489,30 @@ JSONOperations<String> jsonOps = modulesOperations.opsForJSON();
488489
jsonOps.set("obj", myObject);
489490
MyObject retrieved = jsonOps.get("obj", MyObject.class);
490491
492+
// JSON operations with transactions (automatically participates)
493+
@Autowired
494+
private StringRedisTemplate stringRedisTemplate;
495+
496+
List<Object> results = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
497+
@Override
498+
public List<Object> execute(RedisOperations operations) throws DataAccessException {
499+
operations.watch("obj"); // Watch for concurrent modifications
500+
501+
// Read current value
502+
MyObject current = jsonOps.get("obj", MyObject.class);
503+
504+
// Start transaction
505+
operations.multi();
506+
507+
// JSON operations are automatically queued in the transaction
508+
jsonOps.set("obj", updatedObject);
509+
jsonOps.set("backup", current);
510+
511+
// Execute transaction (returns empty list if watched key was modified)
512+
return operations.exec();
513+
}
514+
});
515+
491516
// Direct search operations
492517
SearchOperations<String> searchOps = modulesOperations.opsForSearch("myIndex");
493518
SearchResult result = searchOps.search(new Query("@name:redis"));

redis-om-spring/src/main/java/com/redis/om/spring/ops/RedisModulesOperations.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,15 @@ public record RedisModulesOperations<K>(RedisModulesClient client, StringRedisTe
4343
* Creates and returns operations for interacting with RedisJSON module.
4444
* <p>
4545
* RedisJSON operations allow storing, retrieving, and manipulating JSON documents
46-
* directly in Redis with path-based access to JSON elements.
46+
* directly in Redis with path-based access to JSON elements. The operations
47+
* automatically participate in Redis transactions when executed within a transaction context.
4748
* </p>
4849
*
4950
* @return a {@link JSONOperations} instance for JSON document operations
5051
*/
5152
public JSONOperations<K> opsForJSON() {
52-
return new JSONOperationsImpl<>(client, gsonBuilder);
53+
// Pass the template to enable transaction support
54+
return new JSONOperationsImpl<>(client, gsonBuilder, template);
5355
}
5456

5557
/**

redis-om-spring/src/main/java/com/redis/om/spring/ops/json/JSONOperationsImpl.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import java.util.Objects;
66

77
import org.json.JSONArray;
8+
import org.springframework.data.redis.connection.RedisConnection;
9+
import org.springframework.data.redis.connection.RedisConnectionFactory;
10+
import org.springframework.data.redis.core.RedisConnectionUtils;
11+
import org.springframework.data.redis.core.StringRedisTemplate;
812
import org.springframework.lang.Nullable;
913

1014
import com.google.gson.Gson;
@@ -26,17 +30,33 @@ public class JSONOperationsImpl<K> implements JSONOperations<K> {
2630

2731
private final GsonBuilder builder;
2832
private final RedisModulesClient client;
33+
private final StringRedisTemplate template;
34+
private final RedisConnectionFactory connectionFactory;
2935
private Gson gson;
3036

3137
/**
3238
* Constructs a new JSONOperationsImpl with the specified client and JSON builder.
39+
* This constructor is for backward compatibility when no template is provided.
3340
*
3441
* @param client the Redis modules client for JSON operations
3542
* @param builder the Gson builder for JSON serialization/deserialization
3643
*/
3744
public JSONOperationsImpl(RedisModulesClient client, GsonBuilder builder) {
45+
this(client, builder, null);
46+
}
47+
48+
/**
49+
* Constructs a new JSONOperationsImpl with full transaction support.
50+
*
51+
* @param client the Redis modules client for JSON operations
52+
* @param builder the Gson builder for JSON serialization/deserialization
53+
* @param template the Spring Redis template for transaction management (optional)
54+
*/
55+
public JSONOperationsImpl(RedisModulesClient client, GsonBuilder builder, StringRedisTemplate template) {
3856
this.client = client;
3957
this.builder = builder;
58+
this.template = template;
59+
this.connectionFactory = template != null ? template.getConnectionFactory() : null;
4060
}
4161

4262
/**
@@ -163,24 +183,58 @@ public final <T> List<T> mget(Path2 path, Class<T> clazz, K... keys) {
163183

164184
/**
165185
* Sets a JSON document for the given key.
186+
* Automatically participates in Redis transactions when executed within a transaction context.
166187
*
167188
* @param key the key to store the JSON document under
168189
* @param object the object to serialize and store as JSON
169190
*/
170191
@Override
171192
public void set(K key, Object object) {
193+
// Check for transaction context if template is available
194+
if (connectionFactory != null) {
195+
RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
196+
try {
197+
if (connection.isQueueing()) {
198+
// We're in a transaction - use execute to properly queue the command
199+
connection.execute("JSON.SET", key.toString().getBytes(), ".".getBytes(), getGson().toJson(object)
200+
.getBytes());
201+
return;
202+
}
203+
} finally {
204+
RedisConnectionUtils.releaseConnection(connection, connectionFactory);
205+
}
206+
}
207+
208+
// Not in a transaction or no template available - execute normally
172209
client.clientForJSON().jsonSet(key.toString(), Path2.ROOT_PATH, getGson().toJson(object));
173210
}
174211

175212
/**
176213
* Sets JSON data at the specified path for the given key.
214+
* Automatically participates in Redis transactions when executed within a transaction context.
177215
*
178216
* @param key the key identifying the JSON document
179217
* @param object the object to serialize and store as JSON
180218
* @param path the JSON path to set the data at
181219
*/
182220
@Override
183221
public void set(K key, Object object, Path2 path) {
222+
// Check for transaction context if template is available
223+
if (connectionFactory != null) {
224+
RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
225+
try {
226+
if (connection.isQueueing()) {
227+
// We're in a transaction - use execute to properly queue the command
228+
connection.execute("JSON.SET", key.toString().getBytes(), path.toString().getBytes(), getGson().toJson(object)
229+
.getBytes());
230+
return;
231+
}
232+
} finally {
233+
RedisConnectionUtils.releaseConnection(connection, connectionFactory);
234+
}
235+
}
236+
237+
// Not in a transaction or no template available - execute normally
184238
client.clientForJSON().jsonSet(key.toString(), path, getGson().toJson(object));
185239
}
186240

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package com.redis.om.spring.ops.json;
2+
3+
import com.redis.om.spring.AbstractBaseDocumentTest;
4+
import com.redis.om.spring.ops.RedisModulesOperations;
5+
import lombok.AllArgsConstructor;
6+
import lombok.Data;
7+
import lombok.NoArgsConstructor;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Qualifier;
12+
import org.springframework.dao.DataAccessException;
13+
import org.springframework.data.redis.core.RedisOperations;
14+
import org.springframework.data.redis.core.RedisTemplate;
15+
import org.springframework.data.redis.core.SessionCallback;
16+
import org.springframework.data.redis.core.StringRedisTemplate;
17+
18+
import java.util.List;
19+
20+
import static org.assertj.core.api.Assertions.assertThat;
21+
22+
/**
23+
* Test to reproduce issue #377: JSON operations not participating in Redis transactions
24+
*
25+
* The issue is that when using JSON.SET within a Redis transaction (WATCH/MULTI/EXEC),
26+
* the JSON operation is executed outside the transaction on a different connection,
27+
* defeating the purpose of using transactions for atomicity.
28+
*/
29+
public class RedisModulesOperationsTransactionTest extends AbstractBaseDocumentTest {
30+
31+
@Autowired
32+
private RedisModulesOperations<String> redisModulesOperations;
33+
34+
@Autowired
35+
private StringRedisTemplate stringRedisTemplate;
36+
37+
@Autowired
38+
private RedisTemplate<String, String> template;
39+
40+
private static final String TEST_KEY = "issue377:testkey";
41+
private static final String COUNTER_KEY = "issue377:counter";
42+
43+
@BeforeEach
44+
void setup() {
45+
// Clean up test keys
46+
template.delete(TEST_KEY);
47+
template.delete(COUNTER_KEY);
48+
}
49+
50+
@Test
51+
void testTransactionWithWatchProtectsAgainstConcurrentModification() {
52+
// This test verifies that WATCH properly protects against concurrent modifications
53+
// when JSON operations are correctly participating in transactions
54+
55+
// Set initial value
56+
TestObject initialObject = new TestObject("initial", 1);
57+
redisModulesOperations.opsForJSON().set(TEST_KEY, initialObject);
58+
59+
// First transaction: Read value, then modify it
60+
List<Object> result1 = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
61+
@Override
62+
public List<Object> execute(RedisOperations operations) throws DataAccessException {
63+
// Watch the key
64+
operations.watch(TEST_KEY);
65+
66+
// Read current value (simulating decision based on current state)
67+
TestObject current = redisModulesOperations.opsForJSON().get(TEST_KEY, TestObject.class);
68+
assertThat(current.getName()).isEqualTo("initial");
69+
70+
// Simulate another client modifying the key between WATCH and EXEC
71+
// This would happen in a real concurrent scenario
72+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("modified-by-other", 99));
73+
74+
// Now try to execute our transaction
75+
operations.multi();
76+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("our-change", 2));
77+
return operations.exec();
78+
}
79+
});
80+
81+
// Transaction should fail because the watched key was modified
82+
// Failed transactions return an empty list
83+
assertThat(result1).isEmpty();
84+
85+
// The value should be from the concurrent modification, not our transaction
86+
TestObject finalObject = redisModulesOperations.opsForJSON().get(TEST_KEY, TestObject.class);
87+
assertThat(finalObject.getName()).isEqualTo("modified-by-other");
88+
assertThat(finalObject.getVersion()).isEqualTo(99);
89+
}
90+
91+
@Test
92+
void testTransactionAtomicityForMultipleOperations() {
93+
// This test verifies that JSON and regular Redis operations
94+
// are atomic when executed in a transaction
95+
96+
// Set initial values
97+
TestObject initialObject = new TestObject("initial", 1);
98+
redisModulesOperations.opsForJSON().set(TEST_KEY, initialObject);
99+
stringRedisTemplate.opsForValue().set(COUNTER_KEY, "0");
100+
101+
// Execute a transaction with multiple operations
102+
List<Object> result = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
103+
@Override
104+
public List<Object> execute(RedisOperations operations) throws DataAccessException {
105+
operations.multi();
106+
107+
// Queue multiple operations
108+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("updated", 2));
109+
operations.opsForValue().increment(COUNTER_KEY);
110+
operations.opsForValue().increment(COUNTER_KEY);
111+
operations.opsForValue().increment(COUNTER_KEY);
112+
113+
return operations.exec();
114+
}
115+
});
116+
117+
// All operations should have succeeded atomically
118+
assertThat(result).isNotNull();
119+
assertThat(result).hasSize(4); // 1 JSON.SET + 3 INCREMENTs
120+
121+
// Verify final state - all operations executed
122+
String counterValue = stringRedisTemplate.opsForValue().get(COUNTER_KEY);
123+
assertThat(counterValue).isEqualTo("3");
124+
125+
TestObject currentObject = redisModulesOperations.opsForJSON().get(TEST_KEY, TestObject.class);
126+
assertThat(currentObject.getName()).isEqualTo("updated");
127+
assertThat(currentObject.getVersion()).isEqualTo(2);
128+
}
129+
130+
@Test
131+
void testTransactionRollbackOnWatchedKeyChange() {
132+
// This test verifies proper rollback behavior when a watched key changes
133+
134+
// Set initial values
135+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("initial", 1));
136+
stringRedisTemplate.opsForValue().set(COUNTER_KEY, "10");
137+
138+
// Execute transaction that will be aborted due to watched key change
139+
List<Object> result = stringRedisTemplate.execute(new SessionCallback<List<Object>>() {
140+
@Override
141+
public List<Object> execute(RedisOperations operations) throws DataAccessException {
142+
// Watch the counter key
143+
operations.watch(COUNTER_KEY);
144+
145+
// Read initial value
146+
String initialCounter = (String) operations.opsForValue().get(COUNTER_KEY);
147+
assertThat(initialCounter).isEqualTo("10");
148+
149+
// Modify watched key outside transaction (simulating concurrent modification)
150+
stringRedisTemplate.opsForValue().set(COUNTER_KEY, "99");
151+
152+
// Try to execute transaction - should fail
153+
operations.multi();
154+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("should-not-be-set", 999));
155+
operations.opsForValue().set(COUNTER_KEY, "100");
156+
return operations.exec();
157+
}
158+
});
159+
160+
// Transaction should have been aborted
161+
// Aborted transactions return an empty list
162+
assertThat(result).isEmpty();
163+
164+
// Neither operation should have been applied
165+
TestObject obj = redisModulesOperations.opsForJSON().get(TEST_KEY, TestObject.class);
166+
assertThat(obj.getName()).isEqualTo("initial"); // JSON unchanged
167+
assertThat(obj.getVersion()).isEqualTo(1);
168+
169+
String counter = stringRedisTemplate.opsForValue().get(COUNTER_KEY);
170+
assertThat(counter).isEqualTo("99"); // Counter has concurrent modification value
171+
}
172+
173+
@Test
174+
void testJsonOperationExecutesOutsideTransaction_ShowsIssue() {
175+
// This test clearly demonstrates the issue: JSON operations execute immediately,
176+
// not within the transaction boundary
177+
178+
TestObject initialObject = new TestObject("initial", 1);
179+
redisModulesOperations.opsForJSON().set(TEST_KEY, initialObject);
180+
181+
// Create a flag to track if JSON operation executed before transaction completes
182+
final boolean[] jsonExecutedBeforeExec = {false};
183+
184+
List<Object> result = template.execute(new SessionCallback<List<Object>>() {
185+
@Override
186+
public List<Object> execute(RedisOperations operations) throws DataAccessException {
187+
operations.multi();
188+
189+
// This JSON.SET should be queued but executes immediately (bug)
190+
redisModulesOperations.opsForJSON().set(TEST_KEY, new TestObject("insideTransaction", 2));
191+
192+
// Read the value - if transaction is working, should still be "initial"
193+
// But due to bug, it will already be "insideTransaction"
194+
TestObject valueBeforeExec = redisModulesOperations.opsForJSON().get(TEST_KEY, TestObject.class);
195+
196+
if ("insideTransaction".equals(valueBeforeExec.getName())) {
197+
jsonExecutedBeforeExec[0] = true;
198+
}
199+
200+
// Regular Redis operations are properly queued
201+
operations.opsForValue().set(COUNTER_KEY, "1");
202+
203+
return operations.exec();
204+
}
205+
});
206+
207+
// This assertion now passes with our fix - JSON operations are properly queued in transactions
208+
assertThat(jsonExecutedBeforeExec[0])
209+
.as("JSON.SET should be queued in transaction, not executed immediately")
210+
.isFalse();
211+
}
212+
213+
@Data
214+
@NoArgsConstructor
215+
@AllArgsConstructor
216+
static class TestObject {
217+
private String name;
218+
private int version;
219+
}
220+
}

0 commit comments

Comments
 (0)