Skip to content

Commit 21f682d

Browse files
committed
FIREFLY-1925: Use MessagePack for Redis object storage.
1 parent 50f19f4 commit 21f682d

File tree

27 files changed

+1084
-179
lines changed

27 files changed

+1084
-179
lines changed

buildScript/dependencies.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ dependencies {
9898
// amazon s3 support
9999
implementation 'software.amazon.awssdk:s3-transfer-manager:2.33.11'
100100

101+
// jackson serialization support, including msgpack
102+
implementation "org.msgpack:jackson-dataformat-msgpack:0.9.8"
103+
implementation "com.fasterxml.jackson.core:jackson-databind:2.17.2"
104+
implementation "com.fasterxml.jackson.core:jackson-annotations:2.17.2"
105+
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2"
106+
101107
}
102108

103109

src/firefly/java/edu/caltech/ipac/firefly/api/Async.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ private static void sendErrorResponse(int code, JobInfo info, String message, Ht
183183
}
184184
res.setStatus(code);
185185
info.setPhase(JobInfo.Phase.ERROR);
186-
info.setError(new JobInfo.Error(code, message));
186+
info.setErrorSummary(new JobInfo.ErrorSummary(message));
187187
sendResponse(JobUtil.toJson(info), res);
188188
}
189189

src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import edu.caltech.ipac.util.cache.StringKey;
1717
import io.lettuce.core.*;
1818
import io.lettuce.core.api.StatefulRedisConnection;
19+
import io.lettuce.core.codec.ByteArrayCodec;
20+
import io.lettuce.core.codec.RedisCodec;
21+
import io.lettuce.core.codec.StringCodec;
1922
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
2023
import io.lettuce.core.event.EventBus;
2124
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
@@ -74,13 +77,15 @@ private enum SchemaVersion { V1_0, V1_1 }
7477
private static Instant failSince;
7578

7679
private static RedisClient client;
77-
private static StatefulRedisConnection<String, String> mainConn;
78-
private static StatefulRedisConnection<String, String> scanConn;
79-
private static StatefulRedisPubSubConnection<String, String> subPubConn;
80+
private static StatefulRedisConnection<String, byte[]> mainConn;
81+
private static StatefulRedisConnection<String, byte[]> scanConn;
82+
private static StatefulRedisPubSubConnection<String, byte[]> subPubConn;
8083

8184
private static final List<String> RESERVED_KEYS = List.of(ALL_JOB_CACHE_KEY);
8285
private static final AtomicBoolean initialized = new AtomicBoolean(false);
8386
private static final AtomicBoolean localStartupTriggered = new AtomicBoolean(false);
87+
private static RedisCodec<String, byte[]> codec =
88+
RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE);
8489

8590
// -------------------------------------------------------------------------
8691
// Initialization
@@ -141,9 +146,9 @@ public static synchronized void init() throws Exception {
141146

142147
private static void tryConnect() throws Exception{
143148
try {
144-
mainConn = client.connect();
145-
scanConn = client.connect();
146-
subPubConn = client.connectPubSub();
149+
mainConn = client.connect(codec);
150+
scanConn = client.connect(codec);
151+
subPubConn = client.connectPubSub(codec);
147152
LOG.info("Lettuce connections created. Auto-reconnect enabled.");
148153
} catch (Exception e) {
149154
LOG.error("Error connecting to Redis...");
@@ -189,19 +194,19 @@ public static boolean isConnected() {
189194
// Connection accessors
190195
// -------------------------------------------------------------------------
191196

192-
public static StatefulRedisConnection<String, String> mainConn() throws Exception {
197+
public static StatefulRedisConnection<String, byte[]> mainConn() throws Exception {
193198
return checkConn(mainConn);
194199
}
195200

196-
public static StatefulRedisConnection<String, String> scanConn() throws Exception {
201+
public static StatefulRedisConnection<String, byte[]> scanConn() throws Exception {
197202
return checkConn(scanConn);
198203
}
199204

200-
public static StatefulRedisPubSubConnection<String, String> pubSubConn() throws Exception {
205+
public static StatefulRedisPubSubConnection<String, byte[]> pubSubConn() throws Exception {
201206
return checkConn(subPubConn);
202207
}
203208

204-
private static <T extends StatefulRedisConnection<String, String>> T checkConn(T conn)
209+
private static <T extends StatefulRedisConnection<String, byte[]>> T checkConn(T conn)
205210
throws Exception {
206211
if (conn == null) init();
207212
if (conn != null && conn.isOpen()) return conn;
@@ -510,15 +515,16 @@ private static void dataMigration() {
510515
if (isEmpty(cVersion) && isEmpty(jobCacheVersion)) {
511516
LOG.info("Migrating unversioned Redis data schema to version 1.0");
512517
LOG.info(" - change to composite job key; jobId:userKey");
513-
int count = migrateRedisKeys();
518+
int count = appendUserKeyToJobId();
514519
LOG.info("Migrated " + count + " job keys to new format");
515520
CacheManager.getDistributed().put(JOB_CACHE_VERSION_KEY, "1.0");
521+
jobCacheVersion = "1.0";
516522
}
517523

518524
// moving 1.0 to V1_1:
519525
// Rename key to SchemaVersion. Apply version to the full Redis data structure and not just JobInfo.
520526
if (!isEmpty(jobCacheVersion)) {
521-
LOG.info("Updating Redis data structure version from 1.0 to V1.1");
527+
LOG.info("Updating Redis data structure version from 1.0 to V1_1");
522528
redisCache.remove(JOB_CACHE_VERSION_KEY);
523529
redisCache.put(VersionKey, SchemaVersion.V1_1.name());
524530
}

src/firefly/java/edu/caltech/ipac/firefly/core/background/EmailNotification.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ public static void sendNotification(JobInfo jobInfo) {
9898
Logger.getLogger().info("No email address found for job: %s; skip Email Notification".formatted(jobInfo.getJobId()));
9999
return;
100100
}
101-
if (jobInfo.getError() != null) {
101+
if (jobInfo.getErrorSummary() != null) {
102102

103-
String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getError().msg(), contact);
103+
String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getErrorSummary().message(), contact);
104104
Try.it(() -> EMailUtil.sendMessage(new String[]{email}, null, null, subject, msg))
105105
.getOrElse(e -> Logger.getLogger().error(e));
106106

src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate;
1414
import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo;
1515
import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg;
16-
import static java.util.Optional.ofNullable;
1716

1817
/**
1918
* Date: 9/29/21
@@ -73,7 +72,7 @@ default String call() {
7372
} catch (Exception e) {
7473
updateManagedStatus(ji -> {
7574
String msg = combineErrorMsg(e.getMessage(), e.getCause() == null ? null : e.getCause().getMessage());
76-
ji.setError(new JobInfo.Error(500, msg));
75+
ji.setErrorSummary(new JobInfo.ErrorSummary(msg));
7776
});
7877
Logger.getLogger().error(e);
7978
} finally {

src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
package edu.caltech.ipac.firefly.core.background;
66

7+
import com.fasterxml.jackson.annotation.JsonIgnore;
8+
import com.fasterxml.jackson.annotation.JsonProperty;
79
import edu.caltech.ipac.firefly.core.Util;
810
import edu.caltech.ipac.firefly.server.SrvParam;
911
import edu.caltech.ipac.util.AppProperties;
@@ -52,6 +54,7 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD,
5254
public static final String ERROR_SUMMARY = "errorSummary";
5355
public static final String ERROR_TYPE = "type";
5456
public static final String ERROR_MSG = "message";
57+
public static final String ERROR_HAS_DETAILS = "hasDetail";
5558
public static final String META = "meta";
5659
public static final String JOB_INFO = "jobInfo";
5760

@@ -83,9 +86,9 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD,
8386
private Instant endTime;
8487
private int executionDuration = LIFE_SPAN;
8588
private Instant destruction;
86-
private Map<String, String> params = new HashMap<>();
89+
private Map<String, String> parameters = new HashMap<>();
8790
private List<Result> results = new ArrayList<>();
88-
private Error error;
91+
private ErrorSummary errorSummary;
8992

9093
//meta contains essential information needed to manage the job
9194
final private Meta meta = new Meta();
@@ -113,6 +116,8 @@ public void setRunId(String runId) {
113116
public Meta getMeta() {
114117
return meta;
115118
}
119+
120+
@JsonProperty("jobInfo")
116121
public Aux getAux() {return aux; }
117122

118123
public Phase getPhase() {
@@ -127,13 +132,13 @@ public void setPhase(Phase phase) {
127132
this.phase = phase;
128133
}
129134

130-
public Error getError() {
131-
return error;
135+
public ErrorSummary getErrorSummary() {
136+
return errorSummary;
132137
}
133138

134-
public void setError(Error error) {
139+
public void setErrorSummary(ErrorSummary errorSummary) {
135140
setPhase(Phase.ERROR);
136-
this.error = error;
141+
this.errorSummary = errorSummary;
137142
}
138143

139144
public List<Result> getResults() {
@@ -145,11 +150,11 @@ public void setResults(List<Result> results) {
145150
}
146151

147152
@Nonnull
148-
public Map<String, String> getParams() {
149-
return params;
153+
public Map<String, String> getParameters() {
154+
return parameters;
150155
}
151156

152-
public void setParams(Map<String,String> params) { this.params = params; }
157+
public void setParameters(Map<String,String> parameters) { this.parameters = parameters; }
153158

154159
public String getOwnerId() { return ownerId;}
155160

@@ -187,16 +192,17 @@ public Instant getEndTime() {
187192
/**
188193
* @return how long this job may run in seconds. zero implies unlimited execution duration.
189194
*/
190-
public long executionDuration() {
195+
public long getExecutionDuration() {
191196
return executionDuration;
192197
}
193198
public void setExecutionDuration(int duration) { executionDuration = duration; }
194199

195200
/**
196201
* @return a SrvParam from the flatten params map
197202
*/
203+
@JsonIgnore
198204
public SrvParam getSrvParams() {
199-
return SrvParam.makeSrvParamSimpleMap(getMeta().getParams());
205+
return SrvParam.makeSrvParamSimpleMap(getMeta().getParameters());
200206
}
201207

202208
public void copyFrom(JobInfo uws) {
@@ -212,9 +218,9 @@ public void copyFrom(JobInfo uws) {
212218
this.endTime = uws.endTime;
213219
this.executionDuration = uws.executionDuration;
214220
this.destruction = uws.destruction;
215-
this.params = new HashMap<>(uws.params);
221+
this.parameters = new HashMap<>(uws.parameters);
216222
this.results = new ArrayList<>(uws.results);
217-
this.error = uws.error;
223+
this.errorSummary = uws.errorSummary;
218224
ifNotNull(uws.aux.getJobUrl()).apply(aux::setJobUrl);
219225
ifNotNull(uws.aux.getUserId()).apply(aux::setUserId);
220226
ifNotNull(uws.aux.getUserName()).apply(aux::setUserName);
@@ -226,7 +232,11 @@ public void copyFrom(JobInfo uws) {
226232
//
227233
//====================================================================
228234

229-
public record Error ( int code, String msg) implements Serializable {}
235+
public record ErrorSummary(String message, String type, boolean hasDetail) implements Serializable {
236+
public ErrorSummary(String message) {
237+
this(message, "transient", true);
238+
}
239+
}
230240
public record Result(String id, String href, String mimeType, String size) implements Serializable {};
231241

232242
/**
@@ -237,7 +247,7 @@ public record Result(String id, String href, String mimeType, String size) imple
237247
public static class Meta implements Serializable {
238248
String jobId;
239249
String runId;
240-
Map<String, String> params = new HashMap<>();
250+
Map<String, String> parameters = new HashMap<>();
241251
String userKey;
242252
Job.Type type;
243253
int progress;
@@ -258,8 +268,8 @@ public static class Meta implements Serializable {
258268
public String getRunId() { return runId; }
259269
public void setRunId(String runId) { this.runId = runId; }
260270

261-
public Map<String, String> getParams() { return params; }
262-
public void setParams(Map<String, String> params) { this.params = params; }
271+
public Map<String, String> getParameters() { return parameters; }
272+
public void setParameters(Map<String, String> parameters) { this.parameters = parameters; }
263273

264274
public String getUserKey() { return userKey; }
265275
public void setUserKey(String userKey) { this.userKey = userKey;}

0 commit comments

Comments
 (0)