Skip to content

Commit 944e41b

Browse files
committed
Add Redis queue functionality with enqueue, dequeue, and consumer
Implement Redis-based queue handling, including enqueueing, dequeueing, and consumer registration. Add corresponding tests (e.g., `RedisQueueTest`) to validate worker-sender interactions and processing. Extend MockRedis to support queue operations for testing purposes.
1 parent ae2517c commit 944e41b

5 files changed

Lines changed: 359 additions & 5 deletions

File tree

src/main/java/io/github/intisy/utils/custom/external/Redis.java

Lines changed: 139 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ private void stopEmbeddedServer() {
243243
}
244244
}
245245

246+
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
246247
public boolean ping() {
247248
if (useMockFallback && mockRedis != null) {
248249
return mockRedis.isRunning();
@@ -539,14 +540,97 @@ public void onMessage(String channel, String message) {
539540
}, "Redis-Subscriber-" + channel).start();
540541
}
541542

543+
public boolean enqueue(String queueName, String message) {
544+
if (useMockFallback) {
545+
return mockRedis.enqueue(queueName, message);
546+
}
547+
548+
if (!isConnected()) {
549+
logger.error("Not connected to Redis server. Cannot enqueue message.");
550+
return false;
551+
}
552+
553+
try (Jedis jedis = jedisPool.getResource()) {
554+
jedis.rpush(queueName, message);
555+
return true;
556+
} catch (JedisConnectionException e) {
557+
logger.error("Jedis connection error during RPUSH operation", e);
558+
connected = false;
559+
return false;
560+
} catch (Exception e) {
561+
logger.error("Error enqueueing message to Redis", e);
562+
return false;
563+
}
564+
}
565+
566+
public String dequeue(String queueName) {
567+
if (useMockFallback) {
568+
return mockRedis.dequeue(queueName);
569+
}
570+
571+
if (!isConnected()) {
572+
logger.error("Not connected to Redis server. Cannot dequeue message.");
573+
return null;
574+
}
575+
576+
try (Jedis jedis = jedisPool.getResource()) {
577+
return jedis.lpop(queueName);
578+
} catch (JedisConnectionException e) {
579+
logger.error("Jedis connection error during LPOP operation", e);
580+
connected = false;
581+
return null;
582+
} catch (Exception e) {
583+
logger.error("Error dequeuing message from Redis", e);
584+
return null;
585+
}
586+
}
587+
588+
public void registerQueueConsumer(String queueName, QueueMessageConsumer consumer) {
589+
if (useMockFallback) {
590+
mockRedis.registerQueueConsumer(queueName, consumer);
591+
return;
592+
}
593+
594+
if (!isConnected()) {
595+
logger.error("Not connected to Redis server. Cannot register queue consumer.");
596+
return;
597+
}
598+
599+
new Thread(() -> {
600+
try {
601+
while (isConnected()) {
602+
try (Jedis jedis = jedisPool.getResource()) {
603+
List<String> result = jedis.blpop(0, queueName);
604+
if (result != null && result.size() >= 2) {
605+
String queue = result.get(0);
606+
String message = result.get(1);
607+
consumer.onMessageReceived(queue, message);
608+
}
609+
}
610+
}
611+
} catch (JedisConnectionException e) {
612+
logger.error("Jedis connection error during BLPOP operation", e);
613+
connected = false;
614+
} catch (Exception e) {
615+
logger.error("Error consuming from Redis queue", e);
616+
}
617+
}, "Redis-Queue-Consumer-" + queueName).start();
618+
}
619+
542620
public interface MessageListener {
543621
void onMessage(String channel, String message);
544622
}
545623

624+
public interface QueueMessageConsumer {
625+
void onMessageReceived(String queueName, String message);
626+
}
627+
546628
public static class MockRedis extends Redis {
547629
private final Map<String, String> dataStore = new HashMap<>();
548630
private boolean running = false;
549631
private final Map<String, List<MessageListener>> subscribers = new HashMap<>();
632+
private final Map<String, List<String>> messageQueues = new HashMap<>();
633+
private final Map<String, List<QueueMessageConsumer>> queueConsumers = new HashMap<>();
550634

551635
public MockRedis() {
552636
super("mock", 0);
@@ -660,6 +744,59 @@ public void subscribe(String channel, MessageListener messageListener) {
660744
subscribers.computeIfAbsent(channel, k -> new CopyOnWriteArrayList<>())
661745
.add(messageListener);
662746
}
747+
748+
public boolean enqueue(String queueName, String message) {
749+
if (!isRunning()) {
750+
getLogger().error("Mock Redis server is not running. Cannot enqueue message.");
751+
return false;
752+
}
753+
754+
messageQueues.computeIfAbsent(queueName, k -> new CopyOnWriteArrayList<>())
755+
.add(message);
756+
757+
List<QueueMessageConsumer> consumers = queueConsumers.get(queueName);
758+
if (consumers != null && !consumers.isEmpty()) {
759+
QueueMessageConsumer consumer = consumers.get(0);
760+
761+
List<String> queue = messageQueues.get(queueName);
762+
if (!queue.isEmpty()) {
763+
String msg = queue.remove(0);
764+
new Thread(() -> consumer.onMessageReceived(queueName, msg)).start();
765+
}
766+
}
767+
768+
return true;
769+
}
770+
771+
public String dequeue(String queueName) {
772+
if (!isRunning()) {
773+
getLogger().error("Mock Redis server is not running. Cannot dequeue message.");
774+
return null;
775+
}
776+
777+
List<String> queue = messageQueues.get(queueName);
778+
if (queue == null || queue.isEmpty()) {
779+
return null;
780+
}
781+
782+
return queue.remove(0);
783+
}
784+
785+
public void registerQueueConsumer(String queueName, QueueMessageConsumer consumer) {
786+
if (!isRunning()) {
787+
getLogger().error("Mock Redis server is not running. Cannot register queue consumer.");
788+
return;
789+
}
790+
791+
queueConsumers.computeIfAbsent(queueName, k -> new CopyOnWriteArrayList<>())
792+
.add(consumer);
793+
794+
List<String> queue = messageQueues.get(queueName);
795+
if (queue != null && !queue.isEmpty()) {
796+
String message = queue.remove(0);
797+
new Thread(() -> consumer.onMessageReceived(queueName, message)).start();
798+
}
799+
}
663800
}
664801

665802
public Map<String, String> getDebugInfo() {
@@ -669,6 +806,8 @@ public Map<String, String> getDebugInfo() {
669806
mockInfo.put("status", mockRedis.isRunning() ? "running" : "stopped");
670807
mockInfo.put("dataStoreSize", String.valueOf(mockRedis.dataStore.size()));
671808
mockInfo.put("subscribersCount", String.valueOf(mockRedis.subscribers.size()));
809+
mockInfo.put("queuesCount", String.valueOf(mockRedis.messageQueues.size()));
810+
mockInfo.put("queueConsumersCount", String.valueOf(mockRedis.queueConsumers.size()));
672811
return mockInfo;
673812
}
674813

@@ -746,7 +885,6 @@ public Map<String, Long> getKeyspaceStats() {
746885
Map<String, Long> stats = new HashMap<>();
747886
stats.put("total_keys", jedis.dbSize());
748887

749-
// Get key patterns statistics
750888
Set<String> keys = jedis.keys("*");
751889
stats.put("string_keys", keys.stream()
752890
.filter(k -> jedis.type(k).equals("string"))

src/test/java/io/github/intisy/utils/custom/external/RedisMultiClientTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.github.intisy.utils.custom.external;
22

33
import io.github.intisy.simple.logger.SimpleLogger; // Assuming you have a logger implementation
4-
import io.github.intisy.utils.custom.external.Redis;
54

65
import java.io.IOException;
76

@@ -12,7 +11,7 @@ public static void main(String[] args) {
1211
Redis serverInstance = null;
1312
Redis client1 = null;
1413
Redis client2 = null;
15-
int actualPort = -1;
14+
int actualPort;
1615

1716
try {
1817
// 1. Start an embedded Redis server instance

src/test/java/io/github/intisy/utils/custom/external/RedisPubSubTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.github.intisy.utils.custom.external;
22

33
import io.github.intisy.simple.logger.SimpleLogger; // Assuming you have a logger implementation
4-
import io.github.intisy.utils.custom.external.Redis;
54
import io.github.intisy.utils.utils.ThreadUtils; // Using your ThreadUtils for convenience
65

76
import java.io.IOException;

0 commit comments

Comments
 (0)