Skip to content
This repository was archived by the owner on Oct 29, 2025. It is now read-only.

Commit 100f790

Browse files
committed
refactor: Created ExportStepHelper for reuse by RIOT-X
1 parent e1097b1 commit 100f790

File tree

9 files changed

+121
-95
lines changed

9 files changed

+121
-95
lines changed

core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@
5050
import org.springframework.util.CollectionUtils;
5151

5252
import com.redis.spring.batch.JobUtils;
53-
import com.redis.spring.batch.item.AbstractAsyncItemReader;
54-
import com.redis.spring.batch.item.AbstractPollableItemReader;
53+
import com.redis.spring.batch.item.AbstractAsyncItemStreamSupport;
54+
import com.redis.spring.batch.item.PollableItemReader;
5555
import com.redis.spring.batch.step.FlushingStepBuilder;
5656

5757
import picocli.CommandLine.ArgGroup;
@@ -121,7 +121,7 @@ private JobLauncher jobLauncher() throws Exception {
121121
return launcher;
122122
}
123123

124-
protected void configureAsyncReader(AbstractAsyncItemReader<?, ?> reader) {
124+
protected void configureAsyncStreamSupport(AbstractAsyncItemStreamSupport<?, ?> reader) {
125125
reader.setJobRepository(jobRepository);
126126
}
127127

@@ -327,7 +327,7 @@ private TaskExecutor taskExecutor() {
327327
}
328328

329329
private <I, O> ItemReader<? extends I> reader(Step<I, O> step) {
330-
if (stepArgs.getThreads() == 1 || step.getReader() instanceof AbstractPollableItemReader) {
330+
if (stepArgs.getThreads() == 1 || step.getReader() instanceof PollableItemReader) {
331331
return step.getReader();
332332
}
333333
log.info("Synchronizing reader in step {}", step.getName());

core/riot-core/src/main/java/com/redis/riot/core/LoggingMixin.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414
import picocli.CommandLine.Spec;
1515

1616
public class LoggingMixin {
17+
18+
public static final boolean DEFAULT_SHOW_DATE_TIME = true;
19+
public static final boolean DEFAULT_SHOW_THREAD_NAME = true;
20+
public static final boolean DEFAULT_SHOW_LOG_NAME = true;
21+
public static final Level DEFAULT_LEVEL = Level.WARN;
22+
1723
/**
1824
* This mixin is able to climb the command hierarchy because the
1925
* {@code @Spec(Target.MIXEE)}-annotated field gets a reference to the command
@@ -22,15 +28,15 @@ public class LoggingMixin {
2228
private @Spec(MIXEE) CommandSpec mixee; // spec of the command where the @Mixin is used
2329

2430
private String file;
25-
private boolean showDateTime;
31+
private boolean showDateTime = DEFAULT_SHOW_DATE_TIME;
2632
private String dateTimeFormat;
2733
private boolean showThreadId;
28-
private boolean hideThreadName;
29-
private boolean hideLogName;
34+
private boolean showThreadName = DEFAULT_SHOW_THREAD_NAME;
35+
private boolean showLogName = DEFAULT_SHOW_LOG_NAME;
3036
private boolean showShortLogName;
3137
private boolean levelInBrackets;
3238
private Map<String, Level> levels = new LinkedHashMap<>();
33-
private Level level = Level.WARN;
39+
private Level level = DEFAULT_LEVEL;
3440

3541
private static LoggingMixin getTopLevelCommandLoggingMixin(CommandSpec commandSpec) {
3642
return ((MainCommand) commandSpec.root().userObject()).loggingMixin;
@@ -41,7 +47,7 @@ public void setFile(String file) {
4147
getTopLevelCommandLoggingMixin(mixee).file = file;
4248
}
4349

44-
@Option(names = "--log-time", description = "Include current date and time in log messages.")
50+
@Option(names = "--log-time", description = "Include current date and time in log messages. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true")
4551
public void setShowDateTime(boolean show) {
4652
getTopLevelCommandLoggingMixin(mixee).showDateTime = show;
4753
}
@@ -56,18 +62,18 @@ public void setShowThreadId(boolean show) {
5662
getTopLevelCommandLoggingMixin(mixee).showThreadId = show;
5763
}
5864

59-
@Option(names = "--no-log-thread", description = "Hide current thread name in log messages.", hidden = true)
60-
public void setHideThreadName(boolean hide) {
61-
getTopLevelCommandLoggingMixin(mixee).hideThreadName = hide;
65+
@Option(names = "--log-thread", description = "Show current thread name in log messages. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true)
66+
public void setShowThreadName(boolean show) {
67+
getTopLevelCommandLoggingMixin(mixee).showThreadName = show;
6268
}
6369

64-
@Option(names = "--no-log-name", description = "Hide logger instance name in log messages.", hidden = true)
65-
public void setHideLogName(boolean hide) {
66-
getTopLevelCommandLoggingMixin(mixee).hideLogName = hide;
70+
@Option(names = "--log-name", description = "Show logger instance name in log messages. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true)
71+
public void setShowLogName(boolean show) {
72+
getTopLevelCommandLoggingMixin(mixee).showLogName = show;
6773
}
6874

69-
@Option(names = "--log-short", description = "Include last component of logger instance name in log messages.", hidden = true)
70-
public void setShowLogName(boolean show) {
75+
@Option(names = "--log-short", description = "Include last component of logger instance name in log messages. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true", hidden = true)
76+
public void setShowShortLogName(boolean show) {
7177
getTopLevelCommandLoggingMixin(mixee).showShortLogName = show;
7278
}
7379

@@ -123,8 +129,8 @@ public void configureLoggers() {
123129
System.setProperty(SimpleLogger.DATE_TIME_FORMAT_KEY, mixin.dateTimeFormat);
124130
}
125131
setBoolean(SimpleLogger.SHOW_THREAD_ID_KEY, mixin.showThreadId);
126-
setBoolean(SimpleLogger.SHOW_THREAD_NAME_KEY, !mixin.hideThreadName);
127-
setBoolean(SimpleLogger.SHOW_LOG_NAME_KEY, !mixin.hideLogName);
132+
setBoolean(SimpleLogger.SHOW_THREAD_NAME_KEY, mixin.showThreadName);
133+
setBoolean(SimpleLogger.SHOW_LOG_NAME_KEY, mixin.showLogName);
128134
setBoolean(SimpleLogger.SHOW_SHORT_LOG_NAME_KEY, mixin.showShortLogName);
129135
setBoolean(SimpleLogger.LEVEL_IN_BRACKETS_KEY, mixin.levelInBrackets);
130136
setLogLevel("com.amazonaws.internal", Level.ERROR);

plugins/riot/src/main/java/com/redis/riot/AbstractExportCommand.java

Lines changed: 2 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,8 @@
11
package com.redis.riot;
22

3-
import java.util.Map;
4-
import java.util.Set;
5-
import java.util.stream.Collectors;
6-
7-
import org.slf4j.Logger;
83
import org.springframework.batch.item.ItemWriter;
94
import org.springframework.expression.spel.support.StandardEvaluationContext;
10-
import org.springframework.util.Assert;
115

12-
import com.redis.lettucemod.RedisModulesUtils;
13-
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
146
import com.redis.riot.core.AbstractJobCommand;
157
import com.redis.riot.core.Step;
168
import com.redis.spring.batch.item.redis.RedisItemReader;
@@ -19,16 +11,12 @@
1911
import com.redis.spring.batch.item.redis.common.KeyValue;
2012
import com.redis.spring.batch.item.redis.reader.KeyValueRead;
2113

22-
import io.lettuce.core.AbstractRedisClient;
23-
import io.lettuce.core.RedisException;
2414
import picocli.CommandLine.ArgGroup;
2515
import picocli.CommandLine.Option;
2616

2717
public abstract class AbstractExportCommand extends AbstractJobCommand {
2818

2919
public static final ReaderMode DEFAULT_MODE = RedisItemReader.DEFAULT_MODE;
30-
public static final String NOTIFY_CONFIG = "notify-keyspace-events";
31-
public static final String NOTIFY_CONFIG_VALUE = "KEA";
3220

3321
private static final String TASK_NAME = "Exporting";
3422
private static final String VAR_SOURCE = "source";
@@ -67,7 +55,7 @@ protected void configure(StandardEvaluationContext context) {
6755
}
6856

6957
protected void configureSourceRedisReader(RedisItemReader<?, ?> reader) {
70-
configureAsyncReader(reader);
58+
configureAsyncStreamSupport(reader);
7159
sourceRedisContext.configure(reader);
7260
log.info("Configuring {} in {} mode", reader.getName(), mode);
7361
reader.setMode(mode);
@@ -93,49 +81,11 @@ protected void configureSourceRedisWriter(RedisItemWriter<?, ?, ?> writer) {
9381
protected <O> Step<KeyValue<String>, O> step(ItemWriter<O> writer) {
9482
RedisItemReader<String, String> reader = RedisItemReader.struct();
9583
configureSourceRedisReader(reader);
96-
Step<KeyValue<String>, O> step = step(reader, writer);
84+
Step<KeyValue<String>, O> step = new ExportStepHelper(log).step(reader, writer);
9785
step.taskName(TASK_NAME);
9886
return step;
9987
}
10088

101-
protected <K, V, T, O> Step<KeyValue<K>, O> step(RedisItemReader<K, V> reader, ItemWriter<O> writer) {
102-
Step<KeyValue<K>, O> step = new Step<>(reader, writer);
103-
if (reader.getMode() == ReaderMode.SCAN) {
104-
log.info("Configuring step with scan size estimator");
105-
step.maxItemCountSupplier(reader.scanSizeEstimator());
106-
} else {
107-
checkNotifyConfig(reader.getClient(), log);
108-
log.info("Configuring export step with live true, flushInterval {}, idleTimeout {}",
109-
reader.getFlushInterval(), reader.getIdleTimeout());
110-
step.live(true);
111-
step.flushInterval(reader.getFlushInterval());
112-
step.idleTimeout(reader.getIdleTimeout());
113-
}
114-
return step;
115-
}
116-
117-
public static void checkNotifyConfig(AbstractRedisClient client, Logger log) {
118-
Map<String, String> valueMap;
119-
try (StatefulRedisModulesConnection<String, String> conn = RedisModulesUtils.connection(client)) {
120-
try {
121-
valueMap = conn.sync().configGet(NOTIFY_CONFIG);
122-
} catch (RedisException e) {
123-
log.info("Could not check keyspace notification config", e);
124-
return;
125-
}
126-
}
127-
String actual = valueMap.getOrDefault(NOTIFY_CONFIG, "");
128-
log.info("Retrieved config {}: {}", NOTIFY_CONFIG, actual);
129-
Set<Character> expected = characterSet(NOTIFY_CONFIG_VALUE);
130-
Assert.isTrue(characterSet(actual).containsAll(expected),
131-
String.format("Keyspace notifications not property configured. Expected %s '%s' but was '%s'.",
132-
NOTIFY_CONFIG, NOTIFY_CONFIG_VALUE, actual));
133-
}
134-
135-
private static Set<Character> characterSet(String string) {
136-
return string.codePoints().mapToObj(c -> (char) c).collect(Collectors.toSet());
137-
}
138-
13989
public ReaderMode getMode() {
14090
return mode;
14191
}

plugins/riot/src/main/java/com/redis/riot/AbstractRedisCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ protected RedisModulesCommands<String, String> commands() {
3434
}
3535

3636
protected void configure(RedisItemReader<?, ?> reader) {
37-
configureAsyncReader(reader);
37+
configureAsyncStreamSupport(reader);
3838
redisContext.configure(reader);
3939
}
4040

plugins/riot/src/main/java/com/redis/riot/AbstractRedisTargetExportCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ protected void configure(StandardEvaluationContext context) {
6161
}
6262

6363
protected void configureTargetRedisReader(RedisItemReader<?, ?> reader) {
64-
configureAsyncReader(reader);
64+
configureAsyncStreamSupport(reader);
6565
targetRedisContext.configure(reader);
6666
}
6767

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.redis.riot;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.stream.Collectors;
6+
7+
import org.slf4j.Logger;
8+
import org.springframework.batch.item.ItemWriter;
9+
import org.springframework.util.Assert;
10+
11+
import com.redis.lettucemod.RedisModulesUtils;
12+
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
13+
import com.redis.riot.core.Step;
14+
import com.redis.spring.batch.item.redis.RedisItemReader;
15+
import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode;
16+
import com.redis.spring.batch.item.redis.common.KeyValue;
17+
18+
import io.lettuce.core.AbstractRedisClient;
19+
import io.lettuce.core.RedisException;
20+
21+
public class ExportStepHelper {
22+
23+
public static final String NOTIFY_CONFIG = "notify-keyspace-events";
24+
public static final String NOTIFY_CONFIG_VALUE = "KEA";
25+
26+
private final Logger log;
27+
28+
public ExportStepHelper(Logger log) {
29+
this.log = log;
30+
}
31+
32+
public <K, V, T, O> Step<KeyValue<K>, O> step(RedisItemReader<K, V> reader, ItemWriter<O> writer) {
33+
Step<KeyValue<K>, O> step = new Step<>(reader, writer);
34+
if (reader.getMode() == ReaderMode.SCAN) {
35+
log.info("Configuring step with scan size estimator");
36+
step.maxItemCountSupplier(reader.scanSizeEstimator());
37+
} else {
38+
checkNotifyConfig(reader.getClient(), log);
39+
log.info("Configuring export step with live true, flushInterval {}, idleTimeout {}",
40+
reader.getFlushInterval(), reader.getIdleTimeout());
41+
step.live(true);
42+
step.flushInterval(reader.getFlushInterval());
43+
step.idleTimeout(reader.getIdleTimeout());
44+
}
45+
return step;
46+
}
47+
48+
public static void checkNotifyConfig(AbstractRedisClient client, Logger log) {
49+
Map<String, String> valueMap;
50+
try (StatefulRedisModulesConnection<String, String> conn = RedisModulesUtils.connection(client)) {
51+
try {
52+
valueMap = conn.sync().configGet(NOTIFY_CONFIG);
53+
} catch (RedisException e) {
54+
log.info("Could not check keyspace notification config", e);
55+
return;
56+
}
57+
}
58+
String actual = valueMap.getOrDefault(NOTIFY_CONFIG, "");
59+
log.info("Retrieved config {}: {}", NOTIFY_CONFIG, actual);
60+
Set<Character> expected = characterSet(NOTIFY_CONFIG_VALUE);
61+
Assert.isTrue(characterSet(actual).containsAll(expected),
62+
String.format("Keyspace notifications not property configured. Expected %s '%s' but was '%s'.",
63+
NOTIFY_CONFIG, NOTIFY_CONFIG_VALUE, actual));
64+
}
65+
66+
private static Set<Character> characterSet(String string) {
67+
return string.codePoints().mapToObj(c -> (char) c).collect(Collectors.toSet());
68+
}
69+
70+
}

plugins/riot/src/main/java/com/redis/riot/RedisReaderArgs.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
import com.redis.riot.core.RiotDuration;
99
import com.redis.riot.core.processor.FunctionPredicate;
1010
import com.redis.riot.core.processor.PredicateOperator;
11-
import com.redis.spring.batch.item.AbstractAsyncItemReader;
11+
import com.redis.spring.batch.item.AbstractAsyncItemStreamSupport;
1212
import com.redis.spring.batch.item.AbstractPollableItemReader;
1313
import com.redis.spring.batch.item.redis.RedisItemReader;
14-
import com.redis.spring.batch.item.redis.common.KeyValue;
14+
import com.redis.spring.batch.item.redis.reader.KeyEvent;
1515

1616
import io.lettuce.core.codec.RedisCodec;
1717
import lombok.ToString;
@@ -23,8 +23,8 @@ public class RedisReaderArgs {
2323

2424
public static final long DEFAULT_SCAN_COUNT = 1000;
2525
public static final int DEFAULT_QUEUE_CAPACITY = RedisItemReader.DEFAULT_QUEUE_CAPACITY;
26-
public static final int DEFAULT_THREADS = AbstractAsyncItemReader.DEFAULT_THREADS;
27-
public static final int DEFAULT_CHUNK_SIZE = AbstractAsyncItemReader.DEFAULT_CHUNK_SIZE;
26+
public static final int DEFAULT_THREADS = AbstractAsyncItemStreamSupport.DEFAULT_THREADS;
27+
public static final int DEFAULT_CHUNK_SIZE = AbstractAsyncItemStreamSupport.DEFAULT_CHUNK_SIZE;
2828
public static final RiotDuration DEFAULT_POLL_TIMEOUT = RiotDuration
2929
.of(AbstractPollableItemReader.DEFAULT_POLL_TIMEOUT, ChronoUnit.MILLIS);
3030

@@ -71,8 +71,8 @@ public <K> void configure(RedisItemReader<K, ?> reader) {
7171
reader.setProcessor(keyProcessor(reader.getCodec(), keyFilterArgs));
7272
}
7373

74-
private <K> ItemProcessor<KeyValue<K>, KeyValue<K>> keyProcessor(RedisCodec<K, ?> codec, KeyFilterArgs args) {
75-
return args.predicate(codec).map(p -> new FunctionPredicate<KeyValue<K>, K>(KeyValue::getKey, p))
74+
private <K> ItemProcessor<KeyEvent<K>, KeyEvent<K>> keyProcessor(RedisCodec<K, ?> codec, KeyFilterArgs args) {
75+
return args.predicate(codec).map(p -> new FunctionPredicate<KeyEvent<K>, K>(KeyEvent::getKey, p))
7676
.map(PredicateOperator::new).map(FunctionItemProcessor::new).orElse(null);
7777
}
7878

plugins/riot/src/main/java/com/redis/riot/Replicate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
7474
protected Step<KeyValue<byte[]>, KeyValue<byte[]>> replicateStep() {
7575
RedisItemReader<byte[], byte[]> reader = reader();
7676
configureSourceRedisReader(reader);
77-
Step<KeyValue<byte[]>, KeyValue<byte[]>> step = step(reader, replicateWriter());
77+
Step<KeyValue<byte[]>, KeyValue<byte[]>> step = new ExportStepHelper(log).step(reader, replicateWriter());
7878
step.processor(filter());
7979
step.taskName(taskName(reader));
8080
if (logKeys) {

0 commit comments

Comments
 (0)