Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions buildScript/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ dependencies {
// amazon s3 support
implementation 'software.amazon.awssdk:s3-transfer-manager:2.33.11'

// jackson serialization support, including msgpack
implementation "org.msgpack:jackson-dataformat-msgpack:0.9.8"
implementation "com.fasterxml.jackson.core:jackson-databind:2.17.2"
implementation "com.fasterxml.jackson.core:jackson-annotations:2.17.2"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2"

}


Expand Down
2 changes: 1 addition & 1 deletion src/firefly/java/edu/caltech/ipac/firefly/api/Async.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private static void sendErrorResponse(int code, JobInfo info, String message, Ht
}
res.setStatus(code);
info.setPhase(JobInfo.Phase.ERROR);
info.setError(new JobInfo.Error(code, message));
info.setErrorSummary(new JobInfo.ErrorSummary(message));
sendResponse(JobUtil.toJson(info), res);
}

Expand Down
30 changes: 18 additions & 12 deletions src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import edu.caltech.ipac.util.cache.StringKey;
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ConnectionActivatedEvent;
Expand Down Expand Up @@ -74,13 +77,15 @@ private enum SchemaVersion { V1_0, V1_1 }
private static Instant failSince;

private static RedisClient client;
private static StatefulRedisConnection<String, String> mainConn;
private static StatefulRedisConnection<String, String> scanConn;
private static StatefulRedisPubSubConnection<String, String> subPubConn;
private static StatefulRedisConnection<String, byte[]> mainConn;
private static StatefulRedisConnection<String, byte[]> scanConn;
private static StatefulRedisPubSubConnection<String, byte[]> subPubConn;

private static final List<String> RESERVED_KEYS = List.of(ALL_JOB_CACHE_KEY);
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private static final AtomicBoolean localStartupTriggered = new AtomicBoolean(false);
private static RedisCodec<String, byte[]> codec =
RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE);

// -------------------------------------------------------------------------
// Initialization
Expand Down Expand Up @@ -141,9 +146,9 @@ public static synchronized void init() throws Exception {

private static void tryConnect() throws Exception{
try {
mainConn = client.connect();
scanConn = client.connect();
subPubConn = client.connectPubSub();
mainConn = client.connect(codec);
scanConn = client.connect(codec);
subPubConn = client.connectPubSub(codec);
LOG.info("Lettuce connections created. Auto-reconnect enabled.");
} catch (Exception e) {
LOG.error("Error connecting to Redis...");
Expand Down Expand Up @@ -189,19 +194,19 @@ public static boolean isConnected() {
// Connection accessors
// -------------------------------------------------------------------------

public static StatefulRedisConnection<String, String> mainConn() throws Exception {
public static StatefulRedisConnection<String, byte[]> mainConn() throws Exception {
return checkConn(mainConn);
}

public static StatefulRedisConnection<String, String> scanConn() throws Exception {
public static StatefulRedisConnection<String, byte[]> scanConn() throws Exception {
return checkConn(scanConn);
}

public static StatefulRedisPubSubConnection<String, String> pubSubConn() throws Exception {
public static StatefulRedisPubSubConnection<String, byte[]> pubSubConn() throws Exception {
return checkConn(subPubConn);
}

private static <T extends StatefulRedisConnection<String, String>> T checkConn(T conn)
private static <T extends StatefulRedisConnection<String, byte[]>> T checkConn(T conn)
throws Exception {
if (conn == null) init();
if (conn != null && conn.isOpen()) return conn;
Expand Down Expand Up @@ -510,15 +515,16 @@ private static void dataMigration() {
if (isEmpty(cVersion) && isEmpty(jobCacheVersion)) {
LOG.info("Migrating unversioned Redis data schema to version 1.0");
LOG.info(" - change to composite job key; jobId:userKey");
int count = migrateRedisKeys();
int count = appendUserKeyToJobId();
LOG.info("Migrated " + count + " job keys to new format");
CacheManager.getDistributed().put(JOB_CACHE_VERSION_KEY, "1.0");
jobCacheVersion = "1.0";
}

// moving 1.0 to V1_1:
// Rename key to SchemaVersion. Apply version to the full Redis data structure and not just JobInfo.
if (!isEmpty(jobCacheVersion)) {
LOG.info("Updating Redis data structure version from 1.0 to V1.1");
LOG.info("Updating Redis data structure version from 1.0 to V1_1");
redisCache.remove(JOB_CACHE_VERSION_KEY);
redisCache.put(VersionKey, SchemaVersion.V1_1.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ public static void sendNotification(JobInfo jobInfo) {
Logger.getLogger().info("No email address found for job: %s; skip Email Notification".formatted(jobInfo.getJobId()));
return;
}
if (jobInfo.getError() != null) {
if (jobInfo.getErrorSummary() != null) {

String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getError().msg(), contact);
String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getErrorSummary().message(), contact);
Try.it(() -> EMailUtil.sendMessage(new String[]{email}, null, null, subject, msg))
.getOrElse(e -> Logger.getLogger().error(e));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate;
import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo;
import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg;
import static java.util.Optional.ofNullable;

/**
* Date: 9/29/21
Expand Down Expand Up @@ -73,7 +72,7 @@ default String call() {
} catch (Exception e) {
updateManagedStatus(ji -> {
String msg = combineErrorMsg(e.getMessage(), e.getCause() == null ? null : e.getCause().getMessage());
ji.setError(new JobInfo.Error(500, msg));
ji.setErrorSummary(new JobInfo.ErrorSummary(msg));
});
Logger.getLogger().error(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package edu.caltech.ipac.firefly.core.background;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import edu.caltech.ipac.firefly.core.Util;
import edu.caltech.ipac.firefly.server.SrvParam;
import edu.caltech.ipac.util.AppProperties;
Expand Down Expand Up @@ -52,6 +54,7 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD,
public static final String ERROR_SUMMARY = "errorSummary";
public static final String ERROR_TYPE = "type";
public static final String ERROR_MSG = "message";
public static final String ERROR_HAS_DETAILS = "hasDetail";
public static final String META = "meta";
public static final String JOB_INFO = "jobInfo";

Expand Down Expand Up @@ -83,9 +86,9 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD,
private Instant endTime;
private int executionDuration = LIFE_SPAN;
private Instant destruction;
private Map<String, String> params = new HashMap<>();
private Map<String, String> parameters = new HashMap<>();
private List<Result> results = new ArrayList<>();
private Error error;
private ErrorSummary errorSummary;

//meta contains essential information needed to manage the job
final private Meta meta = new Meta();
Expand Down Expand Up @@ -113,6 +116,8 @@ public void setRunId(String runId) {
public Meta getMeta() {
return meta;
}

@JsonProperty("jobInfo")
public Aux getAux() {return aux; }

public Phase getPhase() {
Expand All @@ -127,13 +132,13 @@ public void setPhase(Phase phase) {
this.phase = phase;
}

public Error getError() {
return error;
public ErrorSummary getErrorSummary() {
return errorSummary;
}

public void setError(Error error) {
public void setErrorSummary(ErrorSummary errorSummary) {
setPhase(Phase.ERROR);
this.error = error;
this.errorSummary = errorSummary;
}

public List<Result> getResults() {
Expand All @@ -145,11 +150,11 @@ public void setResults(List<Result> results) {
}

@Nonnull
public Map<String, String> getParams() {
return params;
public Map<String, String> getParameters() {
return parameters;
}

public void setParams(Map<String,String> params) { this.params = params; }
public void setParameters(Map<String,String> parameters) { this.parameters = parameters; }

public String getOwnerId() { return ownerId;}

Expand Down Expand Up @@ -187,16 +192,17 @@ public Instant getEndTime() {
/**
* @return how long this job may run in seconds. zero implies unlimited execution duration.
*/
public long executionDuration() {
public long getExecutionDuration() {
return executionDuration;
}
public void setExecutionDuration(int duration) { executionDuration = duration; }

/**
* @return a SrvParam from the flatten params map
*/
@JsonIgnore
public SrvParam getSrvParams() {
return SrvParam.makeSrvParamSimpleMap(getMeta().getParams());
return SrvParam.makeSrvParamSimpleMap(getMeta().getParameters());
}

public void copyFrom(JobInfo uws) {
Expand All @@ -212,9 +218,9 @@ public void copyFrom(JobInfo uws) {
this.endTime = uws.endTime;
this.executionDuration = uws.executionDuration;
this.destruction = uws.destruction;
this.params = new HashMap<>(uws.params);
this.parameters = new HashMap<>(uws.parameters);
this.results = new ArrayList<>(uws.results);
this.error = uws.error;
this.errorSummary = uws.errorSummary;
ifNotNull(uws.aux.getJobUrl()).apply(aux::setJobUrl);
ifNotNull(uws.aux.getUserId()).apply(aux::setUserId);
ifNotNull(uws.aux.getUserName()).apply(aux::setUserName);
Expand All @@ -226,7 +232,11 @@ public void copyFrom(JobInfo uws) {
//
//====================================================================

public record Error ( int code, String msg) implements Serializable {}
public record ErrorSummary(String message, String type, boolean hasDetail) implements Serializable {
public ErrorSummary(String message) {
this(message, "transient", true);
}
}
public record Result(String id, String href, String mimeType, String size) implements Serializable {};

/**
Expand All @@ -237,7 +247,7 @@ public record Result(String id, String href, String mimeType, String size) imple
public static class Meta implements Serializable {
String jobId;
String runId;
Map<String, String> params = new HashMap<>();
Map<String, String> parameters = new HashMap<>();
String userKey;
Job.Type type;
int progress;
Expand All @@ -258,8 +268,8 @@ public static class Meta implements Serializable {
public String getRunId() { return runId; }
public void setRunId(String runId) { this.runId = runId; }

public Map<String, String> getParams() { return params; }
public void setParams(Map<String, String> params) { this.params = params; }
public Map<String, String> getParameters() { return parameters; }
public void setParameters(Map<String, String> parameters) { this.parameters = parameters; }

public String getUserKey() { return userKey; }
public void setUserKey(String userKey) { this.userKey = userKey;}
Expand Down
Loading