diff --git a/buildScript/dependencies.gradle b/buildScript/dependencies.gradle index 262d6e1424..7ec4e5c35b 100644 --- a/buildScript/dependencies.gradle +++ b/buildScript/dependencies.gradle @@ -98,6 +98,12 @@ dependencies { // amazon s3 support implementation 'software.amazon.awssdk:s3-transfer-manager:2.33.11' + // jackson serialization support, including msgpack + implementation "org.msgpack:jackson-dataformat-msgpack:0.9.8" + implementation "com.fasterxml.jackson.core:jackson-databind:2.17.2" + implementation "com.fasterxml.jackson.core:jackson-annotations:2.17.2" + implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.17.2" + } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/api/Async.java b/src/firefly/java/edu/caltech/ipac/firefly/api/Async.java index bb058a2df2..df7956c576 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/api/Async.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/api/Async.java @@ -183,7 +183,7 @@ private static void sendErrorResponse(int code, JobInfo info, String message, Ht } res.setStatus(code); info.setPhase(JobInfo.Phase.ERROR); - info.setError(new JobInfo.Error(code, message)); + info.setErrorSummary(new JobInfo.ErrorSummary(message)); sendResponse(JobUtil.toJson(info), res); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java b/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java index 438146dbf2..8f5863c8c0 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/RedisService.java @@ -16,6 +16,9 @@ import edu.caltech.ipac.util.cache.StringKey; import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.codec.ByteArrayCodec; +import io.lettuce.core.codec.RedisCodec; +import io.lettuce.core.codec.StringCodec; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import io.lettuce.core.event.EventBus; import io.lettuce.core.event.connection.ConnectionActivatedEvent; @@ -74,13 +77,15 @@ private enum SchemaVersion { V1_0, V1_1 } private static Instant failSince; private static RedisClient client; - private static StatefulRedisConnection mainConn; - private static StatefulRedisConnection scanConn; - private static StatefulRedisPubSubConnection subPubConn; + private static StatefulRedisConnection mainConn; + private static StatefulRedisConnection scanConn; + private static StatefulRedisPubSubConnection subPubConn; private static final List RESERVED_KEYS = List.of(ALL_JOB_CACHE_KEY); private static final AtomicBoolean initialized = new AtomicBoolean(false); private static final AtomicBoolean localStartupTriggered = new AtomicBoolean(false); + private static RedisCodec codec = + RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE); // ------------------------------------------------------------------------- // Initialization @@ -141,9 +146,9 @@ public static synchronized void init() throws Exception { private static void tryConnect() throws Exception{ try { - mainConn = client.connect(); - scanConn = client.connect(); - subPubConn = client.connectPubSub(); + mainConn = client.connect(codec); + scanConn = client.connect(codec); + subPubConn = client.connectPubSub(codec); LOG.info("Lettuce connections created. Auto-reconnect enabled."); } catch (Exception e) { LOG.error("Error connecting to Redis..."); @@ -189,19 +194,19 @@ public static boolean isConnected() { // Connection accessors // ------------------------------------------------------------------------- - public static StatefulRedisConnection mainConn() throws Exception { + public static StatefulRedisConnection mainConn() throws Exception { return checkConn(mainConn); } - public static StatefulRedisConnection scanConn() throws Exception { + public static StatefulRedisConnection scanConn() throws Exception { return checkConn(scanConn); } - public static StatefulRedisPubSubConnection pubSubConn() throws Exception { + public static StatefulRedisPubSubConnection pubSubConn() throws Exception { return checkConn(subPubConn); } - private static > T checkConn(T conn) + private static > T checkConn(T conn) throws Exception { if (conn == null) init(); if (conn != null && conn.isOpen()) return conn; @@ -510,15 +515,16 @@ private static void dataMigration() { if (isEmpty(cVersion) && isEmpty(jobCacheVersion)) { LOG.info("Migrating unversioned Redis data schema to version 1.0"); LOG.info(" - change to composite job key; jobId:userKey"); - int count = migrateRedisKeys(); + int count = appendUserKeyToJobId(); LOG.info("Migrated " + count + " job keys to new format"); CacheManager.getDistributed().put(JOB_CACHE_VERSION_KEY, "1.0"); + jobCacheVersion = "1.0"; } // moving 1.0 to V1_1: // Rename key to SchemaVersion. Apply version to the full Redis data structure and not just JobInfo. if (!isEmpty(jobCacheVersion)) { - LOG.info("Updating Redis data structure version from 1.0 to V1.1"); + LOG.info("Updating Redis data structure version from 1.0 to V1_1"); redisCache.remove(JOB_CACHE_VERSION_KEY); redisCache.put(VersionKey, SchemaVersion.V1_1.name()); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/EmailNotification.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/EmailNotification.java index 4a22e13844..3527252c45 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/EmailNotification.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/EmailNotification.java @@ -98,9 +98,9 @@ public static void sendNotification(JobInfo jobInfo) { Logger.getLogger().info("No email address found for job: %s; skip Email Notification".formatted(jobInfo.getJobId())); return; } - if (jobInfo.getError() != null) { + if (jobInfo.getErrorSummary() != null) { - String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getError().msg(), contact); + String msg = failure.formatted(name, jobInfo.getJobId(), jobInfo.getErrorSummary().message(), contact); Try.it(() -> EMailUtil.sendMessage(new String[]{email}, null, null, subject, msg)) .getOrElse(e -> Logger.getLogger().error(e)); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java index debb637e00..c00f8cc8ca 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/Job.java @@ -13,7 +13,6 @@ import static edu.caltech.ipac.firefly.core.background.JobManager.sendUpdate; import static edu.caltech.ipac.firefly.core.background.JobManager.updateJobInfo; import static edu.caltech.ipac.firefly.server.util.QueryUtil.combineErrorMsg; -import static java.util.Optional.ofNullable; /** * Date: 9/29/21 @@ -73,7 +72,7 @@ default String call() { } catch (Exception e) { updateManagedStatus(ji -> { String msg = combineErrorMsg(e.getMessage(), e.getCause() == null ? null : e.getCause().getMessage()); - ji.setError(new JobInfo.Error(500, msg)); + ji.setErrorSummary(new JobInfo.ErrorSummary(msg)); }); Logger.getLogger().error(e); } finally { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java index 4ef3481a6a..ccb86392d2 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobInfo.java @@ -4,6 +4,8 @@ package edu.caltech.ipac.firefly.core.background; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import edu.caltech.ipac.firefly.core.Util; import edu.caltech.ipac.firefly.server.SrvParam; import edu.caltech.ipac.util.AppProperties; @@ -52,6 +54,7 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD, public static final String ERROR_SUMMARY = "errorSummary"; public static final String ERROR_TYPE = "type"; public static final String ERROR_MSG = "message"; + public static final String ERROR_HAS_DETAILS = "hasDetail"; public static final String META = "meta"; public static final String JOB_INFO = "jobInfo"; @@ -83,9 +86,9 @@ public enum Phase {PENDING, QUEUED, EXECUTING, COMPLETED, ERROR, ABORTED, HELD, private Instant endTime; private int executionDuration = LIFE_SPAN; private Instant destruction; - private Map params = new HashMap<>(); + private Map parameters = new HashMap<>(); private List results = new ArrayList<>(); - private Error error; + private ErrorSummary errorSummary; //meta contains essential information needed to manage the job final private Meta meta = new Meta(); @@ -113,6 +116,8 @@ public void setRunId(String runId) { public Meta getMeta() { return meta; } + + @JsonProperty("jobInfo") public Aux getAux() {return aux; } public Phase getPhase() { @@ -127,13 +132,13 @@ public void setPhase(Phase phase) { this.phase = phase; } - public Error getError() { - return error; + public ErrorSummary getErrorSummary() { + return errorSummary; } - public void setError(Error error) { + public void setErrorSummary(ErrorSummary errorSummary) { setPhase(Phase.ERROR); - this.error = error; + this.errorSummary = errorSummary; } public List getResults() { @@ -145,11 +150,11 @@ public void setResults(List results) { } @Nonnull - public Map getParams() { - return params; + public Map getParameters() { + return parameters; } - public void setParams(Map params) { this.params = params; } + public void setParameters(Map parameters) { this.parameters = parameters; } public String getOwnerId() { return ownerId;} @@ -187,7 +192,7 @@ public Instant getEndTime() { /** * @return how long this job may run in seconds. zero implies unlimited execution duration. */ - public long executionDuration() { + public long getExecutionDuration() { return executionDuration; } public void setExecutionDuration(int duration) { executionDuration = duration; } @@ -195,8 +200,9 @@ public long executionDuration() { /** * @return a SrvParam from the flatten params map */ + @JsonIgnore public SrvParam getSrvParams() { - return SrvParam.makeSrvParamSimpleMap(getMeta().getParams()); + return SrvParam.makeSrvParamSimpleMap(getMeta().getParameters()); } public void copyFrom(JobInfo uws) { @@ -212,9 +218,9 @@ public void copyFrom(JobInfo uws) { this.endTime = uws.endTime; this.executionDuration = uws.executionDuration; this.destruction = uws.destruction; - this.params = new HashMap<>(uws.params); + this.parameters = new HashMap<>(uws.parameters); this.results = new ArrayList<>(uws.results); - this.error = uws.error; + this.errorSummary = uws.errorSummary; ifNotNull(uws.aux.getJobUrl()).apply(aux::setJobUrl); ifNotNull(uws.aux.getUserId()).apply(aux::setUserId); ifNotNull(uws.aux.getUserName()).apply(aux::setUserName); @@ -226,7 +232,11 @@ public void copyFrom(JobInfo uws) { // //==================================================================== - public record Error ( int code, String msg) implements Serializable {} + public record ErrorSummary(String message, String type, boolean hasDetail) implements Serializable { + public ErrorSummary(String message) { + this(message, "transient", true); + } + } public record Result(String id, String href, String mimeType, String size) implements Serializable {}; /** @@ -237,7 +247,7 @@ public record Result(String id, String href, String mimeType, String size) imple public static class Meta implements Serializable { String jobId; String runId; - Map params = new HashMap<>(); + Map parameters = new HashMap<>(); String userKey; Job.Type type; int progress; @@ -258,8 +268,8 @@ public static class Meta implements Serializable { public String getRunId() { return runId; } public void setRunId(String runId) { this.runId = runId; } - public Map getParams() { return params; } - public void setParams(Map params) { this.params = params; } + public Map getParameters() { return parameters; } + public void setParameters(Map parameters) { this.parameters = parameters; } public String getUserKey() { return userKey; } public void setUserKey(String userKey) { this.userKey = userKey;} diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java index 236cbef171..944d40d643 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobManager.java @@ -25,7 +25,6 @@ import io.lettuce.core.ScanArgs; import org.apache.commons.lang.text.StrBuilder; import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; import javax.annotation.Nonnull; import java.io.Serializable; @@ -68,8 +67,8 @@ public class JobManager { public static final String BG_INFO = "background.info"; public static final String ALL_JOB_CACHE_KEY = "ALL_JOB_INFOS"; // cache key for all job infos - public static final long CLEANUP_INTVL_MINS = AppProperties.getIntProperty("job.cleanup.interval", 12*60); // run cleanup once every 12 hours - public static final int JOB_LIST_DEFAULT_LIMIT = AppProperties.getIntProperty("job.list.default.limit", 100_000); // the default limit for job list queries + public static final long CLEANUP_INTVL_MINS = AppProperties.getIntProperty("job.cleanup.interval", 2*60); // run cleanup once every 2 hours + public static final int JOB_LIST_DEFAULT_LIMIT = AppProperties.getIntProperty("job.list.default.limit", 100_000); // the default limit for job list queries private static final int KEEP_ALIVE_INTERVAL = AppProperties.getIntProperty("job.keepalive.interval", 60); // default keepalive interval in seconds private static final int WAIT_COMPLETE = AppProperties.getIntProperty("job.wait.complete", 1); // wait for complete after submit in seconds private static final int MAX_PACKAGERS = AppProperties.getIntProperty("job.max.packagers", 10); // maximum number of simultaneous packaging threads @@ -79,7 +78,7 @@ public class JobManager { private static final ExecutorService packagers = Executors.newFixedThreadPool(MAX_PACKAGERS); private static final ExecutorService searches = Executors.newCachedThreadPool(); private static final HashMap runningJobs = new HashMap<>(); - private static final DistribMapCache allJobInfos = new DistribMapCache<>(ALL_JOB_CACHE_KEY, 0, new JobInfoSerializer()); // the all job hash should never expire + private static final DistribMapCache allJobInfos = new DistribMapCache<>(ALL_JOB_CACHE_KEY, 0); // the all job hash should never expire private static final String COMPLETED_HANDLER = AppProperties.getProperty("job.completed.handler"); public static final CacheKey JOB_CACHE_VERSION_KEY = new StringKey("job.all.cache.version"); @@ -124,18 +123,20 @@ public static List list() { if (ids != null) importedJobIds.addAll(ids); }); // update all userJobs with active status - userJobs.forEach(ji -> { - if (ji.getMeta().getType() == UWS && - isActive(ji) && - !importedJobIds.contains(ji.getMeta().getJobId())) { - JobInfo uws = Try.it(() -> getUwsJobInfo(ji.getAux().getJobUrl())).get(); - if (uws == null) { - LOG.debug("Job no longer exists:" + ji.getAux().getJobUrl()); - ji.setError(new JobInfo.Error(404, "Job no longer exists")); - } else { - mergeJobInfo(ji, uws, null, null); + userJobs.removeIf(ji -> { + if (isActive(ji)) { + if (ji.getMeta().getType() == UWS && !importedJobIds.contains(ji.getMeta().getJobId())) { + // update UWS job status, if not already updated during import + JobInfo uws = Try.it(() -> getUwsJobInfo(ji.getAux().getJobUrl())).get(); + if (uws == null) { + LOG.debug("Job no longer exists:" + ji.getAux().getJobUrl()); + return true; // remove job that no longer exists + } else { + mergeJobInfo(ji, uws, null, null); + } } } + return false; }); return userJobs; @@ -149,6 +150,7 @@ public static JobInfo submit(Job job) { ji.setCreationTime(start); ji.setDestruction(start.plus(JOB_TTL_DAYS, ChronoUnit.DAYS)); ji.getMeta().setType(job.getType()); + ji.getMeta().setRunHost(hostName()); }); // update Job after jobInfo has been created job.runAs(reqOwner); @@ -169,7 +171,7 @@ public static JobInfo submit(Job job) { } catch (Exception e) { // job run() handles exceptions; this only happens if submit or future.get() fails sendUpdate(jobId, (ji) -> { - ji.setError(new JobInfo.Error(500, e.getMessage())); + ji.setErrorSummary(new ErrorSummary(e.getMessage())); ji.getMeta().setProgress(100, null); }); LOG.error(e); @@ -179,7 +181,7 @@ public static JobInfo submit(Job job) { public static JobInfo abort(String jobId, String reason) { JobInfo info = updateJobInfo(jobId, (ji) -> { - if (reason != null) ji.setError(new JobInfo.Error(500, reason)); + if (reason != null) ji.setErrorSummary(new ErrorSummary(reason)); ji.setPhase(ABORTED); }); if (info != null) { @@ -219,7 +221,7 @@ public static JobInfo setMonitored(String jobId, boolean isMonitored) { } public static JobInfo sendEmail(String jobId, String email) { - updateJobInfo(jobId, (ji) -> ji.getMeta().getParams().put(EMAIL, email)); + updateJobInfo(jobId, (ji) -> ji.getMeta().getParameters().put(EMAIL, email)); JobInfo jobInfo = getJobInfo(jobId); if (jobInfo != null) EmailNotification.sendNotification(jobInfo); return jobInfo; @@ -426,7 +428,6 @@ private static void initNewJob(JobInfo ji) { ji.getMeta().setJobId(ji.getJobId()); ji.getMeta().setUserKey(reqOwner.getUserKey()); ji.getMeta().setEventConnId(reqOwner.getEventConnID()); - ji.getMeta().setRunHost(hostName()); ji.getMeta().setAppUrl(ServerContext.getRequestOwner().getBaseUrl()); ji.getMeta().setMonitored(true); // all async jobs are monitored by default @@ -468,7 +469,7 @@ private static void checkJobs() { // kill expired jobs getRunningJobs().forEach(fi -> { - long duration = fi.executionDuration(); + long duration = fi.getExecutionDuration(); if (duration != 0 && fi.getStartTime().plus(duration, ChronoUnit.SECONDS).isBefore(Instant.now())) { abort(fi.getMeta().getJobId(), "Exceeded execution duration"); } @@ -565,13 +566,24 @@ public static void cleanup() { List jobs = getAllJobs(); jobs.forEach(job -> { CacheKey k = cacheKey(job); - if (!job.getMeta().isMonitored() && job.getEndTime().plus(1, ChronoUnit.HOURS).isBefore(Instant.now())) { - LOG.info(" Removing non-monitored job: " + k); + var endTime = job.getEndTime(); + if (endTime == null) return; // skip active jobs + + if (!job.getMeta().isMonitored() && endTime.plus(1, ChronoUnit.HOURS).isBefore(Instant.now())) { + LOG.info("Removing non-monitored job: " + k); allJobInfos.remove(k); // remove non-monitored job after 1 hour + } else if (isActive(job) && hostName().equals(job.getMeta().getRunHost())) { + if (job.getMeta().getType() != UWS && !runningJobs.containsKey(job.getMeta().getJobId())) { + // if the job is active and supposed to run on this host, but we don't have record of it running, remove it + LOG.info("Removing orphan active job: " + k); + allJobInfos.remove(k); + } } else { Instant desDate = job.getDestruction(); - desDate = desDate == null ? job.getCreationTime().plus(JOB_TTL_DAYS, ChronoUnit.DAYS) : desDate; // ensure destruction date has a value - if (desDate.isBefore(Instant.now())) { + if (desDate == null) { + desDate = job.getCreationTime() == null ? null : job.getCreationTime().plus(JOB_TTL_DAYS, ChronoUnit.DAYS); // ensure destruction date has a value + } + if (desDate != null && desDate.isBefore(Instant.now())) { LOG.info(" Removing expired job: " + k); allJobInfos.remove(k); } @@ -580,28 +592,6 @@ public static void cleanup() { LOG.info("JobInfo cleanup finished"); } - /** - * This serializer is used to serialize JobInfo objects for storage in the cache. - * Instead of using the default Java serialization, it uses a JSON string. - */ - private static class JobInfoSerializer implements DistribMapCache.Serializer { - - public String serialize(Object obj) { - if (obj instanceof JobInfo jobInfo) { - return toJson(jobInfo); - } - return null; - } - - public JobInfo deserialize(String str) throws Exception{ - if (str != null) { - return toJobInfo((JSONObject) new JSONParser().parse(str)); - } - return null; - } - } - - //==================================================================== // Utilities: adhoc use cases //==================================================================== @@ -610,7 +600,7 @@ public JobInfo deserialize(String str) throws Exception{ * Migrate all Redis keys that do not have userKey in the key to the new format. * @return the number of keys migrated */ - public static int migrateRedisKeys() { + public static int appendUserKeyToJobId() { // For keys without userKey, we need to migrate them to the new format where userKey is appended to the jobId. int count = 0; for(CacheKey key : allJobInfos.getKeys()) { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobUtil.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobUtil.java index dc3b3de8d9..d80e5e9777 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobUtil.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/JobUtil.java @@ -7,6 +7,7 @@ import edu.caltech.ipac.firefly.server.util.Logger; import edu.caltech.ipac.firefly.util.Ref; import edu.caltech.ipac.util.AppProperties; +import edu.caltech.ipac.util.serialization.Serializer; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.w3c.dom.Document; @@ -20,7 +21,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -116,8 +116,7 @@ public static String jobIdToDir(String jobId) { } public static String toJson(JobInfo info) { - JSONObject jsonObject = toJsonObject(info); - return jsonObject == null ? "null" : jsonObject.toJSONString(); + return info == null ? "null" : Serializer.toJsonString(info); } /** @@ -222,16 +221,17 @@ public static JSONObject toJsonObject(JobInfo info) { applyIfNotEmpty(info.getCreationTime(), v -> rval.put(CREATION_TIME, v.toString())); applyIfNotEmpty(info.getStartTime(), v -> rval.put(START_TIME, v.toString())); applyIfNotEmpty(info.getEndTime(), v -> rval.put(END_TIME, v.toString())); - applyIfNotEmpty(info.executionDuration(), v -> rval.put(EXECUTION_DURATION, v)); + applyIfNotEmpty(info.getExecutionDuration(), v -> rval.put(EXECUTION_DURATION, v)); applyIfNotEmpty(info.getDestruction(), v -> rval.put(DESTRUCTION, v.toString())); - if (!info.getParams().isEmpty()) rval.put(PARAMETERS, info.getParams()); + if (!info.getParameters().isEmpty()) rval.put(PARAMETERS, info.getParameters()); if (!info.getResults().isEmpty()) rval.put(RESULTS, toResults(info.getResults())); - applyIfNotEmpty(info.getError(), v -> { + applyIfNotEmpty(info.getErrorSummary(), v -> { JSONObject errSum = new JSONObject(); - errSum.put(ERROR_MSG, v.msg()); - errSum.put(ERROR_TYPE, v.code() < 500 ? "fatal" : "transient"); // 5xx are typically system error, e.g. server down. + errSum.put(ERROR_MSG, v.message()); + ifNotNull(v.type()).apply((t) -> errSum.put(ERROR_TYPE, t)); + errSum.put(ERROR_HAS_DETAILS, v.hasDetail()); rval.put(ERROR_SUMMARY, errSum); }); @@ -259,7 +259,7 @@ public static JSONObject toJsonObject(JobInfo info) { applyIfNotEmpty(meta.getRunHost(), v -> jsonMeta.put(RUN_HOST, v)); applyIfNotEmpty(meta.getSendNotif(), v -> jsonMeta.put(SEND_NOTIF, v)); - if (!meta.getParams().isEmpty()) jsonMeta.put(PARAMETERS, meta.getParams()); + if (!meta.getParameters().isEmpty()) jsonMeta.put(PARAMETERS, meta.getParameters()); return rval; } @@ -279,14 +279,14 @@ public static JobInfo toJobInfo(JSONObject json) { ifNotNull(json.get(EXECUTION_DURATION)).apply(v -> rval.setExecutionDuration(((Long) v).intValue())); ifNotNull(json.get(DESTRUCTION)).apply(v -> rval.setDestruction(Instant.parse(v.toString()))); - ifNotNull(toParameters(json.get(PARAMETERS))).apply(p -> rval.setParams(p)); + ifNotNull(toParameters(json.get(PARAMETERS))).apply(p -> rval.setParameters(p)); ifNotNull(toResults(json.get(RESULTS))).apply(r -> rval.setResults(r)); ifNotNull(json.get(ERROR_SUMMARY)).apply(v -> { if (v instanceof JSONObject jo) { int code = getInt(jo.get(ERROR_TYPE), 500); String msg = String.valueOf(jo.get(ERROR_MSG)); - rval.setError(new JobInfo.Error(code, msg)); + rval.setErrorSummary(new ErrorSummary(msg)); } }); ifNotNull(json.get(META)).apply(v -> { @@ -304,7 +304,7 @@ public static JobInfo toJobInfo(JSONObject json) { ifNotNull(ji.get(RUN_HOST)).apply(s -> rval.getMeta().setRunHost(s.toString())); ifNotNull(ji.get(SEND_NOTIF)).apply(o -> rval.getMeta().setSendNotif((Boolean) o)); - ifNotNull(toParameters(ji.get(PARAMETERS))).apply(p -> rval.getMeta().setParams(p)); + ifNotNull(toParameters(ji.get(PARAMETERS))).apply(p -> rval.getMeta().setParameters(p)); } }); ifNotNull(json.get(JOB_INFO)).apply(v -> { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java b/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java index 8d55128051..461c9158e5 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/core/background/ServCmdJob.java @@ -9,7 +9,6 @@ import edu.caltech.ipac.firefly.server.ServerContext; import edu.caltech.ipac.firefly.server.SrvParam; -import java.time.Instant; import java.util.Map; import static edu.caltech.ipac.firefly.core.background.JobManager.*; @@ -53,8 +52,8 @@ public void setJobId(String jobId) { this.jobId = jobId; updateJobInfo(jobId, ji -> { Map p = params.flatten(); - ji.getMeta().setParams(p); - ji.setParams(p); // for non-uws searches. + ji.getMeta().setParameters(p); + ji.setParameters(p); // for non-uws searches. }); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/data/FileInfo.java b/src/firefly/java/edu/caltech/ipac/firefly/data/FileInfo.java index 77e77cbf63..a76997f9be 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/data/FileInfo.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/data/FileInfo.java @@ -3,6 +3,7 @@ */ package edu.caltech.ipac.firefly.data; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import edu.caltech.ipac.firefly.server.network.HttpServiceInput; import edu.caltech.ipac.util.StringUtils; import edu.caltech.ipac.util.cache.CacheKey; @@ -22,7 +23,11 @@ import static edu.caltech.ipac.firefly.data.HttpResultInfo.SIZE_IN_BYTES; import static edu.caltech.ipac.firefly.data.HttpResultInfo.SUFFIX; - +@JsonAutoDetect( + fieldVisibility = JsonAutoDetect.Visibility.ANY, + getterVisibility = JsonAutoDetect.Visibility.NONE, + setterVisibility = JsonAutoDetect.Visibility.NONE +) public class FileInfo implements HasAccessInfo, Serializable, CacheKey { public static final String INTERNAL_NAME= "internalName"; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/data/RelatedData.java b/src/firefly/java/edu/caltech/ipac/firefly/data/RelatedData.java index b63afa392f..3cc425fe75 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/data/RelatedData.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/data/RelatedData.java @@ -4,6 +4,7 @@ package edu.caltech.ipac.firefly.data; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import edu.caltech.ipac.firefly.visualize.RequestType; import edu.caltech.ipac.firefly.visualize.WebPlotRequest; @@ -15,6 +16,11 @@ /** * @author Trey Roby */ +@JsonAutoDetect( + fieldVisibility = JsonAutoDetect.Visibility.ANY, + getterVisibility = JsonAutoDetect.Visibility.NONE, + setterVisibility = JsonAutoDetect.Visibility.NONE +) public class RelatedData implements Serializable, HasSizeOf { public static final String IMAGE_OVERLAY= "IMAGE_OVERLAY"; public static final String IMAGE_MASK= "IMAGE_MASK"; @@ -36,6 +42,8 @@ public class RelatedData implements Serializable, HasSizeOf { private long size= 0; + protected RelatedData() {this(null, null, null);} + private RelatedData(String dataType, String dataKey, String desc) { this.dataType= dataType; this.dataKey= dataKey; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/data/userdata/UserInfo.java b/src/firefly/java/edu/caltech/ipac/firefly/data/userdata/UserInfo.java index 0f6bd84e1a..5d982e2559 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/data/userdata/UserInfo.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/data/userdata/UserInfo.java @@ -3,6 +3,8 @@ */ package edu.caltech.ipac.firefly.data.userdata; +import com.fasterxml.jackson.annotation.JsonAutoDetect; + import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -11,6 +13,11 @@ * @author tatianag * @version $Id: UserInfo.java,v 1.15 2012/07/16 23:30:10 loi Exp $ */ +@JsonAutoDetect( + fieldVisibility = JsonAutoDetect.Visibility.ANY, + getterVisibility = JsonAutoDetect.Visibility.NONE, + setterVisibility = JsonAutoDetect.Visibility.NONE +) public class UserInfo implements Serializable { public static final String GUEST = "Guest"; @@ -81,10 +88,7 @@ public void setLastName(String lastName) { setProperty(LASTNAME, lastName); } - public String getEmail() { - String email = getProperty(EMAIL); - return email == null ? getLoginName() : email; - } + public String getEmail() { return getProperty(EMAIL);} public void setEmail(String email) { setProperty(EMAIL, email); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java index 0fe6c2d595..d9b5ef376a 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/messaging/Messenger.java @@ -14,6 +14,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotEmpty; +import static edu.caltech.ipac.util.serialization.Serializer.fromUtf8; +import static edu.caltech.ipac.util.serialization.Serializer.toUtf8; /** * An implementation of publish-subscribe messaging pattern based on Jedis client and Redis backend. @@ -100,7 +102,7 @@ public static Map> getSubscribers() { */ public static void publish(String topic, Message msg) { try { - RedisService.mainConn().async().publish(topic, msg.toJson()); + RedisService.mainConn().async().publish(topic, fromUtf8(msg.toJson())); } catch (Exception e) { LOG.error(e, "Error publishing message to topic: " + topic); } @@ -118,9 +120,10 @@ public static void publish(Message msg) { // A single Listener that dispatches messages to all subscribers //==================================================================== - private static class RedisPubListener extends RedisPubSubAdapter { + private static class RedisPubListener extends RedisPubSubAdapter { @Override - public void message(String channel, String message) { + public void message(String channel, byte[] raw) { + String message = toUtf8(raw); LOG.trace("Received message from channel [" + channel + "]: " + message); List subs = subscribers.get(channel); if (subs != null) { diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistribMapCache.java b/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistribMapCache.java index e9840cadf0..8ec31e9a3d 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistribMapCache.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistribMapCache.java @@ -33,7 +33,7 @@ public DistribMapCache(String mapKey, long ttl) { this(mapKey, ttl, new DefaultImpl<>()); } - public DistribMapCache(String mapKey, long ttl, Serializer serializer) { + public DistribMapCache(String mapKey, long ttl, ValueSerializer serializer) { super(serializer); this.mapKey = mapKey; this.ttl = ttl; @@ -46,8 +46,8 @@ public List getValuesFor(ScanArgs scanArgs) { ScanCursor cursor = ScanCursor.INITIAL; do { - MapScanCursor scanResult = redis.hscan(mapKey, cursor, scanArgs); - for (Map.Entry entry : scanResult.getMap().entrySet()) { + MapScanCursor scanResult = redis.hscan(mapKey, cursor, scanArgs); + for (Map.Entry entry : scanResult.getMap().entrySet()) { result.add(deserialize(entry.getValue())); } cursor = scanResult; // advance cursor @@ -64,15 +64,15 @@ public List getValuesFor(ScanArgs scanArgs) { // override for Redis Map implementation //==================================================================== - String get(StatefulRedisConnection redis, String key) { + byte[] get(StatefulRedisConnection redis, String key) { return redis.sync().hget(mapKey, key); } - void del(StatefulRedisConnection redis, String key) { + void del(StatefulRedisConnection redis, String key) { redis.sync().hdel(mapKey, key); } - void set(StatefulRedisConnection redis, String key, String value) { + void set(StatefulRedisConnection redis, String key, byte[] value) { var sync = redis.sync(); sync.hset(mapKey, key, value); if (ttl > 0) { @@ -82,19 +82,19 @@ void set(StatefulRedisConnection redis, String key, String value } } - void setex(StatefulRedisConnection redis, String key, String value, long ttl) { + void setex(StatefulRedisConnection redis, String key, byte[] value, long ttl) { set(redis, key, value); // ttl is managed at the map level, not individual keys } - List keys(StatefulRedisConnection redis) { + List keys(StatefulRedisConnection redis) { return new ArrayList<>(redis.sync().hkeys(mapKey)); } - boolean exists(StatefulRedisConnection redis, String key) { + boolean exists(StatefulRedisConnection redis, String key) { return redis.sync().hexists(mapKey, key); } - long size(StatefulRedisConnection redis) { + long size(StatefulRedisConnection redis) { return redis.sync().hlen(mapKey); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistributedCache.java b/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistributedCache.java index 937f486c93..0580dd17de 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistributedCache.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/cache/DistributedCache.java @@ -5,17 +5,21 @@ import edu.caltech.ipac.firefly.core.RedisService; import edu.caltech.ipac.firefly.core.Util; +import edu.caltech.ipac.firefly.core.background.JobInfo; import edu.caltech.ipac.firefly.server.util.Logger; import edu.caltech.ipac.util.AppProperties; import edu.caltech.ipac.util.cache.Cache; import edu.caltech.ipac.util.cache.CacheKey; import edu.caltech.ipac.util.cache.StringKey; +import edu.caltech.ipac.util.serialization.Serializer; import io.lettuce.core.api.StatefulRedisConnection; import javax.annotation.Nonnull; import java.util.List; import java.util.function.Predicate; +import static edu.caltech.ipac.util.serialization.Serializer.*; + /** * This class provides an implementation of a distributed cache using Redis. *

@@ -35,26 +39,26 @@ * @version $Id: EhcacheImpl.java,v 1.8 2009/12/16 21:43:25 loi Exp $ */ public class DistributedCache implements Cache { - public static final int DEF_TTL = AppProperties.getIntProperty("dist.cache.ttl.hours", 14*24) * 60 * 60; // default to 14 days in seconds + public static final int DEF_TTL = AppProperties.getIntProperty("dist.cache.ttl.hours", 7*24) * 60 * 60; // default to 7 days in seconds static final Logger.LoggerImpl LOG = Logger.getLogger(); private static final String BASE64 = "BASE64::"; private transient Predicate getValidator; - private final Serializer serializer; // required; default is DefaultImpl + private final ValueSerializer serializer; // required; default is DefaultImpl public DistributedCache() { this(new DefaultImpl<>()); } - public DistributedCache(Serializer serializer) { + public DistributedCache(ValueSerializer serializer) { this.serializer = serializer; } - String serialize(Object value) { + byte[] serialize(Object value) { return serializer.serialize(value); } - T deserialize(String value) throws Exception { + T deserialize(byte[] value) throws Exception { return serializer.deserialize(value); } @@ -134,33 +138,33 @@ public long getSize() { // Implementation of redis string; override for map, list, and set. //==================================================================== - String get(StatefulRedisConnection redis, String key) { + byte[] get(StatefulRedisConnection redis, String key) { return redis.sync().get(key); } - void del(StatefulRedisConnection redis, String key) { + void del(StatefulRedisConnection redis, String key) { redis.sync().del(key); } - void set(StatefulRedisConnection redis, String key, String value) { + void set(StatefulRedisConnection redis, String key, byte[] value) { redis.sync().set(key, value); } - void setex(StatefulRedisConnection redis, String key, String value, long lifespanInSecs) { + void setex(StatefulRedisConnection redis, String key, byte[] value, long lifespanInSecs) { redis.sync().setex(key, lifespanInSecs, value); } @Nonnull - List keys(StatefulRedisConnection redis) { + List keys(StatefulRedisConnection redis) { var keys = redis.sync().keys("*"); return keys == null ? List.of() : keys.stream().toList(); } - boolean exists(StatefulRedisConnection redis, String key) { + boolean exists(StatefulRedisConnection redis, String key) { return redis.sync().exists(key) > 0; } - long size(StatefulRedisConnection redis) { + long size(StatefulRedisConnection redis) { return redis.sync().dbsize(); } @@ -168,30 +172,42 @@ long size(StatefulRedisConnection redis) { // Utility functions //==================================================================== - public interface Serializer { - String serialize(Object object); - T deserialize(String s) throws Exception; + public interface ValueSerializer { + byte[] serialize(Object object); + T deserialize(byte[] s) throws Exception; } /** * Default implementation of the Serializer. It serializes objects to Base64 strings * using java serialization. When the object is a String, it returns the string directly. */ - public static class DefaultImpl implements Serializer { + public static class DefaultImpl implements ValueSerializer { - public String serialize(Object object) { + public byte[] serialize(Object object) { if (object == null) return null; - if (object instanceof String v) { - return v; - } else { - return BASE64 + Util.serialize(object); - } + return Serializer.toTypedMessagePack(object); } - public T deserialize(String s) throws Exception { - if (s == null) return null; - return !s.startsWith(BASE64) ? (T) s : (T) Util.deserialize(s.substring(BASE64.length())); + public T deserialize(byte[] raw) throws Exception { + if (raw == null) return null; + // prior to MsgPack, JobInfo was stored as JSON string + // for the time being, we will add special case here + // it may be removed in the future + if (looksLikeJson(raw)) { + try { + return (T) Serializer.fromJson(toUtf8(raw), JobInfo.class); + } catch (Exception ignored) {} + } + var val = Serializer.fromTypedMessagePack(raw); + if (val instanceof String s) { + if (s.startsWith(BASE64)) { + return (T) Util.deserialize(s.substring(BASE64.length())); + } else { + return (T) s; + } + } else { + return (T) val; + } } } - } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java index fe58cda927..078511b963 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ReplicatedQueueList.java @@ -16,23 +16,25 @@ /** * @author Trey Roby */ -class ReplicatedQueueList { +public class ReplicatedQueueList { private static final StringKey HOST_NAME= new StringKey(FileUtil.getHostname()); private static final String REP_QUEUE_MAP = "ReplicatedEventQueueMap"; - private static Cache> getCache() { + private static Cache getCache() { return CacheManager.getDistributedMap(REP_QUEUE_MAP); } + public record EventQueueList(List items) {} + synchronized void setQueueListForNode(List list) { - getCache().put(HOST_NAME, list); + getCache().put(HOST_NAME, new EventQueueList(list)); } synchronized List getCombinedNodeList() { List retList= new ArrayList<>(); - Cache> cache= getCache(); + Cache cache= getCache(); for(CacheKey k : cache.getKeys()) { - retList.addAll(cache.get(k)); + retList.addAll(cache.get(k).items); } return retList; } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java index 9a524b6673..88a333cb97 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/events/ServerEventQueue.java @@ -29,6 +29,10 @@ public class ServerEventQueue implements Serializable { private String userKey; private transient long lastPutTime= 0; + protected ServerEventQueue() { + this.eventTerminal = null; + } + public ServerEventQueue(String connID, String channel, String userKey, EventConnector terminal) { this.connID = connID; this.channel = channel; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java index 3cd833dcd0..fa0e1b7dbb 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/query/EmbeddedDbProcessor.java @@ -10,7 +10,6 @@ import edu.caltech.ipac.firefly.server.db.DuckDbReadable; import edu.caltech.ipac.firefly.server.util.Logger; import edu.caltech.ipac.table.TableUtil; -import edu.caltech.ipac.table.io.IpacTableException; import edu.caltech.ipac.table.io.IpacTableWriter; import edu.caltech.ipac.table.io.RegionTableWriter; import edu.caltech.ipac.table.io.VoTableWriter; @@ -47,7 +46,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.function.Consumer; import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull; import static edu.caltech.ipac.firefly.data.TableServerRequest.TBL_INDEX; @@ -164,7 +162,7 @@ public DataGroupPart getData(ServerRequest request) throws DataAccessException { results = EmbeddedDbUtil.toDataGroupPart(dg, treq); String error = dbAdapter.handleSqlExp("", e).getCause().getMessage(); // get the message describing the cause of the exception. results.setErrorMsg(error); - sendJobUpdate(ji -> ji.setError( new JobInfo.Error(500, error))); // because an error table is returned + sendJobUpdate(ji -> ji.setErrorSummary( new JobInfo.ErrorSummary(error))); // because an error table is returned } else { throw e; } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java b/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java index 77791b3986..38bd19d986 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/query/UwsJobProcessor.java @@ -5,7 +5,6 @@ import edu.caltech.ipac.firefly.core.background.Job; import edu.caltech.ipac.firefly.core.background.JobManager; -import edu.caltech.ipac.firefly.core.background.JobUtil; import edu.caltech.ipac.firefly.data.TableServerRequest; import edu.caltech.ipac.firefly.server.network.HttpServiceInput; import edu.caltech.ipac.firefly.server.network.HttpServices; @@ -35,7 +34,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotEmpty; import static edu.caltech.ipac.firefly.core.Util.Opt.ifNotNull; import static edu.caltech.ipac.firefly.core.background.JobInfo.*; import static edu.caltech.ipac.firefly.core.background.JobManager.getJobInfo; @@ -90,7 +88,7 @@ public void onAbort() { // instead, we will query the jobUrl directly to see if it's aborted JobInfo jobInfo = Try.it(() -> getUwsJobInfo(jobUrl)).get(); if (jobInfo == null || jobInfo.getPhase() != Phase.ABORTED) { - String msg = ifNotNull(jobInfo.getError().msg()).getOrElse("Job cannot be aborted"); + String msg = ifNotNull(jobInfo.getErrorSummary().message()).getOrElse("Job cannot be aborted"); return new Status(400, "Failed to abort: %s".formatted(msg) ); } return Status.ok(); // aborted or no longer active @@ -99,7 +97,7 @@ public void onAbort() { if (status.isError()) { logger.warn(status.getErrMsg()); sendJobUpdate(ji -> { - ji.setError(new JobInfo.Error(status.getStatusCode(), status.getErrMsg())); + ji.setErrorSummary(new ErrorSummary(status.getErrMsg())); }); } else { logger.info("UWS job aborted: " + jobUrl); @@ -130,7 +128,7 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti } } catch (Exception e) { updateJob(ji -> { - ji.setError(new JobInfo.Error(400, e.getMessage())); + ji.setErrorSummary(new ErrorSummary(e.getMessage())); ji.getMeta().setProgress(100); }); throw new DataAccessException(e.getMessage()); @@ -147,7 +145,7 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti JobInfo uwsJob = getUwsJobInfo(jobUrl); if (uwsJob == null) { String msg = "Failed to retrieve UWS job info"; - sendJobUpdate(ji -> ji.setError(new JobInfo.Error(500, msg))); + sendJobUpdate(ji -> ji.setErrorSummary(new ErrorSummary(msg))); throw new DataAccessException(msg); } @@ -165,11 +163,11 @@ public DataGroup fetchDataGroup(TableServerRequest req) throws DataAccessExcepti updateJob(ji -> ji.setPhase(Phase.PENDING)); throw new DataAccessException("The job was submitted, but no execution request has been made."); } else if (phase == Phase.ERROR) { - JobInfo.Error error = getError(uwsJob, jobUrl); - updateJob(ji -> ji.setError(error)); - throw new DataAccessException("Job has failed with the error: " + error.msg()); + ErrorSummary error = getError(uwsJob, jobUrl); + updateJob(ji -> ji.setErrorSummary(error)); + throw new DataAccessException("Job has failed with the error: " + error.message()); } else if (phase == Phase.UNKNOWN) { - updateJob(ji -> ji.setError(new JobInfo.Error(500, "Unknown phase"))); + updateJob(ji -> ji.setErrorSummary(new ErrorSummary("Unknown phase"))); throw new DataAccessException("The job is in an unknown state"); } else { int wait = cnt < 3 ? 500 : cnt < 20 ? 1000 : 2000; @@ -344,8 +342,8 @@ public static Phase getPhase(String jobUrl) throws DataAccessException { } } - public static JobInfo.Error getError(JobInfo uwsJob, String jobUrl) { - JobInfo.Error jobError = uwsJob.getError(); + public static ErrorSummary getError(JobInfo uwsJob, String jobUrl) { + ErrorSummary jobError = uwsJob.getErrorSummary(); if (jobError != null) return jobError; // error is a part of the job resource else { // error document maybe present at /error endpoint of the job String errorUrl = jobUrl + "/error"; @@ -356,7 +354,7 @@ public static JobInfo.Error getError(JobInfo uwsJob, String jobUrl) { return new HttpServices.Status(500, "Unexpected exception: " + e.getMessage()); } }); - return new JobInfo.Error(status.getStatusCode(), status.getErrMsg()); + return new ErrorSummary(status.getErrMsg()); } } @@ -427,9 +425,9 @@ public static JobInfo convertToJobInfo(Document doc) throws Exception { for (int i = 0; i < plist.getLength(); i++) { Node p = plist.item(i); String key = getAttr(p, "id"); - String val = jobInfo.getParams().get(key); + String val = jobInfo.getParameters().get(key); val = isEmpty(val) ? p.getTextContent() : val + PARAM_DELIM + p.getTextContent(); - jobInfo.getParams().put(key, val); + jobInfo.getParameters().put(key, val); } }); @@ -450,10 +448,10 @@ public static JobInfo convertToJobInfo(Document doc) throws Exception { applyIfNotEmpty(getEl(root, prefix + ERROR_SUMMARY), errsum -> { String type = errsum.getAttribute(ERROR_TYPE); - int code = type.equals("transient") ? 500 : 400; + String hasDetails = errsum.getAttribute(ERROR_HAS_DETAILS); String msg = getVal(errsum, prefix + ERROR_MSG); if (!isEmpty(msg)) { - jobInfo.setError(new JobInfo.Error(code, msg)); + jobInfo.setErrorSummary(new ErrorSummary(msg, type, Boolean.parseBoolean(hasDetails))); } }); diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/servlets/ServerStatus.java b/src/firefly/java/edu/caltech/ipac/firefly/server/servlets/ServerStatus.java index 6827899bc3..78410fca4a 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/servlets/ServerStatus.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/servlets/ServerStatus.java @@ -78,7 +78,9 @@ protected void processRequest(HttpServletRequest req, HttpServletResponse res) t if (execGC) System.gc(); // force garbage collection. if (execRedisCleanup) { long keyCount = RedisService.cleanupStaleKeys(); // manually clean up stale Redis keys - writer.println("* Redis cleanup completed. Number of keys removed: " + keyCount); + writer.println("* Redis cleanupStaleKeys completed. Number of keys removed: " + keyCount); + JobManager.cleanup(); + writer.println("* JobManager cleanup completed. Check redis CACHE USAGE SUMMARY for details."); skip(writer); } diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/util/multipart/UploadFileInfo.java b/src/firefly/java/edu/caltech/ipac/firefly/server/util/multipart/UploadFileInfo.java index 56e394f7e6..6897726998 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/util/multipart/UploadFileInfo.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/util/multipart/UploadFileInfo.java @@ -3,6 +3,7 @@ */ package edu.caltech.ipac.firefly.server.util.multipart; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import edu.caltech.ipac.firefly.data.FileInfo; import java.io.File; @@ -14,6 +15,11 @@ * @author loi * @version $Id: UploadFileInfo.java,v 1.1 2010/07/29 00:37:05 loi Exp $ */ +@JsonAutoDetect( + fieldVisibility = JsonAutoDetect.Visibility.ANY, + getterVisibility = JsonAutoDetect.Visibility.NONE, + setterVisibility = JsonAutoDetect.Visibility.NONE +) public class UploadFileInfo implements Serializable { private String pname; private File file; @@ -23,6 +29,8 @@ public class UploadFileInfo implements Serializable { private long size; private transient FileInfo fileInfo; + protected UploadFileInfo() {} + public UploadFileInfo(String pname, File file, String fileName, String contentType, int responseCode) { this.pname = pname; this.file = file; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/server/visualize/ProgressStat.java b/src/firefly/java/edu/caltech/ipac/firefly/server/visualize/ProgressStat.java index 35af616472..b6796e328d 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/server/visualize/ProgressStat.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/server/visualize/ProgressStat.java @@ -24,6 +24,8 @@ public enum PType { DOWNLOADING, READING, CREATING, OTHER, GROUP, SUCCESS, FAIL private final String plotId; private final List memberIDList; + protected ProgressStat() {this(null, null, null, null);} + public ProgressStat(String id, String plotId, PType type, String message) { this.id = id; this.plotId = plotId; diff --git a/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java b/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java index 5d2bb97d82..a7cf4b382a 100644 --- a/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java +++ b/src/firefly/java/edu/caltech/ipac/firefly/util/event/Name.java @@ -3,6 +3,7 @@ */ package edu.caltech.ipac.firefly.util.event; +import com.fasterxml.jackson.annotation.JsonAutoDetect; import edu.caltech.ipac.util.ComparisonUtil; import java.io.Serializable; @@ -17,6 +18,11 @@ /** * @author Trey Roby */ +@JsonAutoDetect( + fieldVisibility = JsonAutoDetect.Visibility.ANY, + getterVisibility = JsonAutoDetect.Visibility.NONE, + setterVisibility = JsonAutoDetect.Visibility.NONE +) public class Name implements Serializable { public static final Name EVT_CONN_EST = new Name("EVT_CONN_EST", diff --git a/src/firefly/java/edu/caltech/ipac/util/serialization/Serializer.java b/src/firefly/java/edu/caltech/ipac/util/serialization/Serializer.java new file mode 100644 index 0000000000..d094f160a7 --- /dev/null +++ b/src/firefly/java/edu/caltech/ipac/util/serialization/Serializer.java @@ -0,0 +1,274 @@ +/* + * License information at https://github.com/Caltech-IPAC/firefly/blob/master/License.txt + */ + +package edu.caltech.ipac.util.serialization; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import edu.caltech.ipac.firefly.core.background.JobInfo; +import edu.caltech.ipac.firefly.server.events.ReplicatedQueueList; +import edu.caltech.ipac.firefly.server.events.ServerEventQueue; +import org.msgpack.jackson.dataformat.MessagePackFactory; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import edu.caltech.ipac.firefly.core.background.JobManager; +import edu.caltech.ipac.firefly.data.FileInfo; +import edu.caltech.ipac.firefly.data.ServerEvent; +import edu.caltech.ipac.firefly.data.userdata.UserInfo; +import edu.caltech.ipac.firefly.server.util.multipart.UploadFileInfo; +import edu.caltech.ipac.firefly.server.visualize.ProgressStat; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public final class Serializer { + + private static final ObjectMapper JSON_MAPPER = baseMapper(new ObjectMapper()); + private static final ObjectMapper MSGPACK_MAPPER = baseMapper(new ObjectMapper(new MessagePackFactory())); + + private static ObjectMapper baseMapper(ObjectMapper mapper) { + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); // Unknown properties are silently ignored + mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); // Omits fields that are empty (lists, strings, arrays, maps) during serialization + return mapper; + } + + public static ObjectMapper getJsonMapper() { return JSON_MAPPER; } + public static ObjectMapper getMsgpackMapper() { return MSGPACK_MAPPER; } + + + /** + * Typed MessagePack envelope format; Class type is stored as part of the payload. + * [ 0xC1 0xC1 0x01 ] (magic + version; invalid UTF-8 bytes prevent text collision) + * [ u16 typeId ] (unsigned short) + * [ msgpack bytes ] (payload) + */ + private static final byte[] TYPED_MAGIC = new byte[] { (byte) 0xC1, (byte) 0xC1, (byte) 0x01 }; + + /** + * Registry: typeId <-> class. + * Keep IDs stable over time. Add new types by appending new IDs; don't renumber. + * NOTE: You must register any types you intend to deserialize. + * This keeps the payload small while still allowing automatic type resolution during deserialization. + */ + private static final Map> ID_TO_CLASS; + private static final Map, Short> CLASS_TO_ID; + + static { + // Register a type id <-> class mapping for typed MessagePack. + Map> idToClass = Map.ofEntries( + Map.entry((short) 1, File.class), + Map.entry((short) 2, Character.class), + Map.entry((short) 100, JobInfo.class), + Map.entry((short) 101, FileInfo.class), + Map.entry((short) 102, JobManager.BackGroundInfo.class), + Map.entry((short) 103, UserInfo.class), + Map.entry((short) 104, ProgressStat.class), + Map.entry((short) 105, UploadFileInfo.class), + Map.entry((short) 106, ServerEvent.class), + Map.entry((short) 107, ServerEventQueue.class), + Map.entry((short) 108, ReplicatedQueueList.EventQueueList.class) + ); + Map> id2c = new HashMap<>(idToClass); + Map, Short> c2id = new HashMap<>(id2c.size() * 2); + + for (Map.Entry> e : id2c.entrySet()) { + Short id = e.getKey(); + Class cls = e.getValue(); + if (id == null || cls == null) continue; + if (c2id.containsKey(cls)) { + throw new IllegalArgumentException("Duplicate class registered: " + cls.getName()); + } + c2id.put(cls, id); + } + ID_TO_CLASS = Collections.unmodifiableMap(id2c); + CLASS_TO_ID = Collections.unmodifiableMap(c2id); + } + +//==================================================================== +// Typed MessagePack API +//==================================================================== + + /** + * Serializes the value as typed MessagePack if its class is registered; + * otherwise, serializes it as plain MessagePack. + * Supported classes are registered statically in this class at initialization time. + */ + public static byte[] toTypedMessagePack(Object value) { + if (value == null) return null; + byte[] payload = toMessagePack(value); + + Short typeId = CLASS_TO_ID.get(value.getClass()); + if (typeId == null) { + return payload; // not registered; return plain MessagePack + } + + // [magic][u16 typeId][payload] + ByteBuffer bb = ByteBuffer.allocate(TYPED_MAGIC.length + 2 + payload.length); + bb.put(TYPED_MAGIC); + bb.putShort(typeId); + bb.put(payload); + return bb.array(); + } + + /** + * Deserialize a typed MessagePack envelope into its registered Java class if registered. + * Otherwise, deserialize as plain MessagePack. + */ + public static Object fromTypedMessagePack(byte[] data) { + if (data == null) return null; + + try { + + if (isTypedMessagePack(data)) { + ByteBuffer bb = ByteBuffer.wrap(data); + bb.position(TYPED_MAGIC.length); + + short typeId = bb.getShort(); + Class cls = ID_TO_CLASS.get(typeId); + if (cls == null) { + throw new SerializationException("Unknown typeId for typed MessagePack: " + (typeId & 0xFFFF), null); + } + + byte[] payload = new byte[bb.remaining()]; + bb.get(payload); + + return fromMessagePack(payload, cls); + } else if(isMsgPack(data)) { + return fromMessagePack(data, Object.class); // plain MessagePack + } else { + // Previously, strings were stored as UTF-8 bytes, not MessagePack. + // Temporary workaround for backward compatibility. + return toUtf8(data); + } + } catch (SerializationException e) { + throw e; + } catch (Exception e) { + throw new SerializationException("Typed MessagePack deserialization failed", e); + } + } + + /** True if bytes begin with the typed MessagePack magic header. */ + public static boolean isTypedMessagePack(byte[] data) { + if (data == null || data.length < TYPED_MAGIC.length + 2) return false; // +2 for typeId + for (int i = 0; i < TYPED_MAGIC.length; i++) { + if (data[i] != TYPED_MAGIC[i]) return false; + } + return true; + } + + +//==================================================================== +// MessagePack API without type info +//==================================================================== + + public static byte[] toMessagePack(Object value) { + try { + return getMsgpackMapper().writeValueAsBytes(value); + } catch (Exception e) { + throw new SerializationException("MessagePack serialization failed", e); + } + } + + public static T fromMessagePack(byte[] data, Class type) { + if (data == null) return null; + try { + return getMsgpackMapper().readValue(data, type); + } catch (Exception e) { + throw new SerializationException("MessagePack deserialization failed", e); + } + } + +//==================================================================== +// JSON API +//==================================================================== + + public static byte[] toJsonBytes(Object value) { + try { + return getJsonMapper().writeValueAsBytes(value); + } catch (Exception e) { + throw new SerializationException("JSON serialization failed", e); + } + } + + public static String toJsonString(Object value) { + try { + return getJsonMapper().writeValueAsString(value); + } catch (Exception e) { + throw new SerializationException("JSON serialization failed", e); + } + } + + public static T fromJson(byte[] data, Class type) { + if (data == null) return null; + try { + return getJsonMapper().readValue(data, type); + } catch (Exception e) { + throw new SerializationException("JSON deserialization failed", e); + } + } + + public static T fromJson(String json, Class type) { + if (json == null) return null; + try { + return getJsonMapper().readValue(json, type); + } catch (Exception e) { + throw new SerializationException("JSON deserialization failed", e); + } + } + +//==================================================================== +// Helper methods +//==================================================================== + + public static boolean isMsgPack(byte[] raw) { + if (raw == null || raw.length == 0) return false; + try (MessageUnpacker u = MessagePack.newDefaultUnpacker(raw)) { + u.unpackValue(); // parse one complete value + return !u.hasNext(); // ensure no trailing junk + } catch (Exception e) { + return false; + } + } + + /** + * Attempt MessagePack first, then fall back to JSON. + * Intended for Redis migration. + */ + public static T fromMessagePackOrJson(byte[] data, Class type) { + if (data == null) return null; + try { + return fromMessagePack(data, type); + } catch (Exception mpFail) { + return fromJson(data, type); + } + } + + public static boolean looksLikeJson(byte[] data) { + if (data == null || data.length == 0) return false; + byte b = data[0]; + return b == '{' || b == '['; + } + + public static String toUtf8(byte[] data) { return data == null ? null : new String(data, StandardCharsets.UTF_8); } + public static byte[] fromUtf8(String data) { return data == null ? null : data.getBytes(StandardCharsets.UTF_8); } + +//==================================================================== +// Inner Classes +//==================================================================== + + public static class SerializationException extends RuntimeException { + public SerializationException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java b/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java index cb742245f6..fc194383ed 100644 --- a/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java +++ b/src/firefly/test/edu/caltech/ipac/firefly/core/RedisServiceTest.java @@ -14,6 +14,8 @@ import org.junit.Ignore; import org.junit.Test; +import static edu.caltech.ipac.util.serialization.Serializer.fromUtf8; +import static edu.caltech.ipac.util.serialization.Serializer.toUtf8; import static org.junit.Assert.assertEquals; /** @@ -82,8 +84,8 @@ private void testRedis() { try { var redis = RedisService.mainConn().sync(); - redis.setex("key1", 1, "val1"); - assertEquals("val1", redis.get("key1")); + redis.setex("key1", 1, fromUtf8("val1")); + assertEquals("val1", toUtf8(redis.get("key1"))); Thread.sleep(3_000); // should be expired after 3 seconds assertEquals(0, (long) redis.exists("key1")); } catch (Exception e) { diff --git a/src/firefly/test/edu/caltech/ipac/firefly/core/background/JobManagerTest.java b/src/firefly/test/edu/caltech/ipac/firefly/core/background/JobManagerTest.java index c787999d71..cd10ca9a53 100644 --- a/src/firefly/test/edu/caltech/ipac/firefly/core/background/JobManagerTest.java +++ b/src/firefly/test/edu/caltech/ipac/firefly/core/background/JobManagerTest.java @@ -86,27 +86,27 @@ public void loadRedisDB() throws Exception { AppProperties.setProperty("job.wait.complete", "0"); Logger.setLogLevel(Level.INFO); -// ServerContext.getRequestOwner().setWsConnInfo("test", "test"); -// for(int i =0; i < 100_000; i++) { +// ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414629999"); // ridiculously large amount of jobs +// for(int i =0; i < 2_000_000; i++) { // JobManager.submit(new SleepJob(Job.Type.PACKAGE, ServerContext.getRequestOwner().getEventConnID())); // } - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414629999"); // load 9999 with 100k; extreme + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414629999"); // load user x9999 with 100k; extreme for(int i =0; i < 100_000; i++) { JobManager.submit(new SleepJob(Job.Type.PACKAGE, ServerContext.getRequestOwner().getEventConnID())); } - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414621000"); // load 1000 with 1k; rare + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414621000"); // load user x1000 with 1k; rare for(int i =0; i < 1000; i++) { JobManager.submit(new SleepJob(Job.Type.PACKAGE, ServerContext.getRequestOwner().getEventConnID())); } - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620100"); // load 0100 with 100; normal + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620100"); // load user x0100 with 100; normal for(int i =0; i < 100; i++) { JobManager.submit(new SleepJob(Job.Type.PACKAGE, ServerContext.getRequestOwner().getEventConnID())); } - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620010"); // load 0010 with 10; less common + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620010"); // load user x0010 with 10; less common for(int i =0; i < 10; i++) { JobManager.submit(new SleepJob(Job.Type.PACKAGE, ServerContext.getRequestOwner().getEventConnID())); } @@ -118,8 +118,8 @@ public void loadTest() throws Exception { /* Performance results using Jedis vs Lettuce are identical with embedded Redis server. Cache keys count: 101,104 - All Jobs: 1,165ms with 101,104 jobs - User Jobs: 936ms with 99,994 jobs # confirmed only 99,994 jobs for 9999; not sure why 6 missing + All Jobs: 1,165ms with 101,110 jobs + User Jobs: 936ms with 100,000 jobs User Jobs: 94ms with 1,000 jobs User Jobs: 84ms with 100 jobs User Jobs: 92ms with 10 jobs @@ -135,28 +135,40 @@ public void loadTest() throws Exception { int aCount = JobManager.getAllJobs().size(); System.out.printf("All Jobs: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), aCount); - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414629999"); // get jobs for 9999 with 100k jobs + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414629999"); // get jobs for user x9999 with 100k jobs start = Instant.now(); int count = JobManager.getUserJobs().size(); System.out.printf("User Jobs: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), count); - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414621000"); // get jobs for 1000 with 1k jobs + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414621000"); // get jobs for user x1000 with 1k jobs start = Instant.now(); count = JobManager.getUserJobs().size(); System.out.printf("User Jobs: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), count); - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620100"); // get jobs for 0100 with 100 jobs + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620100"); // get jobs for user x0100 with 100 jobs start = Instant.now(); count = JobManager.getUserJobs().size(); System.out.printf("User Jobs: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), count); - ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620010"); // get jobs for 0010 with 10 jobs + ServerContext.getRequestOwner().setUserKey("59bac3e4-6dc7-4b74-a83d-a35414620010"); // get jobs for user x0010 with 10 jobs start = Instant.now(); count = JobManager.getUserJobs().size(); System.out.printf("User Jobs: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), count); } + @Category({TestCategory.Perf.class}) + @Test + public void cleanupTest() throws Exception { + /* + cleanup called on redis with 2_303_348 jobs ~ 2.8 minutes + 13618 remaining jobs after cleanup; 2,289,730 removed. + cleanup: 170,493ms with 2,303,348 jobs + */ + Instant start = Instant.now(); + JobManager.cleanup(); + System.out.printf("cleanup: %,dms with %,d jobs %n", Duration.between(start, Instant.now()).toMillis(), 2_303_348); + } private static class SleepJob extends ServCmdJob implements Job.Worker { Job.Type type; diff --git a/src/firefly/test/edu/caltech/ipac/util/serialization/SerializerTest.java b/src/firefly/test/edu/caltech/ipac/util/serialization/SerializerTest.java new file mode 100644 index 0000000000..82e4ab12e8 --- /dev/null +++ b/src/firefly/test/edu/caltech/ipac/util/serialization/SerializerTest.java @@ -0,0 +1,551 @@ +/* + * License information at https://github.com/Caltech-IPAC/firefly/blob/master/License.txt + */ + +package edu.caltech.ipac.util.serialization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import edu.caltech.ipac.firefly.core.Util; +import edu.caltech.ipac.firefly.core.background.JobInfo; +import edu.caltech.ipac.firefly.core.background.JobManager; +import edu.caltech.ipac.firefly.core.background.JobUtil; +import edu.caltech.ipac.firefly.data.FileInfo; +import edu.caltech.ipac.firefly.data.RelatedData; +import edu.caltech.ipac.firefly.data.ServerEvent; +import edu.caltech.ipac.firefly.data.userdata.RoleList; +import edu.caltech.ipac.firefly.data.userdata.UserInfo; +import edu.caltech.ipac.firefly.server.events.ReplicatedQueueList; +import edu.caltech.ipac.firefly.server.events.ServerEventQueue; +import edu.caltech.ipac.firefly.server.network.HttpServiceInput; +import edu.caltech.ipac.firefly.server.util.multipart.UploadFileInfo; +import edu.caltech.ipac.firefly.server.visualize.ProgressStat; +import edu.caltech.ipac.firefly.util.event.Name; +import org.junit.Test; + +import java.io.File; +import java.time.OffsetDateTime; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import static edu.caltech.ipac.util.serialization.Serializer.getJsonMapper; +import static org.junit.Assert.*; +import static edu.caltech.ipac.firefly.core.Util.Try; + +public class SerializerTest { + + private JobInfo createSampleJob() { + String json = """ + { + "phase": "EXECUTING", + "jobId": "job-123", + "creationTime": "2025-01-01T10:00:00Z", + "meta": { + "summary": "Running", + "progress": 50, + "sendNotif": false, + "monitored": false + }, + "executionDuration": 3600, + "ownerId": "user1", + "parameters": { + "ra": "10.5", + "dec": "-2.1" + }, + "errorSummary": { + "message": "error message" + }, + "results": [ + { + "size": "12345", + "id": "result1", + "href": "http://example/result.fits", + "mimeType": "image/fits" + } + ], + "jobInfo": { + "jobUrl": "http://example/job/123", + "userName": "Jane Doe" + } + } + """; + + JobInfo job = Serializer.fromJson(json, JobInfo.class); + assertNotNull("Failed to parse JobInfo from JSON test fixture", job); + return job; + } + + @Test + public void reducedPayloadTest() { + JobInfo job = createSampleJob(); + + byte[] json = JobUtil.toJsonObject(job).toJSONString().getBytes(); + byte[] msgpack = Serializer.toMessagePack(job); + String base64 = Util.serialize(job); + + assertNotNull(json); + assertNotNull(msgpack); + assertNotNull(base64); + // Size reduced from 476 to 367 (~23% smaller) + assertTrue( + "Expected MessagePack payload to be smaller than JSON", + msgpack.length < json.length + ); + // Size reduced from 2460 to 367 (~85% smaller) + assertTrue( + "Expected MessagePack payload to be smaller than base64", + msgpack.length < base64.length() + ); + } + + @Test + public void jobInfoMsgPackRoundTrip() { + JobInfo original = createSampleJob(); + + byte[] bytes = Serializer.toTypedMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + + JobInfo decoded = (JobInfo) Serializer.fromTypedMessagePack(bytes); + assertNotNull(decoded); + + assertEquals(original.getJobId(), decoded.getJobId()); + assertEquals(original.getOwnerId(), decoded.getOwnerId()); + assertEquals(original.getPhase(), decoded.getPhase()); + assertEquals(original.getExecutionDuration(), decoded.getExecutionDuration()); + assertEquals(original.getParameters(), decoded.getParameters()); + assertEquals( + original.getMeta().getProgress(), + decoded.getMeta().getProgress() + ); + assertEquals( + original.getAux().getUserName(), + decoded.getAux().getUserName() + ); + } + + @Test + public void jobInfoJsonRoundTrip() { + JobInfo original = createSampleJob(); + + String json = Serializer.toJsonString(original); + assertNotNull(json); + assertTrue(json.startsWith("{")); + + JobInfo decoded = Serializer.fromJson(json, JobInfo.class); + assertNotNull(decoded); + + assertEquals(original.getJobId(), decoded.getJobId()); + assertEquals(original.getPhase(), decoded.getPhase()); + assertEquals( + original.getMeta().getSummary(), + decoded.getMeta().getSummary() + ); + } + + @Test + public void msgPackOrJson() { + JobInfo original = createSampleJob(); + + byte[] jsonBytes = Serializer.toJsonBytes(original); + + JobInfo decoded = Serializer.fromMessagePackOrJson( + jsonBytes, + JobInfo.class + ); + + assertNotNull(decoded); + assertEquals(original.getJobId(), decoded.getJobId()); + assertEquals(original.getPhase(), decoded.getPhase()); + } + + @Test + public void nullInputs() { + assertNull(Serializer.fromMessagePack(null, JobInfo.class)); + assertNull(Serializer.fromJson((byte[]) null, JobInfo.class)); + assertNull(Serializer.fromJson((String) null, JobInfo.class)); + } + + @Test + public void jsonJobInfoBackwardCompatible() { + JobInfo job = createSampleJob(); + String cJsonString = Serializer.toJsonString(job); + String pJsonString = JobUtil.toJsonObject(job).toJSONString(); + ObjectMapper mapper = getJsonMapper(); + JsonNode c = Try.it(() -> mapper.readTree(cJsonString)).get(); + JsonNode p = Try.it(() -> mapper.readTree(pJsonString)).get(); + + assertEquals("JSON output should match previous implementation", c, p); + } + + @Test + public void fileInfoTest() { + FileInfo fi = new FileInfo("/abc/xyz.txt", "xyz.txt", 1024L); + fi.setDesc("test description"); + fi.setSuffix("txt"); + fi.setHasAccess(true); + + RelatedData rd = RelatedData.makeMaskRelatedData(111, "file.fits", Map.of("header", "value"), false, 2, "mask-data-key"); + fi.addRelatedData(rd); + + HttpServiceInput si = new HttpServiceInput("http://example.com"); + si.setFile("file.fits", new File("/abc/file.fits")); + si.setHeader("X-Custom-Header", "CustomValue"); + fi.setRequestInfo(si); + + byte[] msgpack = Serializer.toMessagePack(fi); + FileInfo mp = Serializer.fromMessagePack(msgpack, FileInfo.class); + assertNotNull(mp); + + // ---------- identity / core fields ---------- + assertEquals(fi.getInternalFilename(), mp.getInternalFilename()); + assertEquals(fi.getExternalName(), mp.getExternalName()); + assertEquals(fi.getSizeInBytes(), mp.getSizeInBytes()); + + // ---------- attribute-backed fields ---------- + assertEquals(fi.getDesc(), mp.getDesc()); + assertEquals(fi.getSuffix(), mp.getSuffix()); + assertEquals(fi.hasAccess(), mp.hasAccess()); + + // ---------- derived/default behavior ---------- + assertFalse(mp.isBlank()); + assertEquals(200, mp.getResponseCode()); + assertNull(mp.getResponseCodeMsg()); + assertNull(mp.getContentType()); + + // ---------- attribute map integrity ---------- + assertEquals(fi.getAttributeMap(), mp.getAttributeMap()); + + // ---------- relatedData (should survive) ---------- + assertNotNull(mp.getRelatedData()); + assertEquals(1, mp.getRelatedData().size()); + + RelatedData mpRd = mp.getRelatedData().get(0); + assertEquals(rd.getDataType(), mpRd.getDataType()); + assertEquals(rd.getDataKey(), mpRd.getDataKey()); + assertEquals(rd.getDesc(), mpRd.getDesc()); + assertEquals(rd.getHduIdx(), mpRd.getHduIdx()); + assertEquals(rd.getSearchParams(), mpRd.getSearchParams()); + + // ---------- requestInfo (should survive if serializable) ---------- + assertNotNull(mp.getRequestInfo()); + assertEquals(fi.getRequestInfo().getRequestUrl(), mp.getRequestInfo().getRequestUrl()); + assertEquals(fi.getRequestInfo().getHeaders(), mp.getRequestInfo().getHeaders()); + assertEquals( fi.getRequestInfo().getFiles().keySet(), mp.getRequestInfo().getFiles().keySet()); + + // ---------- transient / non-serialized fields ---------- + assertFalse(mp.hasFileNameResolver()); + + } + + @Test + public void UserInfoTest() throws Exception { + + UserInfo original = new UserInfo("test-user", "Test"); + original.setEmail("user@acme.org"); + original.setLastName("User"); + original.setFirstName("Test"); + original.setUserId(123); + original.setProperty("prop1", "value1"); + original.setProperty("prop2", "value2"); + RoleList roles= new RoleList(); + roles.add(new RoleList.RoleEntry("admin",1,"group1",10,"READ")); + roles.add(new RoleList.RoleEntry("missionX",-1,"ALL",-1,"ADMIN")); + original.setRoles(roles); + + byte[] bytes = Serializer.toMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + + UserInfo decoded = Serializer.fromMessagePack(bytes, UserInfo.class); + + assertNotNull(decoded); + assertEquals(original.getLoginName(), decoded.getLoginName()); + assertEquals(original.getPassword(), decoded.getPassword()); + assertEquals(original.getFirstName(), decoded.getFirstName()); + assertEquals(original.getLastName(), decoded.getLastName()); + assertEquals(original.getUserId(), decoded.getUserId()); + assertEquals(original.getProperties().size(), decoded.getProperties().size()); + assertEquals(original.getProperties().get("prop1"), decoded.getProperties().get("prop1")); + assertEquals(original.getProperties().get("prop2"), decoded.getProperties().get("prop2")); + assertNotNull(decoded.getRoles()); + assertEquals(2, decoded.getRoles().size()); + RoleList.RoleEntry re1= decoded.getRoles().get(0); + assertEquals("admin", re1.getMissionName()); + assertEquals(1, re1.getMissionId()); + assertEquals("group1", re1.getGroupName()); + assertEquals(10, re1.getGroupId()); + assertEquals("READ", re1.getPrivilege()); + RoleList.RoleEntry re2= decoded.getRoles().get(1); + assertEquals("missionX", re2.getMissionName()); + assertEquals(-1, re2.getMissionId()); + assertEquals("ALL", re2.getGroupName()); + assertEquals(-1, re2.getGroupId()); + assertEquals("ADMIN", re2.getPrivilege()); + } + + @Test + public void serverEventTest() throws Exception { + + ServerEvent.EventTarget target = new ServerEvent.EventTarget(ServerEvent.Scope.USER, "c1", "ch1", "u1"); + ServerEvent original = new ServerEvent( + Name.EVT_CONN_EST, + target, + ServerEvent.DataType.JSON, + "{\"x\":1}", + "from-123" + ); + + byte[] bytes = Serializer.toMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + + ServerEvent decoded = Serializer.fromMessagePack(bytes, ServerEvent.class); + + assertNotNull(decoded); + assertEquals(original.getName(), decoded.getName()); + + assertNotNull(decoded.getTarget()); + assertEquals(original.getTarget().getScope(), decoded.getTarget().getScope()); + assertEquals(original.getTarget().getConnID(), decoded.getTarget().getConnID()); + assertEquals(original.getTarget().getChannel(), decoded.getTarget().getChannel()); + assertEquals(original.getTarget().getUserKey(), decoded.getTarget().getUserKey()); + + assertEquals(original.getDataType(), decoded.getDataType()); + assertEquals(original.getData(), decoded.getData()); + assertEquals(original.getFrom(), decoded.getFrom()); + } + + @Test + public void serverEventQueueTest() throws Exception { + ServerEventQueue original = new ServerEventQueue( + "conn-123", + "channel-xyz", + "userkey-abc", + null + ); + byte[] bytes = Serializer.toTypedMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + ServerEventQueue decoded = (ServerEventQueue) Serializer.fromTypedMessagePack(bytes); + assertNotNull(decoded); + assertEquals(original.getConnID(), decoded.getConnID()); + assertEquals(original.getChannel(), decoded.getChannel()); + assertEquals(original.getUserKey(), decoded.getUserKey()); + } + + @Test + public void eventQueueListTest() { + List list = List.of( + new ServerEventQueue("c1","ch1","u1",null), + new ServerEventQueue("c2","ch2","u2",null), + new ServerEventQueue("c3","ch3","u3",null) + ); + ReplicatedQueueList.EventQueueList original = new ReplicatedQueueList.EventQueueList(list); + byte[] bytes = Serializer.toTypedMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + ReplicatedQueueList.EventQueueList decoded = (ReplicatedQueueList.EventQueueList) Serializer.fromTypedMessagePack(bytes); + assertNotNull(decoded); + assertEquals(original.items().size(), decoded.items().size()); + for (int i = 0; i < original.items().size(); i++) { + ServerEventQueue oQ = original.items().get(i); + ServerEventQueue dQ = decoded.items().get(i); + assertEquals(oQ.getConnID(), dQ.getConnID()); + assertEquals(oQ.getChannel(), dQ.getChannel()); + assertEquals(oQ.getUserKey(), dQ.getUserKey()); + } + } + + @Test + public void backgroundInfoTest() throws Exception { + JobManager.BackGroundInfo original = new JobManager.BackGroundInfo(false, "notify@ipac.caltech.edu"); + + byte[] bytes = Serializer.toMessagePack(original); + assertNotNull(bytes); + assertTrue(bytes.length > 0); + + JobManager.BackGroundInfo decoded = Serializer.fromMessagePack(bytes, JobManager.BackGroundInfo.class); + + assertEquals(original, decoded); + assertEquals(original.notifEnabled(), decoded.notifEnabled()); + assertEquals(original.email(), decoded.email()); + } + + @Test + public void uploadFileInfoTest() { + + File file = new File("/tmp/upload-test.txt"); // does not need to exist + UploadFileInfo original = new UploadFileInfo( + "param1", + file, + "upload-test.txt", + "text/plain", + 201 + ); + + // ---------- serialize ---------- + byte[] msgpack = Serializer.toMessagePack(original); + assertNotNull(msgpack); + assertTrue(msgpack.length > 0); + + // ---------- deserialize ---------- + UploadFileInfo decoded = Serializer.fromMessagePack(msgpack, UploadFileInfo.class); + + assertNotNull(decoded); + + assertEquals(original.getPname(), decoded.getPname()); + assertEquals(original.getFileName(), decoded.getFileName()); + assertEquals(original.getContentType(), decoded.getContentType()); + assertEquals(original.getResponseCode(), decoded.getResponseCode()); + assertEquals(original.getSize(), decoded.getSize()); + + assertNotNull(decoded.getFile()); + assertEquals( + original.getFile().getPath(), + decoded.getFile().getPath() + ); + + // ---------- toString sanity ---------- + assertTrue(decoded.toString().contains(original.getFileName())); + } + + @Test + public void progressStatTest() { + + ProgressStat normal = new ProgressStat( + "job-123", + "plot-456", + ProgressStat.PType.DOWNLOADING, + "Downloading data" + ); + + ProgressStat normalDecoded = + Serializer.fromMessagePack( + Serializer.toMessagePack(normal), + ProgressStat.class + ); + + assertNotNull(normalDecoded); + assertEquals(normal.getId(), normalDecoded.getId()); + assertEquals(normal.getPlotId(), normalDecoded.getPlotId()); + assertEquals(normal.getMessage(), normalDecoded.getMessage()); + assertEquals(ProgressStat.PType.DOWNLOADING, normalDecoded.getType()); + assertFalse(normalDecoded.isGroup()); + assertFalse(normalDecoded.isDone()); + + // ============================================================ + // group progress + // ============================================================ + List members = List.of("id-1", "id-2", "id-3"); + ProgressStat group = new ProgressStat(members, "group-999"); + + ProgressStat groupDecoded = + Serializer.fromMessagePack( + Serializer.toMessagePack(group), + ProgressStat.class + ); + + assertNotNull(groupDecoded); + assertEquals(group.getId(), groupDecoded.getId()); + assertEquals(ProgressStat.PType.GROUP, groupDecoded.getType()); + assertTrue(groupDecoded.isGroup()); + assertEquals(members, groupDecoded.getMemberIDList()); + assertNull(groupDecoded.getMessage()); + assertNull(groupDecoded.getPlotId()); + assertFalse(groupDecoded.isDone()); + + // ============================================================ + // terminal states (SUCCESS / FAIL) + // ============================================================ + ProgressStat success = new ProgressStat( + "job-success", + "plot-1", + ProgressStat.PType.SUCCESS, + "Completed" + ); + + ProgressStat fail = new ProgressStat( + "job-fail", + "plot-1", + ProgressStat.PType.FAIL, + "Failed" + ); + + ProgressStat successDecoded = Serializer.fromMessagePack( Serializer.toMessagePack(success), ProgressStat.class); + ProgressStat failDecoded = Serializer.fromMessagePack( Serializer.toMessagePack(fail), ProgressStat.class); + + assertTrue(successDecoded.isDone()); + assertTrue(failDecoded.isDone()); + assertEquals(ProgressStat.PType.SUCCESS, successDecoded.getType()); + assertEquals(ProgressStat.PType.FAIL, failDecoded.getType()); + } + + @Test + public void unregisteredTypesTest() { + + // Common types supported without explicit type registration + byte[] msgpack = Serializer.toTypedMessagePack("ping"); + String aString = (String) Serializer.fromTypedMessagePack(msgpack); + assertEquals("ping", aString); + + msgpack = Serializer.toTypedMessagePack(true); + boolean aBoolean = (boolean) Serializer.fromTypedMessagePack(msgpack); + assertTrue(aBoolean); + + msgpack = Serializer.toTypedMessagePack(1); + int one = (int) Serializer.fromTypedMessagePack(msgpack); + assertEquals(1, one); + + msgpack = Serializer.toTypedMessagePack(1_000); + int anInt = (int) Serializer.fromTypedMessagePack(msgpack); + assertEquals(1_000, anInt); + + msgpack = Serializer.toTypedMessagePack(1.2); + double aDouble = (double) Serializer.fromTypedMessagePack(msgpack); + assertEquals(1.2, aDouble, 0.00001); + + msgpack = Serializer.toTypedMessagePack(3_000_000_000L); + long aLong = (long) Serializer.fromTypedMessagePack(msgpack); + assertEquals(3_000_000_000L, aLong); + + List aList = List.of("one", 2, 3.0); + msgpack = Serializer.toTypedMessagePack(aList); + List dList = (List) Serializer.fromTypedMessagePack(msgpack); + assertEquals(aList.size(), dList.size()); + assertEquals(aList.get(0), dList.get(0)); + assertEquals(aList.get(1), dList.get(1)); + assertEquals(aList.get(2), dList.get(2)); + + Map aMap = Map.of("str", "one", "int", 2, "double", 3.0); + msgpack = Serializer.toTypedMessagePack(aMap); + Map dMap = (Map) Serializer.fromTypedMessagePack(msgpack); + assertEquals(aMap.size(), dMap.size()); + assertEquals(aMap.get("str"), dMap.get("str")); + assertEquals(aMap.get("int"), dMap.get("int")); + assertEquals(aMap.get("double"), dMap.get("double")); + + //==================================================================== + // Less used types that is not handled correctly... register if needed + //==================================================================== + + // Date type is saved as ISO-8601 string with offset + Date date = new Date(); + msgpack = Serializer.toTypedMessagePack(date); + String dDate = (String) Serializer.fromTypedMessagePack(msgpack); + assertEquals(date, Date.from(OffsetDateTime.parse(dDate).toInstant())); + + // array is saved as a List + String[] ary = new String[]{"a", "b", "c"}; + msgpack = Serializer.toTypedMessagePack(ary); + List dAry = (List) Serializer.fromTypedMessagePack(msgpack); + assertEquals(ary.length, dAry.size()); + assertEquals(ary[0], dAry.get(0)); + assertEquals(ary[1], dAry.get(1)); + assertEquals(ary[2], dAry.get(2)); + + + } + +}