Skip to content

Commit de8ad4f

Browse files
pditommasoclaude
andcommitted
[release] lib-cmd-queue-redis@0.1.0
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent eff9e4e commit de8ad4f

28 files changed

+2505
-0
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ jobs:
8686
bash publish.sh lib-serde-jackson
8787
bash publish.sh lib-trace
8888
bash publish.sh lib-jedis-pool
89+
bash publish.sh lib-cmd-queue-redis
8990
bash publish.sh micronaut-cache-redis
9091
9192
env:

.github/workflows/generate-submit-dependencies.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ jobs:
3737
"lib-trace",
3838
"lib-util-http",
3939
"lib-jedis-pool",
40+
"lib-cmd-queue-redis",
4041
"micronaut-cache-redis",
4142
"wave-api",
4243
"wave-utils"

lib-cmd-queue-redis/README.md

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# lib-cmd-queue-redis
2+
3+
Asynchronous command queue for executing long-running tasks with persistent state tracking and automatic status polling.
4+
5+
## Installation
6+
7+
Add this dependency to your `build.gradle`:
8+
9+
```gradle
10+
dependencies {
11+
implementation 'io.seqera:lib-cmd-queue-redis:0.1.0'
12+
}
13+
```
14+
15+
## Features
16+
17+
- Fire-and-forget command submission
18+
- Typed parameters and results with JSON serialization
19+
- Status transitions: `SUBMITTED``RUNNING``SUCCEEDED`/`FAILED`/`CANCELLED`
20+
- Automatic timeout handling for long-running commands
21+
- Periodic status checking for async commands
22+
- Command cancellation support
23+
- Persistent storage using Redis or in-memory backend
24+
25+
## Usage
26+
27+
### Define Command Parameters and Result
28+
29+
```java
30+
// Command parameters - must have default constructor for Jackson
31+
public class ProcessingParams {
32+
private String datasetId;
33+
private List<String> steps;
34+
// Getters, setters, constructors...
35+
}
36+
37+
// Command result
38+
public class ProcessingResult {
39+
private int recordsProcessed;
40+
private long durationMs;
41+
// Getters, setters, constructors...
42+
}
43+
```
44+
45+
### Implement a Command Handler
46+
47+
For **synchronous** commands that complete quickly:
48+
49+
```java
50+
@Singleton
51+
public class ProcessingHandler implements CommandHandler<ProcessingParams, ProcessingResult> {
52+
53+
@Override
54+
public String type() { return "data-processing"; }
55+
56+
@Override
57+
public CommandResult<ProcessingResult> execute(Command<ProcessingParams> command) {
58+
var params = command.params();
59+
// Do the work...
60+
var result = new ProcessingResult(1000, 5000L);
61+
return CommandResult.success(result);
62+
}
63+
}
64+
```
65+
66+
For **asynchronous** long-running commands:
67+
68+
```java
69+
@Singleton
70+
public class AsyncProcessingHandler implements CommandHandler<ProcessingParams, ProcessingResult> {
71+
72+
@Inject ExternalService externalService;
73+
74+
@Override
75+
public String type() { return "async-processing"; }
76+
77+
@Override
78+
public CommandResult<ProcessingResult> execute(Command<ProcessingParams> command) {
79+
// Start async job
80+
externalService.startJob(command.id(), command.params());
81+
return CommandResult.running(); // checkStatus() will be called later
82+
}
83+
84+
@Override
85+
public CommandResult<ProcessingResult> checkStatus(Command<ProcessingParams> command, CommandState state) {
86+
var status = externalService.getStatus(command.id());
87+
if (status.isComplete()) return CommandResult.success(status.getResult());
88+
if (status.isFailed()) return CommandResult.failure(status.getError());
89+
return CommandResult.running(); // Still running, check again later
90+
}
91+
}
92+
```
93+
94+
### Submit Commands
95+
96+
```java
97+
@Inject
98+
private CommandService commandService;
99+
100+
// Register handler at startup
101+
commandService.registerHandler(new ProcessingHandler());
102+
103+
// Submit a command
104+
var command = new ProcessingCommand("cmd-123", params);
105+
String commandId = commandService.submit(command);
106+
107+
// Check status
108+
Optional<CommandState> state = commandService.getState(commandId);
109+
110+
// Get result when complete
111+
ProcessingResult result = commandService.getResult(commandId, ProcessingResult.class).orElseThrow();
112+
```
113+
114+
## Command Status Flow
115+
116+
```
117+
submit() ──▶ SUBMITTED ──pickup──▶ RUNNING ─┬─success──▶ SUCCEEDED
118+
├─error────▶ FAILED
119+
└─cancel───▶ CANCELLED
120+
```
121+
122+
## Testing
123+
124+
```bash
125+
./gradlew :lib-cmd-queue-redis:test
126+
```
127+
128+
## License
129+
130+
Apache License 2.0

lib-cmd-queue-redis/VERSION

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
0.1.0

lib-cmd-queue-redis/build.gradle

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
plugins {
19+
id 'io.seqera.java-library-conventions'
20+
id 'groovy'
21+
}
22+
23+
group = 'io.seqera'
24+
version = "${project.file('VERSION').text.trim()}"
25+
26+
dependencies {
27+
// Micronaut core
28+
implementation "io.micronaut:micronaut-inject:${micronautCoreVersion}"
29+
implementation "io.micronaut:micronaut-runtime:${micronautCoreVersion}"
30+
31+
// JSON serialization
32+
implementation project(':lib-serde-jackson')
33+
34+
// Groovy (needed for AbstractMessageStream base class)
35+
compileOnly "io.micronaut:micronaut-inject-groovy:${micronautCoreVersion}"
36+
37+
// Message stream
38+
implementation project(':lib-data-stream-redis')
39+
implementation project(':lib-serde-moshi')
40+
41+
// State store
42+
implementation project(':lib-data-store-state-redis')
43+
44+
// Utilities (includes type resolution)
45+
implementation project(':lib-lang')
46+
47+
// TSID for ID generation
48+
implementation 'com.github.f4b6a3:tsid-creator:5.2.6'
49+
50+
// SLF4J logging
51+
implementation 'org.slf4j:slf4j-api:2.0.17'
52+
53+
// Test dependencies
54+
testImplementation "io.micronaut:micronaut-inject-groovy:${micronautCoreVersion}"
55+
testImplementation "io.micronaut.test:micronaut-test-spock:${micronautTestVersion}"
56+
testImplementation 'org.apache.groovy:groovy'
57+
testImplementation 'com.github.f4b6a3:tsid-creator:5.2.6'
58+
testImplementation 'org.junit.jupiter:junit-jupiter-api'
59+
testRuntimeOnly 'ch.qos.logback:logback-classic:1.5.13'
60+
}

lib-cmd-queue-redis/changelog.txt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# lib-cmd-queue-redis changelog
2+
3+
0.1.0 - 15 Jan 2026
4+
- Initial release
5+
- Asynchronous command queue with persistent state tracking
6+
- Fire-and-forget command submission
7+
- Typed parameters and results with JSON serialization
8+
- Status transitions: SUBMITTED → RUNNING → SUCCEEDED/FAILED/CANCELLED
9+
- Automatic timeout handling for long-running commands
10+
- Periodic status checking via checkStatus() for async commands
11+
- Command cancellation support
12+
- Redis-backed persistent storage with TTL expiration
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.data.command;
19+
20+
/**
21+
* A command represents a unit of work to be executed asynchronously.
22+
* Commands can take arbitrary time (from milliseconds to days) and survive system restarts.
23+
*
24+
* @param <P> The type of the command parameters
25+
*/
26+
public interface Command<P> {
27+
28+
/**
29+
* Unique identifier for this command (typically a TSID).
30+
*/
31+
String id();
32+
33+
/**
34+
* The command type, used to route to the appropriate handler.
35+
*/
36+
String type();
37+
38+
/**
39+
* The typed parameters for this command.
40+
*/
41+
P params();
42+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.seqera.data.command;
17+
18+
import java.time.Duration;
19+
20+
/**
21+
* Configuration interface for command queue processing.
22+
*
23+
* <p>Provides default values for all configuration options. Applications
24+
* can implement this interface to provide custom values via their
25+
* preferred configuration mechanism (e.g., @Value annotations).
26+
*
27+
* @author Paolo Di Tommaso
28+
*/
29+
public interface CommandConfig {
30+
31+
/**
32+
* Interval for polling the command queue.
33+
*/
34+
default Duration pollInterval() {
35+
return Duration.ofSeconds(1);
36+
}
37+
38+
/**
39+
* Timeout for synchronous command execution.
40+
* If execute() takes longer than this, the command is marked as RUNNING
41+
* and checkStatus() will be called on subsequent queue deliveries.
42+
*/
43+
default Duration executeTimeout() {
44+
return Duration.ofSeconds(1);
45+
}
46+
47+
/**
48+
* TTL (Time-To-Live) for command state records in the persistent store.
49+
* Commands expire and are removed after this duration.
50+
*/
51+
default Duration stateTtl() {
52+
return Duration.ofDays(7);
53+
}
54+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2025, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.data.command;
19+
20+
/**
21+
* Handler for executing commands of a specific type.
22+
* Generic types P (params) and R (result) are extracted via reflection at registration time.
23+
*
24+
* @param <P> The type of command parameters
25+
* @param <R> The type of command result
26+
*/
27+
public interface CommandHandler<P, R> {
28+
29+
/**
30+
* The command type this handler processes.
31+
*/
32+
String type();
33+
34+
/**
35+
* Execute the command and return a result.
36+
* This method is executed asynchronously via an executor service.
37+
* If execution takes longer than 1 second, the command is marked as RUNNING and
38+
* {@link #checkStatus} will be called periodically to check completion.
39+
* For long-running commands, return {@link CommandResult#running()} to indicate
40+
* the operation is in progress.
41+
*
42+
* @param command The command to execute
43+
* @return The result of the execution
44+
*/
45+
CommandResult<R> execute(Command<P> command);
46+
47+
/**
48+
* Check the status of a long-running command.
49+
* Called periodically for commands in RUNNING state until a terminal status is returned.
50+
* The command parameter provides typed access to params via {@code command.params()}.
51+
* The state parameter provides access to timing and status information.
52+
*
53+
* @param command The command being checked (provides typed params access)
54+
* @param state The current command state (timing, status info)
55+
* @return The result indicating current status (RUNNING to continue, or terminal status)
56+
*/
57+
default CommandResult<R> checkStatus(Command<P> command, CommandState state) {
58+
return CommandResult.running();
59+
}
60+
}

0 commit comments

Comments
 (0)