diff --git a/src/main/java/com/liveramp/captain/app_type/CaptainAppType.java b/src/main/java/com/liveramp/captain/app_type/CaptainAppType.java index 5947732..1c441a3 100644 --- a/src/main/java/com/liveramp/captain/app_type/CaptainAppType.java +++ b/src/main/java/com/liveramp/captain/app_type/CaptainAppType.java @@ -29,7 +29,7 @@ public boolean equals(Object o) { return false; } - return appType.equals(((CaptainAppType)o).get()); + return appType.equals(((CaptainAppType) o).get()); } @Override diff --git a/src/main/java/com/liveramp/captain/daemon/BaseCaptainBuilder.java b/src/main/java/com/liveramp/captain/daemon/BaseCaptainBuilder.java index f1725a1..ae2aa66 100644 --- a/src/main/java/com/liveramp/captain/daemon/BaseCaptainBuilder.java +++ b/src/main/java/com/liveramp/captain/daemon/BaseCaptainBuilder.java @@ -1,11 +1,5 @@ package com.liveramp.captain.daemon; -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.TimeUnit; - -import org.apache.curator.framework.CuratorFramework; - import com.liveramp.captain.manifest_manager.ManifestManager; import com.liveramp.captain.notifier.CaptainDaemonInternalNotifier; import com.liveramp.captain.notifier.CaptainNotifier; @@ -18,18 +12,22 @@ import com.liveramp.daemon_lib.JobletCallback; import com.liveramp.daemon_lib.builders.ThreadingDaemonBuilder; import com.liveramp.daemon_lib.utils.JobletCallbackUtil; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.curator.framework.CuratorFramework; public class BaseCaptainBuilder> { /** - * allows the base builder to return the type of the subclass that using it. - * allows us to have the unchecked exception in one place so that it can suppressed once for code cleanliness. - * by making it protected, it allows a consumer to extend a builder that they build. - * by handling this in an instance variable as opposed to an abstract method, we avoid making the extender having - * to worry about implementing a "get self" method. + * allows the base builder to return the type of the subclass that using it. allows us to have the + * unchecked exception in one place so that it can suppressed once for code cleanliness. by making + * it protected, it allows a consumer to extend a builder that they build. by handling this in an + * instance variable as opposed to an abstract method, we avoid making the extender having to + * worry about implementing a "get self" method. */ @SuppressWarnings("unchecked") - protected final T self = (T)this; + protected final T self = (T) this; private boolean DEFAULT_SUPPORTS_PENDING = false; private boolean DEFAULT_SUPPORTS_RAMMING_SPEED = false; @@ -56,7 +54,11 @@ public class BaseCaptainBuilder> { private Optional> failureCallback = Optional.empty(); private FailedRequestPolicy failedRequestPolicy = new DefaultFailedRequestPolicy(); - protected BaseCaptainBuilder(String identifier, CaptainConfigProducer configProducer, ManifestManager manifestManager, RequestUpdater requestUpdater) { + protected BaseCaptainBuilder( + String identifier, + CaptainConfigProducer configProducer, + ManifestManager manifestManager, + RequestUpdater requestUpdater) { this.identifier = identifier; this.configProducer = configProducer; this.manifestManager = manifestManager; @@ -68,9 +70,7 @@ public T setFailedRequestPolicy(FailedRequestPolicy failedRequestPolicy) { return self; } - /** - * BETA - */ + /** BETA */ public T setRammingSpeed(boolean rammingSpeed) { this.rammingSpeed = rammingSpeed; @@ -96,13 +96,16 @@ public T setDaemonConfigProductionLock(DaemonLock daemonConfigProductionLock) { } /** - * built-in: if you have a zookeeper cluster you can point to, we can handle the locking infra for you. + * built-in: if you have a zookeeper cluster you can point to, we can handle the locking infra for + * you. * * @param curatorFramework * @return */ public T setZkDaemonLock(CuratorFramework curatorFramework) { - setDaemonConfigProductionLock(com.liveramp.captain.daemon.CaptainZkDaemonLock.getProduction(curatorFramework, identifier)); + setDaemonConfigProductionLock( + com.liveramp.captain.daemon.CaptainZkDaemonLock.getProduction( + curatorFramework, identifier)); return self; } @@ -150,7 +153,8 @@ public T setConfigWaitTime(int configWaitTime, TimeUnit unit) { * @return */ public T setExecutionSlotWaitTime(int executionSlotWaitTime, TimeUnit unit) { - this.executionSlotWaitSeconds = Optional.of(Math.toIntExact(unit.toSeconds(executionSlotWaitTime))); + this.executionSlotWaitSeconds = + Optional.of(Math.toIntExact(unit.toSeconds(executionSlotWaitTime))); return self; } @@ -185,24 +189,23 @@ public T setFailureCallback(JobletCallback failureCallback return self; } - public Daemon build() throws IllegalAccessException, IOException, InstantiationException { - CaptainNotifier resolvedCaptainNotifier = notifier != null ? notifier : new DefaultCaptainLoggingNotifier(); + public Daemon build() + throws IllegalAccessException, IOException, InstantiationException { + CaptainNotifier resolvedCaptainNotifier = + notifier != null ? notifier : new DefaultCaptainLoggingNotifier(); - CaptainJobletFactory jobletFactory = new ThreadedCaptainJobletFactoryImpl( - requestUpdater, - manifestManager, - resolvedCaptainNotifier, - supportsPending, - rammingSpeed, - failedRequestPolicy - ); + CaptainJobletFactory jobletFactory = + new ThreadedCaptainJobletFactoryImpl( + requestUpdater, + manifestManager, + resolvedCaptainNotifier, + supportsPending, + rammingSpeed, + failedRequestPolicy); - ThreadingDaemonBuilder daemonBuilder = new ThreadingDaemonBuilder<>( - identifier, - jobletFactory, - configProducer - ) - .setNotifier(new CaptainDaemonInternalNotifier(resolvedCaptainNotifier)); + ThreadingDaemonBuilder daemonBuilder = + new ThreadingDaemonBuilder<>(identifier, jobletFactory, configProducer) + .setNotifier(new CaptainDaemonInternalNotifier(resolvedCaptainNotifier)); maxThreads.ifPresent(daemonBuilder::setMaxThreads); nextConfigWaitSeconds.ifPresent(daemonBuilder::setNextConfigWaitSeconds); @@ -211,8 +214,10 @@ public Daemon build() throws IllegalAccessException, IOExc failureWaitSeconds.ifPresent(daemonBuilder::setFailureWaitSeconds); daemonConfigProductionLock.ifPresent(daemonBuilder::setDaemonConfigProductionLock); - Optional> lockRequestCallbackOptional = generateCallbackOptionalFromRequestLockOptional(requestLock, true); - Optional> unlockRequestCallbackOptional = generateCallbackOptionalFromRequestLockOptional(requestLock, false); + Optional> lockRequestCallbackOptional = + generateCallbackOptionalFromRequestLockOptional(requestLock, true); + Optional> unlockRequestCallbackOptional = + generateCallbackOptionalFromRequestLockOptional(requestLock, false); composeRequestLockCallbackAndOtherCallbacks(lockRequestCallbackOptional, onNewConfigCallback) .ifPresent(daemonBuilder::setOnNewConfigCallback); @@ -224,19 +229,26 @@ public Daemon build() throws IllegalAccessException, IOExc return daemonBuilder.build(); } - private Optional> generateCallbackOptionalFromRequestLockOptional(Optional requestLock, boolean lock) { + private Optional> + generateCallbackOptionalFromRequestLockOptional( + Optional requestLock, boolean lock) { if (requestLock.isPresent()) { if (lock) { - return Optional.of(new CaptainRequestLockingCallbacks.CaptainRequestLockCallback(requestLock.get())); + return Optional.of( + new CaptainRequestLockingCallbacks.CaptainRequestLockCallback(requestLock.get())); } else { - return Optional.of(new CaptainRequestLockingCallbacks.CaptainRequestUnlockCallback(requestLock.get())); + return Optional.of( + new CaptainRequestLockingCallbacks.CaptainRequestUnlockCallback(requestLock.get())); } } else { return Optional.empty(); } } - private Optional> composeRequestLockCallbackAndOtherCallbacks(Optional> requestLock, Optional> callback) { + private Optional> + composeRequestLockCallbackAndOtherCallbacks( + Optional> requestLock, + Optional> callback) { if (requestLock.isPresent() && callback.isPresent()) { return Optional.of(JobletCallbackUtil.compose(requestLock.get(), callback.get())); } else if (requestLock.isPresent()) { diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainBuilder.java b/src/main/java/com/liveramp/captain/daemon/CaptainBuilder.java index f1abeb4..57885c9 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainBuilder.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainBuilder.java @@ -4,11 +4,19 @@ @SuppressWarnings("OptionalUsedAsFieldOrParameterType") public class CaptainBuilder extends BaseCaptainBuilder { - public CaptainBuilder(String identifier, CaptainConfigProducer configProducer, ManifestManager manifestManager, RequestUpdater requestUpdater) { + public CaptainBuilder( + String identifier, + CaptainConfigProducer configProducer, + ManifestManager manifestManager, + RequestUpdater requestUpdater) { super(identifier, configProducer, manifestManager, requestUpdater); } - public static CaptainBuilder of(String identifier, CaptainConfigProducer configProducer, ManifestManager multiAppManifestManager, RequestUpdater requestUpdater) { + public static CaptainBuilder of( + String identifier, + CaptainConfigProducer configProducer, + ManifestManager multiAppManifestManager, + RequestUpdater requestUpdater) { return new CaptainBuilder(identifier, configProducer, multiAppManifestManager, requestUpdater); } } diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainConfigProducer.java b/src/main/java/com/liveramp/captain/daemon/CaptainConfigProducer.java index 127007f..1b572ea 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainConfigProducer.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainConfigProducer.java @@ -6,4 +6,3 @@ public interface CaptainConfigProducer extends JobletConfigProducer { CaptainRequestConfig getNextConfig() throws DaemonException; } - diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainJoblet.java b/src/main/java/com/liveramp/captain/daemon/CaptainJoblet.java index 9b2d9b9..da9e68d 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainJoblet.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainJoblet.java @@ -1,12 +1,5 @@ package com.liveramp.captain.daemon; - -import java.util.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; - import com.liveramp.captain.exception.CaptainPersistorException; import com.liveramp.captain.lib.CaptainAlertHelpers; import com.liveramp.captain.manifest.Manifest; @@ -22,6 +15,10 @@ import com.liveramp.captain.waypoint.WaypointType; import com.liveramp.daemon_lib.Joblet; import com.liveramp.daemon_lib.utils.DaemonException; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; public class CaptainJoblet implements Joblet { private CaptainNotifier notifier; @@ -60,19 +57,26 @@ public static CaptainJoblet of( boolean supportsPending, boolean rammingSpeed, FailedRequestPolicy failedRequestPolicy) { - return new CaptainJoblet(config, notifier, requestUpdater, manifestManager, supportsPending, rammingSpeed, failedRequestPolicy); + return new CaptainJoblet( + config, + notifier, + requestUpdater, + manifestManager, + supportsPending, + rammingSpeed, + failedRequestPolicy); } private Manifest getAccountManifest() { ManifestFactory manifestFactory = manifestManager.getManifestFactory(config.getAppType()); if (null == manifestFactory) { throw new RuntimeException( - String.format("could not find a manifest (host: %s) for given request type: %s. the manifests are available " + - "for the following request types: %s", + String.format( + "could not find a manifest (host: %s) for given request type: %s. the manifests are available " + + "for the following request types: %s", CaptainAlertHelpers.getHostName(), - config.getAppType(), manifestManager.getAvailableCaptainAppTypes() - ) - ); + config.getAppType(), + manifestManager.getAvailableCaptainAppTypes())); } return manifestFactory.create(); } @@ -98,7 +102,9 @@ public void run() throws DaemonException { executeFailedRequestPolicy(config); break; default: - throw new DaemonException("Provided config has a job status that shouldn't be handled by the captain " + config); + throw new DaemonException( + "Provided config has a job status that shouldn't be handled by the captain " + + config); } } catch (Exception e) { notifier.notify( @@ -115,7 +121,8 @@ public void run() throws DaemonException { private void executeFailedRequestPolicy(CaptainRequestConfig config) { long jobId = config.getId(); - FailedRequestPolicy.FailedRequestAction failedRequestAction = failedRequestPolicy.getFailedRequestAction(jobId); + FailedRequestPolicy.FailedRequestAction failedRequestAction = + failedRequestPolicy.getFailedRequestAction(jobId); switch (failedRequestAction) { case RETRY: requestUpdater.retry(jobId); @@ -135,29 +142,49 @@ private void goToNextStep(CaptainRequestConfig config) { Manifest manifest = getAccountManifest(); long jobId = config.getId(); - Optional nextStepOptional = manifest.getNextStep(config.getStep(), config.getId()); + Optional nextStepOptional = + manifest.getNextStep(config.getStep(), config.getId()); - LOG.info(String.format("current step: %s, current status: %s, next step: %s, next status: %s", config.getStep(), config.getStatus(), nextStepOptional, CaptainStatus.READY)); + LOG.info( + String.format( + "current step: %s, current status: %s, next step: %s, next status: %s", + config.getStep(), config.getStatus(), nextStepOptional, CaptainStatus.READY)); if (nextStepOptional.isPresent()) { - requestUpdater.setStepAndStatus(jobId, config.getStep(), config.getStatus(), nextStepOptional.get(), CaptainStatus.READY); + requestUpdater.setStepAndStatus( + jobId, + config.getStep(), + config.getStatus(), + nextStepOptional.get(), + CaptainStatus.READY); if (rammingSpeed) { - SimpleCaptainConfig newConfig = new SimpleCaptainConfig( - config.getId(), - CaptainStatus.READY, - nextStepOptional.get(), - config.getAppType() - ); - CaptainJoblet.of(newConfig, notifier, requestUpdater, manifestManager, supportsPending, rammingSpeed, failedRequestPolicy).run(); + SimpleCaptainConfig newConfig = + new SimpleCaptainConfig( + config.getId(), CaptainStatus.READY, nextStepOptional.get(), config.getAppType()); + CaptainJoblet.of( + newConfig, + notifier, + requestUpdater, + manifestManager, + supportsPending, + rammingSpeed, + failedRequestPolicy) + .run(); } } else { - requestUpdater.setStepAndStatus(jobId, config.getStep(), config.getStatus(), CaptainStep.fromString("DONE"), CaptainStatus.COMPLETED); + requestUpdater.setStepAndStatus( + jobId, + config.getStep(), + config.getStatus(), + CaptainStep.fromString("DONE"), + CaptainStatus.COMPLETED); } } catch (Exception e) { - String subject = String.format("%s: error while transitioning steps for request %s from step: %s.", - CaptainAlertHelpers.getHostName(), config.getId(), config.getStep() - ); + String subject = + String.format( + "%s: error while transitioning steps for request %s from step: %s.", + CaptainAlertHelpers.getHostName(), config.getId(), config.getStep()); notifier.notify(subject, e, CaptainNotifier.NotificationLevel.ERROR); requestUpdater.fail(config.getId()); @@ -175,16 +202,22 @@ private void submitRequest(CaptainRequestConfig config) throws DaemonException { try { WaypointSubmitter waypointSubmitter = waypoint.getSubmitter(); - RequestContext requestOptions = manifest.getRequestContextProducerFactory().create().get(jobId); + RequestContext requestOptions = + manifest.getRequestContextProducerFactory().create().get(jobId); waypointSubmitter.submitServiceRequest(config.getId(), requestOptions); } catch (CaptainPersistorException e) { - String subject = String.format("%s: handle persistence failed for request %s", CaptainAlertHelpers.getHostName(), jobId); + String subject = + String.format( + "%s: handle persistence failed for request %s", + CaptainAlertHelpers.getHostName(), jobId); notifier.notify(subject, e, CaptainNotifier.NotificationLevel.ERROR); requestUpdater.cancel(jobId); return; } catch (Throwable e) { - String subject = String.format("%s: error while submitting request %s", CaptainAlertHelpers.getHostName(), jobId); + String subject = + String.format( + "%s: error while submitting request %s", CaptainAlertHelpers.getHostName(), jobId); notifier.notify(subject, e, CaptainNotifier.NotificationLevel.ERROR); requestUpdater.fail(jobId); return; @@ -202,17 +235,25 @@ private void submitRequest(CaptainRequestConfig config) throws DaemonException { targetStatus = CaptainStatus.COMPLETED; } - LOG.info(String.format("current step %s, current status: %s, next status: %s", config.getStep(), config.getStatus(), targetStatus)); + LOG.info( + String.format( + "current step %s, current status: %s, next status: %s", + config.getStep(), config.getStatus(), targetStatus)); requestUpdater.setStatus(jobId, config.getStep(), config.getStatus(), targetStatus); if (rammingSpeed) { - SimpleCaptainConfig newConfig = new SimpleCaptainConfig( - config.getId(), - targetStatus, - config.getStep(), - config.getAppType()); - CaptainJoblet.of(newConfig, notifier, requestUpdater, manifestManager, supportsPending, rammingSpeed, failedRequestPolicy).run(); - + SimpleCaptainConfig newConfig = + new SimpleCaptainConfig( + config.getId(), targetStatus, config.getStep(), config.getAppType()); + CaptainJoblet.of( + newConfig, + notifier, + requestUpdater, + manifestManager, + supportsPending, + rammingSpeed, + failedRequestPolicy) + .run(); } } @@ -222,11 +263,13 @@ private void checkRequestStarted(CaptainRequestConfig config) { Waypoint waypoint = manifest.getWaypointForStep(config.getStep()); long jobId = config.getId(); - CaptainStatus status = waypoint.getStatusRetrieverFactory().create().getStatus(config.getId()); + CaptainStatus status = + waypoint.getStatusRetrieverFactory().create().getStatus(config.getId()); CaptainStatus targetStatus; - if (status.equals(CaptainStatus.IN_PROGRESS) || status.equals(CaptainStatus.FAILED) || status.equals( - CaptainStatus.COMPLETED)) { + if (status.equals(CaptainStatus.IN_PROGRESS) + || status.equals(CaptainStatus.FAILED) + || status.equals(CaptainStatus.COMPLETED)) { targetStatus = CaptainStatus.IN_PROGRESS; } else if (status.equals(CaptainStatus.QUARANTINED)) { targetStatus = CaptainStatus.QUARANTINED; @@ -238,17 +281,20 @@ private void checkRequestStarted(CaptainRequestConfig config) { targetStatus = config.getStatus(); } - LOG.info(String.format("current step %s, current status: %s, next status: %s", config.getStep(), config.getStatus(), targetStatus)); + LOG.info( + String.format( + "current step %s, current status: %s, next status: %s", + config.getStep(), config.getStatus(), targetStatus)); requestUpdater.setStatus(jobId, config.getStep(), config.getStatus(), targetStatus); } catch (Exception e) { - String subject = String.format("%s: error while checking if service has begun processing req %s in step: %s", - config.getId(), CaptainAlertHelpers.getHostName(), config.getStep() - ); + String subject = + String.format( + "%s: error while checking if service has begun processing req %s in step: %s", + config.getId(), CaptainAlertHelpers.getHostName(), config.getStep()); notifier.notify(subject, e, CaptainNotifier.NotificationLevel.ERROR); requestUpdater.fail(config.getId()); } - } private void checkRequestComplete(CaptainRequestConfig config) { @@ -257,7 +303,8 @@ private void checkRequestComplete(CaptainRequestConfig config) { Waypoint waypoint = manifest.getWaypointForStep(config.getStep()); long jobId = config.getId(); - CaptainStatus status = waypoint.getStatusRetrieverFactory().create().getStatus(config.getId()); + CaptainStatus status = + waypoint.getStatusRetrieverFactory().create().getStatus(config.getId()); CaptainStatus targetStatus; if (status.equals(CaptainStatus.COMPLETED)) { @@ -274,27 +321,41 @@ private void checkRequestComplete(CaptainRequestConfig config) { } else if (status.equals(CaptainStatus.CANCELLED)) { requestUpdater.cancel(jobId); return; - } else { // i.e. status.equals(ServiceRequestStatus.PENDING) || status.equals(ServiceRequestStatus.IN_PROGRESS) - throw new RuntimeException(String.format("request %s in an unexpected state. in the status_retriever step it was in status %s", jobId, status)); + } else { // i.e. status.equals(ServiceRequestStatus.PENDING) || + // status.equals(ServiceRequestStatus.IN_PROGRESS) + throw new RuntimeException( + String.format( + "request %s in an unexpected state. in the status_retriever step it was in status %s", + jobId, status)); } - LOG.info(String.format("current step %s, current status: %s, next status: %s", config.getStep(), config.getStatus(), targetStatus)); + LOG.info( + String.format( + "current step %s, current status: %s, next status: %s", + config.getStep(), config.getStatus(), targetStatus)); requestUpdater.setStatus(jobId, config.getStep(), config.getStatus(), targetStatus); if (rammingSpeed && targetStatus.equals(CaptainStatus.COMPLETED)) { - SimpleCaptainConfig newConfig = new SimpleCaptainConfig( - config.getId(), - CaptainStatus.COMPLETED, - config.getStep(), - config.getAppType()); - - CaptainJoblet.of(newConfig, notifier, requestUpdater, manifestManager, supportsPending, rammingSpeed, failedRequestPolicy).run(); + SimpleCaptainConfig newConfig = + new SimpleCaptainConfig( + config.getId(), CaptainStatus.COMPLETED, config.getStep(), config.getAppType()); + + CaptainJoblet.of( + newConfig, + notifier, + requestUpdater, + manifestManager, + supportsPending, + rammingSpeed, + failedRequestPolicy) + .run(); } } catch (Exception e) { - String subject = String.format("%s: error while checking status of req %s in step: %s", - CaptainAlertHelpers.getHostName(), config.getId(), config.getStep() - ); + String subject = + String.format( + "%s: error while checking status of req %s in step: %s", + CaptainAlertHelpers.getHostName(), config.getId(), config.getStep()); notifier.notify(subject, e, CaptainNotifier.NotificationLevel.ERROR); requestUpdater.fail(config.getId()); diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainJobletFactory.java b/src/main/java/com/liveramp/captain/daemon/CaptainJobletFactory.java index efcc43c..e3df1e4 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainJobletFactory.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainJobletFactory.java @@ -2,5 +2,4 @@ import com.liveramp.daemon_lib.JobletFactory; -interface CaptainJobletFactory extends JobletFactory { -} +interface CaptainJobletFactory extends JobletFactory {} diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainRequestConfig.java b/src/main/java/com/liveramp/captain/daemon/CaptainRequestConfig.java index dea1197..c03630d 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainRequestConfig.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainRequestConfig.java @@ -16,11 +16,15 @@ public abstract class CaptainRequestConfig implements JobletConfig { @Override public String toString() { - return "CaptainRequestConfig{" + - "jobId=" + getId() + - ",status=" + getStatus() + - ",step=" + getStep() + - ",appType=" + getAppType() + - "}"; + return "CaptainRequestConfig{" + + "jobId=" + + getId() + + ",status=" + + getStatus() + + ",step=" + + getStep() + + ",appType=" + + getAppType() + + "}"; } } diff --git a/src/main/java/com/liveramp/captain/daemon/CaptainRequestLockingCallbacks.java b/src/main/java/com/liveramp/captain/daemon/CaptainRequestLockingCallbacks.java index 9d3076e..b83cbd9 100644 --- a/src/main/java/com/liveramp/captain/daemon/CaptainRequestLockingCallbacks.java +++ b/src/main/java/com/liveramp/captain/daemon/CaptainRequestLockingCallbacks.java @@ -29,5 +29,4 @@ public void callback(CaptainRequestConfig config) { captainRequestLock.unlock(config.getId()); } } - } diff --git a/src/main/java/com/liveramp/captain/daemon/RequestUpdater.java b/src/main/java/com/liveramp/captain/daemon/RequestUpdater.java index 3dfd835..cf3952e 100644 --- a/src/main/java/com/liveramp/captain/daemon/RequestUpdater.java +++ b/src/main/java/com/liveramp/captain/daemon/RequestUpdater.java @@ -4,21 +4,21 @@ import com.liveramp.captain.step.CaptainStep; public interface RequestUpdater { - void setStatus(long jobId, CaptainStep currentStep, CaptainStatus currentStatus, CaptainStatus newStatus); - - void setStepAndStatus(long jobId, CaptainStep currentStep, CaptainStatus currentStatus, CaptainStep newStep, CaptainStatus newStatus); + void setStatus( + long jobId, CaptainStep currentStep, CaptainStatus currentStatus, CaptainStatus newStatus); + void setStepAndStatus( + long jobId, + CaptainStep currentStep, + CaptainStatus currentStatus, + CaptainStep newStep, + CaptainStatus newStatus); void cancel(long jobId); void fail(long jobId); + default void retry(long jobId) {} - default void retry(long jobId) { - - } - - default void quarantine(long jobId) { - - } + default void quarantine(long jobId) {} } diff --git a/src/main/java/com/liveramp/captain/daemon/SimpleCaptainConfig.java b/src/main/java/com/liveramp/captain/daemon/SimpleCaptainConfig.java index 30b7f04..9b4576e 100644 --- a/src/main/java/com/liveramp/captain/daemon/SimpleCaptainConfig.java +++ b/src/main/java/com/liveramp/captain/daemon/SimpleCaptainConfig.java @@ -10,7 +10,8 @@ public class SimpleCaptainConfig extends CaptainRequestConfig { private CaptainStep step; private CaptainAppType appType; - public SimpleCaptainConfig(long id, CaptainStatus status, CaptainStep step, CaptainAppType appType) { + public SimpleCaptainConfig( + long id, CaptainStatus status, CaptainStep step, CaptainAppType appType) { this.id = id; this.status = status; this.step = step; diff --git a/src/main/java/com/liveramp/captain/daemon_lock/CaptainZkDaemonLock.java b/src/main/java/com/liveramp/captain/daemon_lock/CaptainZkDaemonLock.java index 94f9dbb..0c03a9d 100644 --- a/src/main/java/com/liveramp/captain/daemon_lock/CaptainZkDaemonLock.java +++ b/src/main/java/com/liveramp/captain/daemon_lock/CaptainZkDaemonLock.java @@ -1,12 +1,11 @@ package com.liveramp.captain.daemon; +import com.liveramp.daemon_lib.DaemonLock; +import com.liveramp.daemon_lib.utils.DaemonException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessMutex; -import com.liveramp.daemon_lib.DaemonLock; -import com.liveramp.daemon_lib.utils.DaemonException; - public class CaptainZkDaemonLock implements DaemonLock { private static final String ZK_DAEMON_LOCK_BASE_PATH = "/daemon_lib_zk/zk_daemon_locks/"; @@ -14,7 +13,8 @@ public class CaptainZkDaemonLock implements DaemonLock { public static DaemonLock getProduction(CuratorFramework curatorFramework, String daemonId) { curatorFramework.start(); - Runtime.getRuntime().addShutdownHook(new Thread(new CaptainZkDaemonLock.FrameworkShutdown(curatorFramework))); + Runtime.getRuntime() + .addShutdownHook(new Thread(new CaptainZkDaemonLock.FrameworkShutdown(curatorFramework))); return new CaptainZkDaemonLock(curatorFramework, ZK_DAEMON_LOCK_BASE_PATH + daemonId); } @@ -57,5 +57,4 @@ public void run() { fw.close(); } } - } diff --git a/src/main/java/com/liveramp/captain/exception/CaptainPersistorException.java b/src/main/java/com/liveramp/captain/exception/CaptainPersistorException.java index 4720f4c..3a2663a 100644 --- a/src/main/java/com/liveramp/captain/exception/CaptainPersistorException.java +++ b/src/main/java/com/liveramp/captain/exception/CaptainPersistorException.java @@ -5,10 +5,12 @@ public CaptainPersistorException(String message, Exception e) { super(message, e); } - public static CaptainPersistorException of(Exception e, long jobId, String stepString, ServiceHandle serviceHandle) { - String message = String.format("handle persistor failed for request id: %s at step: %s while attempting to persist service handle: %s. quarantining request.", - jobId, stepString, serviceHandle - ); + public static CaptainPersistorException of( + Exception e, long jobId, String stepString, ServiceHandle serviceHandle) { + String message = + String.format( + "handle persistor failed for request id: %s at step: %s while attempting to persist service handle: %s. quarantining request.", + jobId, stepString, serviceHandle); return new CaptainPersistorException(message, e); } diff --git a/src/main/java/com/liveramp/captain/handle_persistor/HandlePersistorWrapperFactory.java b/src/main/java/com/liveramp/captain/handle_persistor/HandlePersistorWrapperFactory.java index 1db769a..a4b81eb 100644 --- a/src/main/java/com/liveramp/captain/handle_persistor/HandlePersistorWrapperFactory.java +++ b/src/main/java/com/liveramp/captain/handle_persistor/HandlePersistorWrapperFactory.java @@ -1,6 +1,7 @@ package com.liveramp.captain.handle_persistor; -public class HandlePersistorWrapperFactory implements HandlePersistorFactory { +public class HandlePersistorWrapperFactory + implements HandlePersistorFactory { private HandlePersistor handlePersistor; diff --git a/src/main/java/com/liveramp/captain/handle_persistor/NoOpHandlePersistor.java b/src/main/java/com/liveramp/captain/handle_persistor/NoOpHandlePersistor.java index 17eb723..e2c84f4 100644 --- a/src/main/java/com/liveramp/captain/handle_persistor/NoOpHandlePersistor.java +++ b/src/main/java/com/liveramp/captain/handle_persistor/NoOpHandlePersistor.java @@ -2,14 +2,10 @@ public class NoOpHandlePersistor implements HandlePersistor { - private NoOpHandlePersistor() { - - } + private NoOpHandlePersistor() {} @Override - public void persist(Long jobId, ServiceHandle o) { - - } + public void persist(Long jobId, ServiceHandle o) {} public static HandlePersistorFactory get() { return new NoOpHandlePersistor.Factory(); diff --git a/src/main/java/com/liveramp/captain/lib/CaptainAlertHelpers.java b/src/main/java/com/liveramp/captain/lib/CaptainAlertHelpers.java index 41662c3..f288a2f 100644 --- a/src/main/java/com/liveramp/captain/lib/CaptainAlertHelpers.java +++ b/src/main/java/com/liveramp/captain/lib/CaptainAlertHelpers.java @@ -1,7 +1,6 @@ package com.liveramp.captain.lib; import java.net.InetAddress; - import org.apache.commons.io.IOUtils; public class CaptainAlertHelpers { diff --git a/src/main/java/com/liveramp/captain/manifest/DefaultManifestImpl.java b/src/main/java/com/liveramp/captain/manifest/DefaultManifestImpl.java index eee4de4..911e6da 100644 --- a/src/main/java/com/liveramp/captain/manifest/DefaultManifestImpl.java +++ b/src/main/java/com/liveramp/captain/manifest/DefaultManifestImpl.java @@ -1,16 +1,14 @@ package com.liveramp.captain.manifest; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - import com.google.common.collect.Maps; - import com.liveramp.captain.request_context.NoOpRequestContextProducer; import com.liveramp.captain.request_context.RequestContextProducerFactory; import com.liveramp.captain.step.CaptainStep; import com.liveramp.captain.waypoint.Waypoint; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; public class DefaultManifestImpl implements Manifest { private List steps; @@ -19,7 +17,8 @@ public class DefaultManifestImpl implements Manifest { private Map waypointByStep; private RequestContextProducerFactory requestContextProducerFactory; - public DefaultManifestImpl(List steps, RequestContextProducerFactory requestContextProducerFactory) { + public DefaultManifestImpl( + List steps, RequestContextProducerFactory requestContextProducerFactory) { this.steps = steps; this.requestContextProducerFactory = requestContextProducerFactory; @@ -49,7 +48,9 @@ public Optional getNextStep(CaptainStep step, long jobId) { } else { Integer stepIndex = indexByStep.get(step); if (null == stepIndex) { - throw new RuntimeException(String.format("could not find step: %s in manifest: %s", step, waypointByStep.keySet())); + throw new RuntimeException( + String.format( + "could not find step: %s in manifest: %s", step, waypointByStep.keySet())); } nextStepIndex = indexByStep.get(step) + 1; } diff --git a/src/main/java/com/liveramp/captain/manifest/Manifest.java b/src/main/java/com/liveramp/captain/manifest/Manifest.java index 8a10bb5..ced3ce9 100644 --- a/src/main/java/com/liveramp/captain/manifest/Manifest.java +++ b/src/main/java/com/liveramp/captain/manifest/Manifest.java @@ -1,10 +1,9 @@ package com.liveramp.captain.manifest; -import java.util.Optional; - import com.liveramp.captain.request_context.RequestContextProducerFactory; import com.liveramp.captain.step.CaptainStep; import com.liveramp.captain.waypoint.Waypoint; +import java.util.Optional; public interface Manifest { Optional getNextStep(CaptainStep step, long jobId); diff --git a/src/main/java/com/liveramp/captain/manifest/WrapperManifestFactoryImpl.java b/src/main/java/com/liveramp/captain/manifest/WrapperManifestFactoryImpl.java index 81a8a6f..2f98dcb 100644 --- a/src/main/java/com/liveramp/captain/manifest/WrapperManifestFactoryImpl.java +++ b/src/main/java/com/liveramp/captain/manifest/WrapperManifestFactoryImpl.java @@ -1,6 +1,6 @@ package com.liveramp.captain.manifest; -public class WrapperManifestFactoryImpl implements ManifestFactory{ +public class WrapperManifestFactoryImpl implements ManifestFactory { private final Manifest manifest; public WrapperManifestFactoryImpl(Manifest manifest) { diff --git a/src/main/java/com/liveramp/captain/manifest_manager/ManifestManager.java b/src/main/java/com/liveramp/captain/manifest_manager/ManifestManager.java index a577b9e..3d61406 100644 --- a/src/main/java/com/liveramp/captain/manifest_manager/ManifestManager.java +++ b/src/main/java/com/liveramp/captain/manifest_manager/ManifestManager.java @@ -1,9 +1,8 @@ package com.liveramp.captain.manifest_manager; -import java.util.Set; - import com.liveramp.captain.app_type.CaptainAppType; import com.liveramp.captain.manifest.ManifestFactory; +import java.util.Set; public interface ManifestManager { ManifestFactory getManifestFactory(CaptainAppType appType); diff --git a/src/main/java/com/liveramp/captain/manifest_manager/MultiAppManifestManager.java b/src/main/java/com/liveramp/captain/manifest_manager/MultiAppManifestManager.java index 45a034d..b96cf24 100644 --- a/src/main/java/com/liveramp/captain/manifest_manager/MultiAppManifestManager.java +++ b/src/main/java/com/liveramp/captain/manifest_manager/MultiAppManifestManager.java @@ -1,13 +1,12 @@ package com.liveramp.captain.manifest_manager; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - import com.liveramp.captain.app_type.CaptainAppType; import com.liveramp.captain.manifest.Manifest; import com.liveramp.captain.manifest.ManifestFactory; import com.liveramp.captain.manifest.WrapperManifestFactoryImpl; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; public class MultiAppManifestManager implements ManifestManager { private Map manifestByAppType; @@ -16,11 +15,13 @@ private MultiAppManifestManager(Map manifestByA this.manifestByAppType = manifestByAppType; } - public static MultiAppManifestManager ofManifests(Map manifestByAppType) { + public static MultiAppManifestManager ofManifests( + Map manifestByAppType) { return new MultiAppManifestManager(manifestMapToManifestFactoryMap(manifestByAppType)); } - public static MultiAppManifestManager ofManifestFactories(Map manifestFactoryByAppType) { + public static MultiAppManifestManager ofManifestFactories( + Map manifestFactoryByAppType) { return new MultiAppManifestManager(manifestFactoryByAppType); } @@ -31,7 +32,10 @@ public Map create() { @Override public ManifestFactory getManifestFactory(CaptainAppType appType) { if (!manifestByAppType.containsKey(appType)) { - throw new RuntimeException(String.format("no manifest with app type %s in the registered manifests: %s.", appType, manifestByAppType)); + throw new RuntimeException( + String.format( + "no manifest with app type %s in the registered manifests: %s.", + appType, manifestByAppType)); } return manifestByAppType.get(appType); } @@ -41,8 +45,13 @@ public Set getAvailableCaptainAppTypes() { return manifestByAppType.keySet(); } - private static Map manifestMapToManifestFactoryMap(Map manifestByAppType) { - return manifestByAppType.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, entry -> new WrapperManifestFactoryImpl(entry.getValue()))); + private static Map manifestMapToManifestFactoryMap( + Map manifestByAppType) { + return manifestByAppType + .entrySet() + .stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, entry -> new WrapperManifestFactoryImpl(entry.getValue()))); } } diff --git a/src/main/java/com/liveramp/captain/manifest_manager/SingleAppManifestManager.java b/src/main/java/com/liveramp/captain/manifest_manager/SingleAppManifestManager.java index 799688e..c0efa9e 100644 --- a/src/main/java/com/liveramp/captain/manifest_manager/SingleAppManifestManager.java +++ b/src/main/java/com/liveramp/captain/manifest_manager/SingleAppManifestManager.java @@ -1,13 +1,11 @@ package com.liveramp.captain.manifest_manager; -import java.util.Set; - import com.google.common.collect.Sets; - import com.liveramp.captain.app_type.CaptainAppType; import com.liveramp.captain.manifest.Manifest; import com.liveramp.captain.manifest.ManifestFactory; import com.liveramp.captain.manifest.WrapperManifestFactoryImpl; +import java.util.Set; public class SingleAppManifestManager implements ManifestManager { private final String SINGLE_APP_TYPE_PLACEHOLDER_KEY = "ALL"; @@ -21,7 +19,6 @@ public class SingleAppManifestManager implements ManifestManager { this.manifestFactory = manifestFactory; } - @Override public ManifestFactory getManifestFactory(CaptainAppType appType) { return manifestFactory; diff --git a/src/main/java/com/liveramp/captain/notifier/CaptainDaemonInternalNotifier.java b/src/main/java/com/liveramp/captain/notifier/CaptainDaemonInternalNotifier.java index 6fc9b1d..67d95aa 100644 --- a/src/main/java/com/liveramp/captain/notifier/CaptainDaemonInternalNotifier.java +++ b/src/main/java/com/liveramp/captain/notifier/CaptainDaemonInternalNotifier.java @@ -1,12 +1,10 @@ package com.liveramp.captain.notifier; import com.liveramp.daemon_lib.DaemonNotifier; - import java.util.Optional; public class CaptainDaemonInternalNotifier implements DaemonNotifier { - private final CaptainNotifier notifier; public CaptainDaemonInternalNotifier(final CaptainNotifier notifier) { @@ -14,8 +12,7 @@ public CaptainDaemonInternalNotifier(final CaptainNotifier notifier) { } @Override - public void notify( - String subject, Optional body, Optional t) { + public void notify(String subject, Optional body, Optional t) { if (t.isPresent()) { notifier.notify(subject, body.orElse(""), t.get(), CaptainNotifier.NotificationLevel.INFO); } else { diff --git a/src/main/java/com/liveramp/captain/notifier/CaptainNotifier.java b/src/main/java/com/liveramp/captain/notifier/CaptainNotifier.java index 9ddb8c1..6137328 100644 --- a/src/main/java/com/liveramp/captain/notifier/CaptainNotifier.java +++ b/src/main/java/com/liveramp/captain/notifier/CaptainNotifier.java @@ -3,11 +3,15 @@ public interface CaptainNotifier { void notify(String header, String message, NotificationLevel notificationLevel); + void notify(String header, Throwable t, NotificationLevel notificationLevel); + void notify(String header, String message, Throwable t, NotificationLevel notificationLevel); enum NotificationLevel { - DEBUG, INFO, ERROR; + DEBUG, + INFO, + ERROR; @Override public String toString() { diff --git a/src/main/java/com/liveramp/captain/notifier/DefaultCaptainLoggingNotifier.java b/src/main/java/com/liveramp/captain/notifier/DefaultCaptainLoggingNotifier.java index e043915..652cd43 100644 --- a/src/main/java/com/liveramp/captain/notifier/DefaultCaptainLoggingNotifier.java +++ b/src/main/java/com/liveramp/captain/notifier/DefaultCaptainLoggingNotifier.java @@ -7,27 +7,23 @@ public class DefaultCaptainLoggingNotifier implements CaptainNotifier { private static final Logger LOG = LoggerFactory.getLogger(DefaultCaptainLoggingNotifier.class); - private void notifyInternal(String subject, String body, NotificationLevel notificationLevel) { LOG.info(String.format("[%s]: %s %n%n %s)", notificationLevel.toString(), subject, body)); } @Override - public void notify( - String header, String message, NotificationLevel notificationLevel) { + public void notify(String header, String message, NotificationLevel notificationLevel) { notifyInternal(header, message, notificationLevel); } @Override - public void notify( - String header, Throwable t, NotificationLevel notificationLevel) { + public void notify(String header, Throwable t, NotificationLevel notificationLevel) { notify(header, "", t, notificationLevel); } @Override public void notify( - String header, String message, Throwable t, - NotificationLevel notificationLevel) { + String header, String message, Throwable t, NotificationLevel notificationLevel) { String body = String.format("%s %n %s", message, ExceptionUtils.getFullStackTrace(t)); notifyInternal(header, body, notificationLevel); } diff --git a/src/main/java/com/liveramp/captain/optional_step_predicate/StepPredicateFactory.java b/src/main/java/com/liveramp/captain/optional_step_predicate/StepPredicateFactory.java index 4fe7c57..2665d73 100644 --- a/src/main/java/com/liveramp/captain/optional_step_predicate/StepPredicateFactory.java +++ b/src/main/java/com/liveramp/captain/optional_step_predicate/StepPredicateFactory.java @@ -3,4 +3,3 @@ public interface StepPredicateFactory { StepPredicate create(); } - diff --git a/src/main/java/com/liveramp/captain/request_context/ExternalId.java b/src/main/java/com/liveramp/captain/request_context/ExternalId.java index 2e29e12..d50c458 100644 --- a/src/main/java/com/liveramp/captain/request_context/ExternalId.java +++ b/src/main/java/com/liveramp/captain/request_context/ExternalId.java @@ -1,15 +1,18 @@ package com.liveramp.captain.request_context; -/*** - * The ExternalId exists to help clients associate their requests with each other, and outside objects. +/** + * * The ExternalId exists to help clients associate their requests with each other, and outside + * objects. */ public class ExternalId { private String idType; private String id; - /*** + /** + * * + * * @param idType An identifier for the external id. (e.g. "data_sync_job") - * @param id An identifier for an instance from the external application. (e.g. id of DataSyncJob) + * @param id An identifier for an instance from the external application. (e.g. id of DataSyncJob) */ public ExternalId(String idType, String id) { this.idType = idType; diff --git a/src/main/java/com/liveramp/captain/request_context/NoOpRequestContextProducer.java b/src/main/java/com/liveramp/captain/request_context/NoOpRequestContextProducer.java index fa02a6e..61aafc0 100644 --- a/src/main/java/com/liveramp/captain/request_context/NoOpRequestContextProducer.java +++ b/src/main/java/com/liveramp/captain/request_context/NoOpRequestContextProducer.java @@ -1,6 +1,5 @@ package com.liveramp.captain.request_context; - public class NoOpRequestContextProducer implements RequestContextProducer { @Override public EmptyRequestContext get(long jobId) { diff --git a/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLock.java b/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLock.java index ca6506e..2b25faa 100644 --- a/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLock.java +++ b/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLock.java @@ -1,8 +1,9 @@ package com.liveramp.captain.request_lock; /** - * this interface is sufficient to get captain to run and lock requests properly. if you want to avoid pulling in already - * locked requests to your config producer consider using CaptainRequestLockWithRunningIds + * this interface is sufficient to get captain to run and lock requests properly. if you want to + * avoid pulling in already locked requests to your config producer consider using + * CaptainRequestLockWithRunningIds */ public interface CaptainRequestLock { void lock(long jobId); diff --git a/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLockWithRunningIds.java b/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLockWithRunningIds.java index e03f89c..e53cbeb 100644 --- a/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLockWithRunningIds.java +++ b/src/main/java/com/liveramp/captain/request_lock/CaptainRequestLockWithRunningIds.java @@ -3,8 +3,8 @@ import java.util.Set; /** - * this addition to the CaptainRequestLock iface, gives you the opportunity to, in your config producer, view all of the - * currently locked ids, and ever pulling them in the config producer. + * this addition to the CaptainRequestLock iface, gives you the opportunity to, in your config + * producer, view all of the currently locked ids, and ever pulling them in the config producer. */ public interface CaptainRequestLockWithRunningIds extends CaptainRequestLock { Set getLockedRequestIds(); diff --git a/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManager.java b/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManager.java index ac9daa6..b731330 100644 --- a/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManager.java +++ b/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManager.java @@ -1,8 +1,7 @@ package com.liveramp.captain.request_lock.running_ids_manager_request_lock; -import java.util.Set; - import com.google.common.collect.Sets; +import java.util.Set; class RunningIdsManager { @@ -24,4 +23,3 @@ public synchronized Set getRunningIds() { return Sets.newHashSet(runningIds); } } - diff --git a/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManagerCaptainLock.java b/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManagerCaptainLock.java index 5a8e3a6..ac9e04c 100644 --- a/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManagerCaptainLock.java +++ b/src/main/java/com/liveramp/captain/request_lock/running_ids_manager_request_lock/RunningIdsManagerCaptainLock.java @@ -1,8 +1,7 @@ package com.liveramp.captain.request_lock.running_ids_manager_request_lock; -import java.util.Set; - import com.liveramp.captain.request_lock.CaptainRequestLockWithRunningIds; +import java.util.Set; public class RunningIdsManagerCaptainLock implements CaptainRequestLockWithRunningIds { private final RunningIdsManager runningIdsManager; diff --git a/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZKRequestLock.java b/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZKRequestLock.java index a039d36..cc022d2 100644 --- a/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZKRequestLock.java +++ b/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZKRequestLock.java @@ -1,13 +1,12 @@ package com.liveramp.captain.request_lock.zk_request_lock; import com.google.common.collect.Sets; +import java.util.Set; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Set; - class ZKRequestLock { private static final Logger LOG = LoggerFactory.getLogger(ZKRequestLock.class); @@ -30,9 +29,14 @@ public void acquire(String requestId) throws Exception { String requestPath = zkRootPath + "/" + requestId; if (framework.checkExists().forPath(requestPath) != null) { - throw new RuntimeException(String.format("request: %s already locked by another process.", requestId)); + throw new RuntimeException( + String.format("request: %s already locked by another process.", requestId)); } - framework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(requestPath); + framework + .create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(requestPath); locks.add(requestId); } @@ -43,7 +47,9 @@ public void release(String requestId) throws Exception { framework.delete().deletingChildrenIfNeeded().forPath(zkRootPath + "/" + requestId); locks.remove(requestId); } else { - throw new RuntimeException(String.format("trying to unlock a lock owned by another process. request id: %s", requestId)); + throw new RuntimeException( + String.format( + "trying to unlock a lock owned by another process. request id: %s", requestId)); } } @@ -51,7 +57,9 @@ public void releaseSafe(String requestId) { try { release(requestId); } catch (Exception e) { - LOG.error("caught exception during safe release of lock. logging exception and continuing execution.", e); + LOG.error( + "caught exception during safe release of lock. logging exception and continuing execution.", + e); } } diff --git a/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZkCaptainRequestLock.java b/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZkCaptainRequestLock.java index e0e16a0..ddcfe16 100644 --- a/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZkCaptainRequestLock.java +++ b/src/main/java/com/liveramp/captain/request_lock/zk_request_lock/ZkCaptainRequestLock.java @@ -1,12 +1,10 @@ package com.liveramp.captain.request_lock.zk_request_lock; +import com.liveramp.captain.request_lock.CaptainRequestLockWithRunningIds; import java.util.Set; import java.util.stream.Collectors; - import org.apache.curator.framework.CuratorFramework; -import com.liveramp.captain.request_lock.CaptainRequestLockWithRunningIds; - public class ZkCaptainRequestLock implements CaptainRequestLockWithRunningIds { private ZKRequestLock requestLock; @@ -40,7 +38,8 @@ public void unlock(long jobId) { } } - public static ZkCaptainRequestLock getProduction(CuratorFramework curatorFramework, String zkRootPath) { + public static ZkCaptainRequestLock getProduction( + CuratorFramework curatorFramework, String zkRootPath) { return new ZkCaptainRequestLock(new ZKRequestLock(curatorFramework, zkRootPath)); } } diff --git a/src/main/java/com/liveramp/captain/request_submitter/NoOpSubmitter.java b/src/main/java/com/liveramp/captain/request_submitter/NoOpSubmitter.java index 90c524a..3cae1c0 100644 --- a/src/main/java/com/liveramp/captain/request_submitter/NoOpSubmitter.java +++ b/src/main/java/com/liveramp/captain/request_submitter/NoOpSubmitter.java @@ -1,9 +1,7 @@ package com.liveramp.captain.request_submitter; - import com.liveramp.captain.request_context.RequestContext; - public class NoOpSubmitter implements RequestSubmitter { @Override public Long submit(long jobId, RequestContext options) { diff --git a/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitter.java b/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitter.java index 703a917..51d45eb 100644 --- a/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitter.java +++ b/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitter.java @@ -1,9 +1,7 @@ package com.liveramp.captain.request_submitter; - import com.liveramp.captain.request_context.RequestContext; public interface RequestSubmitter { RequestHandle submit(long jobId, RequestContext options) throws Exception; } - diff --git a/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitterWrapperFactory.java b/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitterWrapperFactory.java index 5771099..989c249 100644 --- a/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitterWrapperFactory.java +++ b/src/main/java/com/liveramp/captain/request_submitter/RequestSubmitterWrapperFactory.java @@ -1,6 +1,7 @@ package com.liveramp.captain.request_submitter; -public class RequestSubmitterWrapperFactory implements RequestSubmitterFactory { +public class RequestSubmitterWrapperFactory + implements RequestSubmitterFactory { private RequestSubmitter requestSubmitter; diff --git a/src/main/java/com/liveramp/captain/retry/DefaultFailedRequestPolicy.java b/src/main/java/com/liveramp/captain/retry/DefaultFailedRequestPolicy.java index 86bfd4e..35eabcc 100644 --- a/src/main/java/com/liveramp/captain/retry/DefaultFailedRequestPolicy.java +++ b/src/main/java/com/liveramp/captain/retry/DefaultFailedRequestPolicy.java @@ -1,11 +1,9 @@ package com.liveramp.captain.retry; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DefaultFailedRequestPolicy implements FailedRequestPolicy { - @Override + @Override public FailedRequestAction getFailedRequestAction(Long jobId) { - return FailedRequestAction.NO_OP; - } + return FailedRequestAction.NO_OP; + } } diff --git a/src/main/java/com/liveramp/captain/retry/FailedRequestPolicy.java b/src/main/java/com/liveramp/captain/retry/FailedRequestPolicy.java index 3f2cf03..c3802b0 100644 --- a/src/main/java/com/liveramp/captain/retry/FailedRequestPolicy.java +++ b/src/main/java/com/liveramp/captain/retry/FailedRequestPolicy.java @@ -2,18 +2,19 @@ import com.liveramp.captain.daemon.RequestUpdater; -/*** - * Implementations of this interface are meant to make a decision on what to do when captain picks up a - * request in status FAILED. There are three possible return values: +/** + * * Implementations of this interface are meant to make a decision on what to do when captain picks + * up a request in status FAILED. There are three possible return values: * - * RETRY: Captain will call the retry method in {@link RequestUpdater} - * QUARANTINE: Captain will call the quarantine method in {@link RequestUpdater} - * NO_OP: Captain will do nothing. By default captain uses a FailedRequestPolicy that always returns NO_OP. + *

RETRY: Captain will call the retry method in {@link RequestUpdater} QUARANTINE: Captain will + * call the quarantine method in {@link RequestUpdater} NO_OP: Captain will do nothing. By default + * captain uses a FailedRequestPolicy that always returns NO_OP. */ public interface FailedRequestPolicy { - /*** - * Method called by captain to know what to do with a failed request + /** + * * Method called by captain to know what to do with a failed request + * * @param jobId id of the failed request * @return action to be executed by captain */ diff --git a/src/main/java/com/liveramp/captain/status_retriever/StatusRetriever.java b/src/main/java/com/liveramp/captain/status_retriever/StatusRetriever.java index 495c666..5cd4995 100644 --- a/src/main/java/com/liveramp/captain/status_retriever/StatusRetriever.java +++ b/src/main/java/com/liveramp/captain/status_retriever/StatusRetriever.java @@ -1,6 +1,5 @@ package com.liveramp.captain.status_retriever; - import com.liveramp.captain.status.CaptainStatus; public interface StatusRetriever { diff --git a/src/main/java/com/liveramp/captain/step/CaptainStep.java b/src/main/java/com/liveramp/captain/step/CaptainStep.java index e8105c5..5298af7 100644 --- a/src/main/java/com/liveramp/captain/step/CaptainStep.java +++ b/src/main/java/com/liveramp/captain/step/CaptainStep.java @@ -29,7 +29,7 @@ public boolean equals(Object o) { return false; } - return step.equals(((CaptainStep)o).get()); + return step.equals(((CaptainStep) o).get()); } @Override diff --git a/src/main/java/com/liveramp/captain/waypoint/AsyncWaypoint.java b/src/main/java/com/liveramp/captain/waypoint/AsyncWaypoint.java index b0207df..273e3fa 100644 --- a/src/main/java/com/liveramp/captain/waypoint/AsyncWaypoint.java +++ b/src/main/java/com/liveramp/captain/waypoint/AsyncWaypoint.java @@ -13,7 +13,6 @@ import com.liveramp.captain.status_retriever.StatusRetrieverFactory; import com.liveramp.captain.status_retriever.StatusRetrieverWrapperFactory; import com.liveramp.captain.step.CaptainStep; - import java.util.Optional; public class AsyncWaypoint implements Waypoint { @@ -34,12 +33,31 @@ public AsyncWaypoint( this.optionalStepPredicate = optionalStepPredicate; } - public AsyncWaypoint(CaptainStep step, RequestSubmitter requestSubmitter, HandlePersistor handlePersistor, StatusRetriever statusRetriever) { - this(step, new RequestSubmitterWrapperFactory<>(requestSubmitter), new HandlePersistorWrapperFactory<>(handlePersistor), new StatusRetrieverWrapperFactory(statusRetriever), Optional.empty()); + public AsyncWaypoint( + CaptainStep step, + RequestSubmitter requestSubmitter, + HandlePersistor handlePersistor, + StatusRetriever statusRetriever) { + this( + step, + new RequestSubmitterWrapperFactory<>(requestSubmitter), + new HandlePersistorWrapperFactory<>(handlePersistor), + new StatusRetrieverWrapperFactory(statusRetriever), + Optional.empty()); } - public AsyncWaypoint(CaptainStep step, RequestSubmitter requestSubmitter, HandlePersistor handlePersistor, StatusRetriever statusRetriever, StepPredicate stepPredicate) { - this(step, new RequestSubmitterWrapperFactory<>(requestSubmitter), new HandlePersistorWrapperFactory<>(handlePersistor), new StatusRetrieverWrapperFactory(statusRetriever), Optional.of(new StepPredicateWrapperFactory(stepPredicate))); + public AsyncWaypoint( + CaptainStep step, + RequestSubmitter requestSubmitter, + HandlePersistor handlePersistor, + StatusRetriever statusRetriever, + StepPredicate stepPredicate) { + this( + step, + new RequestSubmitterWrapperFactory<>(requestSubmitter), + new HandlePersistorWrapperFactory<>(handlePersistor), + new StatusRetrieverWrapperFactory(statusRetriever), + Optional.of(new StepPredicateWrapperFactory(stepPredicate))); } @Override diff --git a/src/main/java/com/liveramp/captain/waypoint/ControlFlowWaypoint.java b/src/main/java/com/liveramp/captain/waypoint/ControlFlowWaypoint.java index a62dfdd..053cb79 100644 --- a/src/main/java/com/liveramp/captain/waypoint/ControlFlowWaypoint.java +++ b/src/main/java/com/liveramp/captain/waypoint/ControlFlowWaypoint.java @@ -5,7 +5,6 @@ import com.liveramp.captain.status_retriever.StatusRetrieverFactory; import com.liveramp.captain.status_retriever.StatusRetrieverWrapperFactory; import com.liveramp.captain.step.CaptainStep; - import java.util.Optional; public class ControlFlowWaypoint implements Waypoint { @@ -14,7 +13,10 @@ public class ControlFlowWaypoint implements Waypoint { private final StatusRetrieverFactory statusRetriever; private final Optional optionalStepPredicate; - public ControlFlowWaypoint(CaptainStep step, StatusRetrieverFactory statusRetriever, Optional optionalStepPredicate) { + public ControlFlowWaypoint( + CaptainStep step, + StatusRetrieverFactory statusRetriever, + Optional optionalStepPredicate) { this.step = step; this.statusRetriever = statusRetriever; this.optionalStepPredicate = optionalStepPredicate; diff --git a/src/main/java/com/liveramp/captain/waypoint/NoOpWaypointSubmitter.java b/src/main/java/com/liveramp/captain/waypoint/NoOpWaypointSubmitter.java index 1c1a855..bd9ab5a 100644 --- a/src/main/java/com/liveramp/captain/waypoint/NoOpWaypointSubmitter.java +++ b/src/main/java/com/liveramp/captain/waypoint/NoOpWaypointSubmitter.java @@ -1,15 +1,12 @@ package com.liveramp.captain.waypoint; - import com.liveramp.captain.request_context.RequestContext; public class NoOpWaypointSubmitter implements WaypointSubmitter { - private NoOpWaypointSubmitter() { - } + private NoOpWaypointSubmitter() {} @Override - public void submitServiceRequest(long jobId, RequestContext requestOptions) { - } + public void submitServiceRequest(long jobId, RequestContext requestOptions) {} public static NoOpWaypointSubmitter get() { return new NoOpWaypointSubmitter(); diff --git a/src/main/java/com/liveramp/captain/waypoint/SyncWaypoint.java b/src/main/java/com/liveramp/captain/waypoint/SyncWaypoint.java index 9a68dfa..040a153 100644 --- a/src/main/java/com/liveramp/captain/waypoint/SyncWaypoint.java +++ b/src/main/java/com/liveramp/captain/waypoint/SyncWaypoint.java @@ -11,7 +11,6 @@ import com.liveramp.captain.request_submitter.RequestSubmitterWrapperFactory; import com.liveramp.captain.status_retriever.StatusRetrieverFactory; import com.liveramp.captain.step.CaptainStep; - import java.util.Optional; public class SyncWaypoint implements Waypoint { @@ -33,15 +32,29 @@ public SyncWaypoint(CaptainStep step, RequestSubmitter requestSub this(step, new RequestSubmitterWrapperFactory<>(requestSubmitter), null, Optional.empty()); } - public SyncWaypoint(CaptainStep step, RequestSubmitter requestSubmitter, HandlePersistor handlePersistor) { - this(step, new RequestSubmitterWrapperFactory<>(requestSubmitter), new HandlePersistorWrapperFactory<>(handlePersistor), Optional.empty()); + public SyncWaypoint( + CaptainStep step, + RequestSubmitter requestSubmitter, + HandlePersistor handlePersistor) { + this( + step, + new RequestSubmitterWrapperFactory<>(requestSubmitter), + new HandlePersistorWrapperFactory<>(handlePersistor), + Optional.empty()); } - public SyncWaypoint(CaptainStep step, RequestSubmitter requestSubmitter, HandlePersistor handlePersistor, StepPredicate stepPredicate) { - this(step, new RequestSubmitterWrapperFactory<>(requestSubmitter), new HandlePersistorWrapperFactory<>(handlePersistor), Optional.of(new StepPredicateWrapperFactory(stepPredicate))); + public SyncWaypoint( + CaptainStep step, + RequestSubmitter requestSubmitter, + HandlePersistor handlePersistor, + StepPredicate stepPredicate) { + this( + step, + new RequestSubmitterWrapperFactory<>(requestSubmitter), + new HandlePersistorWrapperFactory<>(handlePersistor), + Optional.of(new StepPredicateWrapperFactory(stepPredicate))); } - @Override public CaptainStep getStep() { return step; diff --git a/src/main/java/com/liveramp/captain/waypoint/Waypoint.java b/src/main/java/com/liveramp/captain/waypoint/Waypoint.java index 1ff8c5d..324116b 100644 --- a/src/main/java/com/liveramp/captain/waypoint/Waypoint.java +++ b/src/main/java/com/liveramp/captain/waypoint/Waypoint.java @@ -1,10 +1,9 @@ package com.liveramp.captain.waypoint; -import java.util.Optional; - import com.liveramp.captain.optional_step_predicate.StepPredicateFactory; import com.liveramp.captain.status_retriever.StatusRetrieverFactory; import com.liveramp.captain.step.CaptainStep; +import java.util.Optional; public interface Waypoint { CaptainStep getStep(); diff --git a/src/main/java/com/liveramp/captain/waypoint/WaypointSubmitter.java b/src/main/java/com/liveramp/captain/waypoint/WaypointSubmitter.java index 5626fc5..cc15051 100644 --- a/src/main/java/com/liveramp/captain/waypoint/WaypointSubmitter.java +++ b/src/main/java/com/liveramp/captain/waypoint/WaypointSubmitter.java @@ -1,6 +1,5 @@ package com.liveramp.captain.waypoint; - import com.liveramp.captain.request_context.RequestContext; public interface WaypointSubmitter { diff --git a/src/main/java/com/liveramp/captain/waypoint/WaypointType.java b/src/main/java/com/liveramp/captain/waypoint/WaypointType.java index 4bffd33..17d84fe 100644 --- a/src/main/java/com/liveramp/captain/waypoint/WaypointType.java +++ b/src/main/java/com/liveramp/captain/waypoint/WaypointType.java @@ -1,3 +1,7 @@ package com.liveramp.captain.waypoint; -public enum WaypointType { ASYNC, SYNC, FLOW_CONTROL} +public enum WaypointType { + ASYNC, + SYNC, + FLOW_CONTROL +} diff --git a/src/test/java/com/liveramp/captain/daemon/TestCaptainJoblet.java b/src/test/java/com/liveramp/captain/daemon/TestCaptainJoblet.java index 1c96014..e8c7729 100644 --- a/src/test/java/com/liveramp/captain/daemon/TestCaptainJoblet.java +++ b/src/test/java/com/liveramp/captain/daemon/TestCaptainJoblet.java @@ -1,16 +1,15 @@ package com.liveramp.captain.daemon; -import java.util.Map; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; -import org.mockito.verification.VerificationMode; - import com.liveramp.captain.app_type.CaptainAppType; import com.liveramp.captain.handle_persistor.HandlePersistor; import com.liveramp.captain.manifest.DefaultManifestImpl; @@ -30,14 +29,13 @@ import com.liveramp.captain.waypoint.AsyncWaypoint; import com.liveramp.captain.waypoint.ControlFlowWaypoint; import com.liveramp.captain.waypoint.SyncWaypoint; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.verification.VerificationMode; @SuppressWarnings("unchecked") @RunWith(MockitoJUnitRunner.class) @@ -45,35 +43,22 @@ public class TestCaptainJoblet { private Long JOB_ID = 10L; private Long SERVICE_HANDLE = 22L; - @Mock - private CaptainNotifier notifier; - - @Mock - private RequestSubmitter requestSubmitterStep1; - @Mock - private RequestSubmitter requestSubmitterStep2; - @Mock - private RequestSubmitter requestSubmitterStep4; - @Mock - private RequestSubmitter> requestSubmitterStep5; - @Mock - private RequestSubmitter requestSubmitterStep6; - - @Mock - private HandlePersistor handlePersistor2; - @Mock - private HandlePersistor handlePersistor4; - @Mock - private HandlePersistor> handlePersistor5; - - @Mock - private StatusRetriever statusRetrieverStep2; - @Mock - private StatusRetriever statusRetrieverStep3; - @Mock - private StatusRetriever statusRetrieverStep4; - @Mock - private StatusRetriever statusRetrieverStep5; + @Mock private CaptainNotifier notifier; + + @Mock private RequestSubmitter requestSubmitterStep1; + @Mock private RequestSubmitter requestSubmitterStep2; + @Mock private RequestSubmitter requestSubmitterStep4; + @Mock private RequestSubmitter> requestSubmitterStep5; + @Mock private RequestSubmitter requestSubmitterStep6; + + @Mock private HandlePersistor handlePersistor2; + @Mock private HandlePersistor handlePersistor4; + @Mock private HandlePersistor> handlePersistor5; + + @Mock private StatusRetriever statusRetrieverStep2; + @Mock private StatusRetriever statusRetrieverStep3; + @Mock private StatusRetriever statusRetrieverStep4; + @Mock private StatusRetriever statusRetrieverStep5; private ManifestManager manifestManager; private RequestUpdater requestUpdater; @@ -88,32 +73,36 @@ public class TestCaptainJoblet { private final CaptainStep CAPTAIN_STEP6 = CaptainStep.fromString("CAPTAIN_STEP6"); private final CaptainStep DONE = CaptainStep.fromString("DONE"); - private final CaptainAppType APP_TYPE_1 = CaptainAppType.fromString("APP_TYPE_1"); private final CaptainAppType APP_TYPE_2 = CaptainAppType.fromString("APP_TYPE_2"); private final FailedRequestPolicy FAILED_REQUEST_POLICY = new DefaultFailedRequestPolicy(); - @Mock - private FailedRequestPolicy failedRequestPolicyMock; + @Mock private FailedRequestPolicy failedRequestPolicyMock; @Before public void setup() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - new SyncWaypoint(CAPTAIN_STEP1, requestSubmitterStep1, null), - new SyncWaypoint(CAPTAIN_STEP2_A, requestSubmitterStep2, handlePersistor2), - new AsyncWaypoint(CAPTAIN_STEP2_B, requestSubmitterStep2, handlePersistor2, statusRetrieverStep2), - new ControlFlowWaypoint(CAPTAIN_STEP3, statusRetrieverStep3), - new AsyncWaypoint(CAPTAIN_STEP4, requestSubmitterStep4, handlePersistor4, statusRetrieverStep4), - new AsyncWaypoint(CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5), - new SyncWaypoint(CAPTAIN_STEP6, requestSubmitterStep6, null) - )); - - Manifest testManifest2 = new DefaultManifestImpl(Lists.newArrayList( - new SyncWaypoint<>(CAPTAIN_STEP1, requestSubmitterStep1), - new AsyncWaypoint<>(CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5) - )); + Manifest testManifest = + new DefaultManifestImpl( + Lists.newArrayList( + new SyncWaypoint(CAPTAIN_STEP1, requestSubmitterStep1, null), + new SyncWaypoint(CAPTAIN_STEP2_A, requestSubmitterStep2, handlePersistor2), + new AsyncWaypoint( + CAPTAIN_STEP2_B, requestSubmitterStep2, handlePersistor2, statusRetrieverStep2), + new ControlFlowWaypoint(CAPTAIN_STEP3, statusRetrieverStep3), + new AsyncWaypoint( + CAPTAIN_STEP4, requestSubmitterStep4, handlePersistor4, statusRetrieverStep4), + new AsyncWaypoint( + CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5), + new SyncWaypoint(CAPTAIN_STEP6, requestSubmitterStep6, null))); + + Manifest testManifest2 = + new DefaultManifestImpl( + Lists.newArrayList( + new SyncWaypoint<>(CAPTAIN_STEP1, requestSubmitterStep1), + new AsyncWaypoint<>( + CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5))); Map manifestFactoryMap = Maps.newHashMap(); manifestFactoryMap.put(APP_TYPE_1, new TestManifestFactory(testManifest)); @@ -126,194 +115,300 @@ public void setup() { @Test public void testGoToNextStep() throws Exception { - final CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_1); + final CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStepAndStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.COMPLETED, CAPTAIN_STEP2_A, CaptainStatus.READY); + verify(requestUpdater, times(1)) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP1, + CaptainStatus.COMPLETED, + CAPTAIN_STEP2_A, + CaptainStatus.READY); } @Test public void testGoToNextStepInitializing() throws Exception { - final CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, INITIALIZING, APP_TYPE_1); - - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); - - verify(requestUpdater, times(1)).setStepAndStatus(config.getId(), INITIALIZING, CaptainStatus.COMPLETED, CAPTAIN_STEP1, CaptainStatus.READY); + final CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, INITIALIZING, APP_TYPE_1); + + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); + + verify(requestUpdater, times(1)) + .setStepAndStatus( + config.getId(), + INITIALIZING, + CaptainStatus.COMPLETED, + CAPTAIN_STEP1, + CaptainStatus.READY); } @Test public void testGoToNextStepLastStep() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP6, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP6, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStepAndStatus(config.getId(), CAPTAIN_STEP6, CaptainStatus.COMPLETED, DONE, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStepAndStatus( + config.getId(), CAPTAIN_STEP6, CaptainStatus.COMPLETED, DONE, CaptainStatus.COMPLETED); } @Test public void testSubmitRequestAsync() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep2, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); verify(handlePersistor2, times(1)).persist(JOB_ID, SERVICE_HANDLE); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.PENDING); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.PENDING); } @Test public void testSubmitRequestAsyncSupportsPendingFalse() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, false, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, false, false, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep2, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); verify(handlePersistor2, times(1)).persist(JOB_ID, SERVICE_HANDLE); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); } @Test public void testSubmitRequestSync() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP1, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP1, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep1, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.READY, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.READY, CaptainStatus.COMPLETED); } @Test public void testSubmitRequestSyncWithHandlerPersistor() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_A, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_A, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep2, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); verify(handlePersistor2, times(1)).persist(JOB_ID, SERVICE_HANDLE); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.READY, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.READY, CaptainStatus.COMPLETED); } @Test public void testSubmitRequestFlowControl1() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP3, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP3, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.READY, CaptainStatus.PENDING); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.READY, CaptainStatus.PENDING); } @Test public void testCheckRequestStartedInProgress() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.IN_PROGRESS); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.FAILED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(3)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); + verify(requestUpdater, times(3)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); } @Test public void testCheckRequestStartedQuarantine() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(2)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); + verify(requestUpdater, times(2)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); } @Test public void testCheckRequestStartedUnknownStatus() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.PENDING); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); // i.e. do nothing. - verify(requestUpdater, times(0)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); - verify(requestUpdater, times(0)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); + verify(requestUpdater, times(0)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); + verify(requestUpdater, times(0)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteCompleted() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); } @Test public void testRequestCompleteQuarantined() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(2)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); + verify(requestUpdater, times(2)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteFailed() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.FAILED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); // i.e. do nothing - verify(requestUpdater, times(0)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.FAILED); - verify(requestUpdater, times(0)).setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); + verify(requestUpdater, times(0)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.FAILED); + verify(requestUpdater, times(0)) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteCancelled() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.CANCELLED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)).cancel(JOB_ID); } @Test public void testRequestCompleteCompletedFlowControl() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP3, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP3, APP_TYPE_1); when(statusRetrieverStep3.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + verify(requestUpdater, times(1)) + .setStatus( + config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); } - @Test public void testUsesCorrectManifestForAppType() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_2); - - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY).run(); - - verify(requestUpdater, times(1)).setStepAndStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.COMPLETED, CAPTAIN_STEP5, CaptainStatus.READY); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_2); + + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, FAILED_REQUEST_POLICY) + .run(); + + verify(requestUpdater, times(1)) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP1, + CaptainStatus.COMPLETED, + CAPTAIN_STEP5, + CaptainStatus.READY); } @Test @@ -334,17 +429,16 @@ public void testFailedRequestPolicyNoop() throws Exception { private void testFailedRequestPolicy( FailedRequestPolicy.FailedRequestAction mockedAction, VerificationMode timesRetry, - VerificationMode timesQuarantine) throws Exception { - - CaptainRequestConfig config = new SimpleCaptainConfig( - JOB_ID, - CaptainStatus.FAILED, - CAPTAIN_STEP1, + VerificationMode timesQuarantine) + throws Exception { - APP_TYPE_2); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.FAILED, CAPTAIN_STEP1, APP_TYPE_2); when(failedRequestPolicyMock.getFailedRequestAction(JOB_ID)).thenReturn(mockedAction); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, false, failedRequestPolicyMock).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, false, failedRequestPolicyMock) + .run(); verify(requestUpdater, timesRetry).retry(JOB_ID); verify(requestUpdater, timesQuarantine).quarantine(JOB_ID); diff --git a/src/test/java/com/liveramp/captain/daemon/TestCaptainJobletWithRammingSpeed.java b/src/test/java/com/liveramp/captain/daemon/TestCaptainJobletWithRammingSpeed.java index 88076cf..76a50cb 100644 --- a/src/test/java/com/liveramp/captain/daemon/TestCaptainJobletWithRammingSpeed.java +++ b/src/test/java/com/liveramp/captain/daemon/TestCaptainJobletWithRammingSpeed.java @@ -1,16 +1,16 @@ package com.liveramp.captain.daemon; -import java.util.Map; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InOrder; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - import com.liveramp.captain.app_type.CaptainAppType; import com.liveramp.captain.handle_persistor.HandlePersistor; import com.liveramp.captain.manifest.DefaultManifestImpl; @@ -30,15 +30,13 @@ import com.liveramp.captain.waypoint.AsyncWaypoint; import com.liveramp.captain.waypoint.ControlFlowWaypoint; import com.liveramp.captain.waypoint.SyncWaypoint; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; @SuppressWarnings("unchecked") @RunWith(MockitoJUnitRunner.class) @@ -47,35 +45,22 @@ public class TestCaptainJobletWithRammingSpeed { private Long JOB_ID = 10L; private Long SERVICE_HANDLE = 22L; - @Mock - private CaptainNotifier notifier; - - @Mock - private RequestSubmitter requestSubmitterStep1; - @Mock - private RequestSubmitter requestSubmitterStep2; - @Mock - private RequestSubmitter requestSubmitterStep4; - @Mock - private RequestSubmitter> requestSubmitterStep5; - @Mock - private RequestSubmitter requestSubmitterStep6; - - @Mock - private HandlePersistor handlePersistor2; - @Mock - private HandlePersistor handlePersistor4; - @Mock - private HandlePersistor> handlePersistor5; - - @Mock - private StatusRetriever statusRetrieverStep2; - @Mock - private StatusRetriever statusRetrieverStep3; - @Mock - private StatusRetriever statusRetrieverStep4; - @Mock - private StatusRetriever statusRetrieverStep5; + @Mock private CaptainNotifier notifier; + + @Mock private RequestSubmitter requestSubmitterStep1; + @Mock private RequestSubmitter requestSubmitterStep2; + @Mock private RequestSubmitter requestSubmitterStep4; + @Mock private RequestSubmitter> requestSubmitterStep5; + @Mock private RequestSubmitter requestSubmitterStep6; + + @Mock private HandlePersistor handlePersistor2; + @Mock private HandlePersistor handlePersistor4; + @Mock private HandlePersistor> handlePersistor5; + + @Mock private StatusRetriever statusRetrieverStep2; + @Mock private StatusRetriever statusRetrieverStep3; + @Mock private StatusRetriever statusRetrieverStep4; + @Mock private StatusRetriever statusRetrieverStep5; private ManifestManager manifestManager; private RequestUpdater requestUpdater; @@ -90,7 +75,6 @@ public class TestCaptainJobletWithRammingSpeed { private final CaptainStep CAPTAIN_STEP6 = CaptainStep.fromString("CAPTAIN_STEP6"); private final CaptainStep DONE = CaptainStep.fromString("DONE"); - private final CaptainAppType APP_TYPE_1 = CaptainAppType.fromString("APP_TYPE_1"); private final CaptainAppType APP_TYPE_2 = CaptainAppType.fromString("APP_TYPE_2"); @@ -98,26 +82,32 @@ public class TestCaptainJobletWithRammingSpeed { @Before public void setup() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - new SyncWaypoint(CAPTAIN_STEP1, requestSubmitterStep1, null), - new SyncWaypoint(CAPTAIN_STEP2_A, requestSubmitterStep2, handlePersistor2), - new AsyncWaypoint(CAPTAIN_STEP2_B, requestSubmitterStep2, handlePersistor2, statusRetrieverStep2), - new ControlFlowWaypoint(CAPTAIN_STEP3, statusRetrieverStep3), - new AsyncWaypoint(CAPTAIN_STEP4, requestSubmitterStep4, handlePersistor4, statusRetrieverStep4), - new AsyncWaypoint(CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5), - new SyncWaypoint(CAPTAIN_STEP6, requestSubmitterStep6, null) - )); - - Manifest testManifest2 = new DefaultManifestImpl(Lists.newArrayList( - new SyncWaypoint<>(CAPTAIN_STEP1, requestSubmitterStep1), - new AsyncWaypoint<>(CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5) - )); + Manifest testManifest = + new DefaultManifestImpl( + Lists.newArrayList( + new SyncWaypoint(CAPTAIN_STEP1, requestSubmitterStep1, null), + new SyncWaypoint(CAPTAIN_STEP2_A, requestSubmitterStep2, handlePersistor2), + new AsyncWaypoint( + CAPTAIN_STEP2_B, requestSubmitterStep2, handlePersistor2, statusRetrieverStep2), + new ControlFlowWaypoint(CAPTAIN_STEP3, statusRetrieverStep3), + new AsyncWaypoint( + CAPTAIN_STEP4, requestSubmitterStep4, handlePersistor4, statusRetrieverStep4), + new AsyncWaypoint( + CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5), + new SyncWaypoint(CAPTAIN_STEP6, requestSubmitterStep6, null))); + + Manifest testManifest2 = + new DefaultManifestImpl( + Lists.newArrayList( + new SyncWaypoint<>(CAPTAIN_STEP1, requestSubmitterStep1), + new AsyncWaypoint<>( + CAPTAIN_STEP5, requestSubmitterStep5, handlePersistor5, statusRetrieverStep5))); Map manifestFactoryMap = Maps.newHashMap(); - manifestFactoryMap - .put(CaptainAppType.fromString("APP_TYPE_1"), new TestManifestFactory(testManifest)); - manifestFactoryMap - .put(CaptainAppType.fromString("APP_TYPE_2"), new TestManifestFactory(testManifest2)); + manifestFactoryMap.put( + CaptainAppType.fromString("APP_TYPE_1"), new TestManifestFactory(testManifest)); + manifestFactoryMap.put( + CaptainAppType.fromString("APP_TYPE_2"), new TestManifestFactory(testManifest2)); manifestManager = MultiAppManifestManager.ofManifestFactories(manifestFactoryMap); requestUpdater = spy(RequestUpdater.class); @@ -125,21 +115,35 @@ public void setup() { @Test public void testGoToNextStep() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig( - JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, - APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) .run(); InOrder inOrder = inOrder(requestUpdater); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStepAndStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.COMPLETED, CAPTAIN_STEP2_A, CaptainStatus.READY); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP1, + CaptainStatus.COMPLETED, + CAPTAIN_STEP2_A, + CaptainStatus.READY); + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.READY, CaptainStatus.COMPLETED); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStepAndStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.COMPLETED, CAPTAIN_STEP2_B, CaptainStatus.READY); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP2_A, + CaptainStatus.COMPLETED, + CAPTAIN_STEP2_B, + CaptainStatus.READY); + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); } @@ -148,57 +152,88 @@ public void testGoToNextStepFromAsync() throws Exception { when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); when(statusRetrieverStep3.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - CaptainRequestConfig config = new SimpleCaptainConfig( - JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, - APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); new CaptainJoblet( - config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) + config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) .run(); InOrder inOrder = inOrder(requestUpdater); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStepAndStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.COMPLETED, CAPTAIN_STEP3, CaptainStatus.READY); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP2_B, + CaptainStatus.COMPLETED, + CAPTAIN_STEP3, + CaptainStatus.READY); + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStepAndStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.COMPLETED, CAPTAIN_STEP4, CaptainStatus.READY); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStatus( + config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP3, + CaptainStatus.COMPLETED, + CAPTAIN_STEP4, + CaptainStatus.READY); + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP4, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); } @Test public void testGoToNextStepInitializing() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, INITIALIZING, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, INITIALIZING, APP_TYPE_1); new CaptainJoblet( - config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)) - .setStepAndStatus(config.getId(), INITIALIZING, CaptainStatus.COMPLETED, CAPTAIN_STEP1, CaptainStatus.READY); + .setStepAndStatus( + config.getId(), + INITIALIZING, + CaptainStatus.COMPLETED, + CAPTAIN_STEP1, + CaptainStatus.READY); } @Test public void testGoToNextStepLastStep() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP6, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP6, APP_TYPE_1); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)) - .setStepAndStatus(config.getId(), CAPTAIN_STEP6, CaptainStatus.COMPLETED, DONE, CaptainStatus.COMPLETED); + .setStepAndStatus( + config.getId(), CAPTAIN_STEP6, CaptainStatus.COMPLETED, DONE, CaptainStatus.COMPLETED); } @Test public void testSubmitRequestAsync() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); new CaptainJoblet( - config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep2, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); verify(handlePersistor2, times(1)).persist(JOB_ID, SERVICE_HANDLE); @@ -208,11 +243,14 @@ public void testSubmitRequestAsync() throws Exception { @Test public void testSubmitRequestAsyncSupportsPendingFalse() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_B, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) .run(); verify(requestSubmitterStep2, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); @@ -223,9 +261,12 @@ public void testSubmitRequestAsyncSupportsPendingFalse() throws Exception { @Test public void testSubmitRequestSync() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP1, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP1, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestSubmitterStep1, times(1)).submit(eq(JOB_ID), any(RequestContext.class)); verify(requestUpdater, times(1)) @@ -234,147 +275,210 @@ public void testSubmitRequestSync() throws Exception { @Test public void testSubmitRequestSyncWithHandlerPersistor() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_A, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP2_A, APP_TYPE_1); - when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))).thenReturn(SERVICE_HANDLE); + when(requestSubmitterStep2.submit(eq(JOB_ID), any(RequestContext.class))) + .thenReturn(SERVICE_HANDLE); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, false, true, FAILED_REQUEST_POLICY) .run(); verify(requestSubmitterStep2, times(2)).submit(eq(JOB_ID), any(RequestContext.class)); verify(handlePersistor2, times(2)).persist(JOB_ID, SERVICE_HANDLE); InOrder inOrder = inOrder(requestUpdater); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.READY, CaptainStatus.COMPLETED); - inOrder.verify(requestUpdater, atLeastOnce()) - .setStepAndStatus(config.getId(), CAPTAIN_STEP2_A, CaptainStatus.COMPLETED, CAPTAIN_STEP2_B, CaptainStatus.READY); - inOrder.verify(requestUpdater, atLeastOnce()) + inOrder + .verify(requestUpdater, atLeastOnce()) + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP2_A, + CaptainStatus.COMPLETED, + CAPTAIN_STEP2_B, + CaptainStatus.READY); + inOrder + .verify(requestUpdater, atLeastOnce()) .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.READY, CaptainStatus.IN_PROGRESS); } @Test public void testSubmitRequestFlowControl1() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP3, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.READY, CAPTAIN_STEP3, APP_TYPE_1); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); - verify(requestUpdater, times(1)).setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.READY, CaptainStatus.PENDING); + verify(requestUpdater, times(1)) + .setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.READY, CaptainStatus.PENDING); } @Test public void testCheckRequestStartedInProgress() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.IN_PROGRESS); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.FAILED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(3)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); } @Test public void testCheckRequestStartedQuarantine() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(2)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); } @Test public void testCheckRequestStartedUnknownStatus() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.PENDING, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.PENDING); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); // i.e. do nothing. verify(requestUpdater, times(0)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.IN_PROGRESS); verify(requestUpdater, times(0)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.PENDING, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteCompleted() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); } @Test public void testRequestCompleteQuarantined() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.QUARANTINED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(2)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteFailed() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.FAILED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); // i.e. do nothing verify(requestUpdater, times(0)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); verify(requestUpdater, times(0)) - .setStatus(config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); + .setStatus( + config.getId(), CAPTAIN_STEP2_B, CaptainStatus.IN_PROGRESS, CaptainStatus.QUARANTINED); } @Test public void testRequestCompleteCancelled() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP2_B, APP_TYPE_1); when(statusRetrieverStep2.getStatus(JOB_ID)).thenReturn(CaptainStatus.CANCELLED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)).cancel(JOB_ID); } @Test public void testRequestCompleteCompletedFlowControl() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP3, APP_TYPE_1); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.IN_PROGRESS, CAPTAIN_STEP3, APP_TYPE_1); when(statusRetrieverStep3.getStatus(JOB_ID)).thenReturn(CaptainStatus.COMPLETED); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)) - .setStatus(config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); + .setStatus( + config.getId(), CAPTAIN_STEP3, CaptainStatus.IN_PROGRESS, CaptainStatus.COMPLETED); } - @Test public void testUsesCorrectManifestForAppType() throws Exception { - CaptainRequestConfig config = new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_2); + CaptainRequestConfig config = + new SimpleCaptainConfig(JOB_ID, CaptainStatus.COMPLETED, CAPTAIN_STEP1, APP_TYPE_2); - new CaptainJoblet(config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY).run(); + new CaptainJoblet( + config, notifier, requestUpdater, manifestManager, true, true, FAILED_REQUEST_POLICY) + .run(); verify(requestUpdater, times(1)) - .setStepAndStatus(config.getId(), CAPTAIN_STEP1, CaptainStatus.COMPLETED, CAPTAIN_STEP5, CaptainStatus.READY); + .setStepAndStatus( + config.getId(), + CAPTAIN_STEP1, + CaptainStatus.COMPLETED, + CAPTAIN_STEP5, + CaptainStatus.READY); } } diff --git a/src/test/java/com/liveramp/captain/daemon/manifest/TestDefaultManifestImpl.java b/src/test/java/com/liveramp/captain/daemon/manifest/TestDefaultManifestImpl.java index 7d12579..b956091 100644 --- a/src/test/java/com/liveramp/captain/daemon/manifest/TestDefaultManifestImpl.java +++ b/src/test/java/com/liveramp/captain/daemon/manifest/TestDefaultManifestImpl.java @@ -1,11 +1,10 @@ package com.liveramp.captain.daemon.manifest; -import java.util.Optional; +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import com.google.common.collect.Lists; -import org.junit.Before; -import org.junit.Test; - import com.liveramp.captain.handle_persistor.HandlePersistor; import com.liveramp.captain.manifest.DefaultManifestImpl; import com.liveramp.captain.manifest.Manifest; @@ -14,10 +13,9 @@ import com.liveramp.captain.step.CaptainStep; import com.liveramp.captain.waypoint.SyncWaypoint; import com.liveramp.captain.waypoint.Waypoint; - -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; +import java.util.Optional; +import org.junit.Before; +import org.junit.Test; public class TestDefaultManifestImpl { @@ -38,65 +36,71 @@ public class TestDefaultManifestImpl { @Before public void setup() { - requestSubmitter = (RequestSubmitter)mock(RequestSubmitter.class); - handlePersistor = (HandlePersistor)mock(HandlePersistor.class); - - nonOptionalWaypoint1 = new SyncWaypoint(CAPTAIN_STEP1, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); - nonOptionalWaypoint2 = new SyncWaypoint(CAPTAIN_STEP2, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); - optionalWaypoint1 = new SyncWaypoint(CAPTAIN_STEP3, requestSubmitter, handlePersistor, StepPredicates.alwaysTrue()); - optionalWaypoint2 = new SyncWaypoint(CAPTAIN_STEP4, requestSubmitter, handlePersistor, StepPredicates.alwaysTrue()); - waypointMissingStepPredicate = new SyncWaypoint(CAPTAIN_STEP5, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); + requestSubmitter = (RequestSubmitter) mock(RequestSubmitter.class); + handlePersistor = (HandlePersistor) mock(HandlePersistor.class); + + nonOptionalWaypoint1 = + new SyncWaypoint( + CAPTAIN_STEP1, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); + nonOptionalWaypoint2 = + new SyncWaypoint( + CAPTAIN_STEP2, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); + optionalWaypoint1 = + new SyncWaypoint( + CAPTAIN_STEP3, requestSubmitter, handlePersistor, StepPredicates.alwaysTrue()); + optionalWaypoint2 = + new SyncWaypoint( + CAPTAIN_STEP4, requestSubmitter, handlePersistor, StepPredicates.alwaysTrue()); + waypointMissingStepPredicate = + new SyncWaypoint( + CAPTAIN_STEP5, requestSubmitter, handlePersistor, StepPredicates.alwaysFalse()); } @Test public void testGetNextStepNonoptional() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - nonOptionalWaypoint1, - nonOptionalWaypoint2 - )); - checkExpectedNextStep(testManifest, nonOptionalWaypoint1.getStep(), nonOptionalWaypoint2.getStep()); + Manifest testManifest = + new DefaultManifestImpl(Lists.newArrayList(nonOptionalWaypoint1, nonOptionalWaypoint2)); + checkExpectedNextStep( + testManifest, nonOptionalWaypoint1.getStep(), nonOptionalWaypoint2.getStep()); } @Test public void testGetNextStepOptional() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - nonOptionalWaypoint1, - optionalWaypoint1, - optionalWaypoint2, - nonOptionalWaypoint2 - )); - checkExpectedNextStep(testManifest, nonOptionalWaypoint1.getStep(), nonOptionalWaypoint2.getStep()); + Manifest testManifest = + new DefaultManifestImpl( + Lists.newArrayList( + nonOptionalWaypoint1, optionalWaypoint1, optionalWaypoint2, nonOptionalWaypoint2)); + checkExpectedNextStep( + testManifest, nonOptionalWaypoint1.getStep(), nonOptionalWaypoint2.getStep()); } @Test public void testFirstStepOptional() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - optionalWaypoint1, - nonOptionalWaypoint1 - )); - checkExpectedNextStep(testManifest, CaptainStep.fromString("INITIALIZING"), nonOptionalWaypoint1.getStep()); + Manifest testManifest = + new DefaultManifestImpl(Lists.newArrayList(optionalWaypoint1, nonOptionalWaypoint1)); + checkExpectedNextStep( + testManifest, CaptainStep.fromString("INITIALIZING"), nonOptionalWaypoint1.getStep()); } @Test public void testGetNextStepOptionalLastStep() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - nonOptionalWaypoint1, - optionalWaypoint1 - )); + Manifest testManifest = + new DefaultManifestImpl(Lists.newArrayList(nonOptionalWaypoint1, optionalWaypoint1)); checkExpectedNextStep(testManifest, nonOptionalWaypoint1.getStep(), null); } @Test public void testMissingOptionalPredicate() { - Manifest testManifest = new DefaultManifestImpl(Lists.newArrayList( - nonOptionalWaypoint1, - optionalWaypoint1, - waypointMissingStepPredicate - )); - checkExpectedNextStep(testManifest, nonOptionalWaypoint1.getStep(), waypointMissingStepPredicate.getStep()); + Manifest testManifest = + new DefaultManifestImpl( + Lists.newArrayList( + nonOptionalWaypoint1, optionalWaypoint1, waypointMissingStepPredicate)); + checkExpectedNextStep( + testManifest, nonOptionalWaypoint1.getStep(), waypointMissingStepPredicate.getStep()); } - private void checkExpectedNextStep(Manifest manifest, CaptainStep currentStep, CaptainStep expectedStep) { + private void checkExpectedNextStep( + Manifest manifest, CaptainStep currentStep, CaptainStep expectedStep) { Optional nextStep = manifest.getNextStep(currentStep, 0); if (expectedStep == null) { assertTrue(!nextStep.isPresent());