Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public boolean equals(Object o) {
return false;
}

return appType.equals(((CaptainAppType)o).get());
return appType.equals(((CaptainAppType) o).get());
}

@Override
Expand Down
94 changes: 53 additions & 41 deletions src/main/java/com/liveramp/captain/daemon/BaseCaptainBuilder.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package com.liveramp.captain.daemon;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;

import com.liveramp.captain.manifest_manager.ManifestManager;
import com.liveramp.captain.notifier.CaptainDaemonInternalNotifier;
import com.liveramp.captain.notifier.CaptainNotifier;
Expand All @@ -18,18 +12,22 @@
import com.liveramp.daemon_lib.JobletCallback;
import com.liveramp.daemon_lib.builders.ThreadingDaemonBuilder;
import com.liveramp.daemon_lib.utils.JobletCallbackUtil;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;

public class BaseCaptainBuilder<T extends BaseCaptainBuilder<T>> {

/**
* allows the base builder to return the type of the subclass that using it.
* allows us to have the unchecked exception in one place so that it can suppressed once for code cleanliness.
* by making it protected, it allows a consumer to extend a builder that they build.
* by handling this in an instance variable as opposed to an abstract method, we avoid making the extender having
* to worry about implementing a "get self" method.
* allows the base builder to return the type of the subclass that using it. allows us to have the
* unchecked exception in one place so that it can suppressed once for code cleanliness. by making
* it protected, it allows a consumer to extend a builder that they build. by handling this in an
* instance variable as opposed to an abstract method, we avoid making the extender having to
* worry about implementing a "get self" method.
*/
@SuppressWarnings("unchecked")
protected final T self = (T)this;
protected final T self = (T) this;

private boolean DEFAULT_SUPPORTS_PENDING = false;
private boolean DEFAULT_SUPPORTS_RAMMING_SPEED = false;
Expand All @@ -56,7 +54,11 @@ public class BaseCaptainBuilder<T extends BaseCaptainBuilder<T>> {
private Optional<JobletCallback<CaptainRequestConfig>> failureCallback = Optional.empty();
private FailedRequestPolicy failedRequestPolicy = new DefaultFailedRequestPolicy();

protected BaseCaptainBuilder(String identifier, CaptainConfigProducer configProducer, ManifestManager manifestManager, RequestUpdater requestUpdater) {
protected BaseCaptainBuilder(
String identifier,
CaptainConfigProducer configProducer,
ManifestManager manifestManager,
RequestUpdater requestUpdater) {
this.identifier = identifier;
this.configProducer = configProducer;
this.manifestManager = manifestManager;
Expand All @@ -68,9 +70,7 @@ public T setFailedRequestPolicy(FailedRequestPolicy failedRequestPolicy) {
return self;
}

/**
* BETA
*/
/** BETA */
public T setRammingSpeed(boolean rammingSpeed) {
this.rammingSpeed = rammingSpeed;

Expand All @@ -96,13 +96,16 @@ public T setDaemonConfigProductionLock(DaemonLock daemonConfigProductionLock) {
}

/**
* built-in: if you have a zookeeper cluster you can point to, we can handle the locking infra for you.
* built-in: if you have a zookeeper cluster you can point to, we can handle the locking infra for
* you.
*
* @param curatorFramework
* @return
*/
public T setZkDaemonLock(CuratorFramework curatorFramework) {
setDaemonConfigProductionLock(com.liveramp.captain.daemon.CaptainZkDaemonLock.getProduction(curatorFramework, identifier));
setDaemonConfigProductionLock(
com.liveramp.captain.daemon.CaptainZkDaemonLock.getProduction(
curatorFramework, identifier));

return self;
}
Expand Down Expand Up @@ -150,7 +153,8 @@ public T setConfigWaitTime(int configWaitTime, TimeUnit unit) {
* @return
*/
public T setExecutionSlotWaitTime(int executionSlotWaitTime, TimeUnit unit) {
this.executionSlotWaitSeconds = Optional.of(Math.toIntExact(unit.toSeconds(executionSlotWaitTime)));
this.executionSlotWaitSeconds =
Optional.of(Math.toIntExact(unit.toSeconds(executionSlotWaitTime)));

return self;
}
Expand Down Expand Up @@ -185,24 +189,23 @@ public T setFailureCallback(JobletCallback<CaptainRequestConfig> failureCallback
return self;
}

public Daemon<CaptainRequestConfig> build() throws IllegalAccessException, IOException, InstantiationException {
CaptainNotifier resolvedCaptainNotifier = notifier != null ? notifier : new DefaultCaptainLoggingNotifier();
public Daemon<CaptainRequestConfig> build()
throws IllegalAccessException, IOException, InstantiationException {
CaptainNotifier resolvedCaptainNotifier =
notifier != null ? notifier : new DefaultCaptainLoggingNotifier();

CaptainJobletFactory jobletFactory = new ThreadedCaptainJobletFactoryImpl(
requestUpdater,
manifestManager,
resolvedCaptainNotifier,
supportsPending,
rammingSpeed,
failedRequestPolicy
);
CaptainJobletFactory jobletFactory =
new ThreadedCaptainJobletFactoryImpl(
requestUpdater,
manifestManager,
resolvedCaptainNotifier,
supportsPending,
rammingSpeed,
failedRequestPolicy);

ThreadingDaemonBuilder<CaptainRequestConfig> daemonBuilder = new ThreadingDaemonBuilder<>(
identifier,
jobletFactory,
configProducer
)
.setNotifier(new CaptainDaemonInternalNotifier(resolvedCaptainNotifier));
ThreadingDaemonBuilder<CaptainRequestConfig> daemonBuilder =
new ThreadingDaemonBuilder<>(identifier, jobletFactory, configProducer)
.setNotifier(new CaptainDaemonInternalNotifier(resolvedCaptainNotifier));

maxThreads.ifPresent(daemonBuilder::setMaxThreads);
nextConfigWaitSeconds.ifPresent(daemonBuilder::setNextConfigWaitSeconds);
Expand All @@ -211,8 +214,10 @@ public Daemon<CaptainRequestConfig> build() throws IllegalAccessException, IOExc
failureWaitSeconds.ifPresent(daemonBuilder::setFailureWaitSeconds);
daemonConfigProductionLock.ifPresent(daemonBuilder::setDaemonConfigProductionLock);

Optional<JobletCallback<CaptainRequestConfig>> lockRequestCallbackOptional = generateCallbackOptionalFromRequestLockOptional(requestLock, true);
Optional<JobletCallback<CaptainRequestConfig>> unlockRequestCallbackOptional = generateCallbackOptionalFromRequestLockOptional(requestLock, false);
Optional<JobletCallback<CaptainRequestConfig>> lockRequestCallbackOptional =
generateCallbackOptionalFromRequestLockOptional(requestLock, true);
Optional<JobletCallback<CaptainRequestConfig>> unlockRequestCallbackOptional =
generateCallbackOptionalFromRequestLockOptional(requestLock, false);

composeRequestLockCallbackAndOtherCallbacks(lockRequestCallbackOptional, onNewConfigCallback)
.ifPresent(daemonBuilder::setOnNewConfigCallback);
Expand All @@ -224,19 +229,26 @@ public Daemon<CaptainRequestConfig> build() throws IllegalAccessException, IOExc
return daemonBuilder.build();
}

private Optional<JobletCallback<CaptainRequestConfig>> generateCallbackOptionalFromRequestLockOptional(Optional<CaptainRequestLock> requestLock, boolean lock) {
private Optional<JobletCallback<CaptainRequestConfig>>
generateCallbackOptionalFromRequestLockOptional(
Optional<CaptainRequestLock> requestLock, boolean lock) {
if (requestLock.isPresent()) {
if (lock) {
return Optional.of(new CaptainRequestLockingCallbacks.CaptainRequestLockCallback(requestLock.get()));
return Optional.of(
new CaptainRequestLockingCallbacks.CaptainRequestLockCallback(requestLock.get()));
} else {
return Optional.of(new CaptainRequestLockingCallbacks.CaptainRequestUnlockCallback(requestLock.get()));
return Optional.of(
new CaptainRequestLockingCallbacks.CaptainRequestUnlockCallback(requestLock.get()));
}
} else {
return Optional.empty();
}
}

private Optional<JobletCallback<CaptainRequestConfig>> composeRequestLockCallbackAndOtherCallbacks(Optional<JobletCallback<CaptainRequestConfig>> requestLock, Optional<JobletCallback<CaptainRequestConfig>> callback) {
private Optional<JobletCallback<CaptainRequestConfig>>
composeRequestLockCallbackAndOtherCallbacks(
Optional<JobletCallback<CaptainRequestConfig>> requestLock,
Optional<JobletCallback<CaptainRequestConfig>> callback) {
if (requestLock.isPresent() && callback.isPresent()) {
return Optional.of(JobletCallbackUtil.compose(requestLock.get(), callback.get()));
} else if (requestLock.isPresent()) {
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/liveramp/captain/daemon/CaptainBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class CaptainBuilder extends BaseCaptainBuilder<CaptainBuilder> {
public CaptainBuilder(String identifier, CaptainConfigProducer configProducer, ManifestManager manifestManager, RequestUpdater requestUpdater) {
public CaptainBuilder(
String identifier,
CaptainConfigProducer configProducer,
ManifestManager manifestManager,
RequestUpdater requestUpdater) {
super(identifier, configProducer, manifestManager, requestUpdater);
}

public static CaptainBuilder of(String identifier, CaptainConfigProducer configProducer, ManifestManager multiAppManifestManager, RequestUpdater requestUpdater) {
public static CaptainBuilder of(
String identifier,
CaptainConfigProducer configProducer,
ManifestManager multiAppManifestManager,
RequestUpdater requestUpdater) {
return new CaptainBuilder(identifier, configProducer, multiAppManifestManager, requestUpdater);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@
public interface CaptainConfigProducer extends JobletConfigProducer<CaptainRequestConfig> {
CaptainRequestConfig getNextConfig() throws DaemonException;
}

Loading