Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,17 @@
import com.uber.cadence.ActivityType;
import com.uber.cadence.Decision;
import com.uber.cadence.DecisionType;
import com.uber.cadence.DescribeWorkflowExecutionRequest;
import com.uber.cadence.DescribeWorkflowExecutionResponse;
import com.uber.cadence.EntityNotExistsError;
import com.uber.cadence.EventType;
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
import com.uber.cadence.HistoryEventFilterType;
import com.uber.cadence.StartWorkflowExecutionRequest;
import com.uber.cadence.TaskList;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.WorkflowExecutionCloseStatus;
import com.uber.cadence.WorkflowExecutionContinuedAsNewEventAttributes;
import com.uber.cadence.WorkflowExecutionFailedEventAttributes;
import com.uber.cadence.WorkflowExecutionInfo;
import com.uber.cadence.WorkflowExecutionTerminatedEventAttributes;
import com.uber.cadence.WorkflowExecutionTimedOutEventAttributes;
import com.uber.cadence.WorkflowType;
Expand Down Expand Up @@ -389,42 +384,6 @@ public static boolean isWorkflowExecutionCompleteDecision(Decision decision) {
|| decision.getDecisionType() == DecisionType.ContinueAsNewWorkflowExecution));
}

public static boolean isActivityTaskClosedEvent(HistoryEvent event) {
return ((event != null)
&& (event.getEventType() == EventType.ActivityTaskCompleted
|| event.getEventType() == EventType.ActivityTaskCanceled
|| event.getEventType() == EventType.ActivityTaskFailed
|| event.getEventType() == EventType.ActivityTaskTimedOut));
}

public static boolean isExternalWorkflowClosedEvent(HistoryEvent event) {
return ((event != null)
&& (event.getEventType() == EventType.ChildWorkflowExecutionCompleted
|| event.getEventType() == EventType.ChildWorkflowExecutionCanceled
|| event.getEventType() == EventType.ChildWorkflowExecutionFailed
|| event.getEventType() == EventType.ChildWorkflowExecutionTerminated
|| event.getEventType() == EventType.ChildWorkflowExecutionTimedOut));
}

public static WorkflowExecution getWorkflowIdFromExternalWorkflowCompletedEvent(
HistoryEvent event) {
if (event != null) {
if (event.getEventType() == EventType.ChildWorkflowExecutionCompleted) {
return event.getChildWorkflowExecutionCompletedEventAttributes().getWorkflowExecution();
} else if (event.getEventType() == EventType.ChildWorkflowExecutionCanceled) {
return event.getChildWorkflowExecutionCanceledEventAttributes().getWorkflowExecution();
} else if (event.getEventType() == EventType.ChildWorkflowExecutionFailed) {
return event.getChildWorkflowExecutionFailedEventAttributes().getWorkflowExecution();
} else if (event.getEventType() == EventType.ChildWorkflowExecutionTerminated) {
return event.getChildWorkflowExecutionTerminatedEventAttributes().getWorkflowExecution();
} else if (event.getEventType() == EventType.ChildWorkflowExecutionTimedOut) {
return event.getChildWorkflowExecutionTimedOutEventAttributes().getWorkflowExecution();
}
}

return null;
}

public static String getId(HistoryEvent historyEvent) {
String id = null;
if (historyEvent != null) {
Expand All @@ -436,67 +395,6 @@ public static String getId(HistoryEvent historyEvent) {
return id;
}

public static String getFailureCause(HistoryEvent historyEvent) {
String failureCause = null;
if (historyEvent != null) {
if (historyEvent.getEventType() == EventType.StartChildWorkflowExecutionFailed) {
failureCause =
historyEvent
.getStartChildWorkflowExecutionFailedEventAttributes()
.getCause()
.toString();
// } else if (historyEvent.getEventType() ==
// EventType.SignalExternalWorkflowExecutionFailed) {
// failureCause =
// historyEvent.getSignalExternalWorkflowExecutionFailedEventAttributes().getCause();
} else {
failureCause = "Cannot extract failure cause from " + historyEvent.getEventType();
}
}

return failureCause;
}

/**
* Blocks until workflow instance completes. <strong>Never</strong> use in production setting as
* polling for worklow instance status is an expensive operation.
*
* @param workflowExecution result of {@link
* IWorkflowService#StartWorkflowExecution(StartWorkflowExecutionRequest)}
* @return instance close status
*/
public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletion(
IWorkflowService service, String domain, WorkflowExecution workflowExecution)
throws EntityNotExistsError {
try {
return waitForWorkflowInstanceCompletion(
service, domain, workflowExecution, 0, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new Error("should never happen", e);
}
}

/**
* Waits up to specified timeout for workflow instance completion. <strong>Never</strong> use in
* production setting as polling for worklow instance status is an expensive operation.
*
* @param workflowExecution result of {@link
* IWorkflowService#StartWorkflowExecution(StartWorkflowExecutionRequest)}
* @param timeout maximum time to wait for completion. 0 means wait forever.
* @return instance close status
*/
public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletion(
IWorkflowService service,
String domain,
WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit)
throws TimeoutException, EntityNotExistsError {
HistoryEvent closeEvent =
getInstanceCloseEvent(service, domain, workflowExecution, timeout, unit);
return getCloseStatus(closeEvent);
}

public static WorkflowExecutionCloseStatus getCloseStatus(HistoryEvent event) {
switch (event.getEventType()) {
case WorkflowExecutionCanceled:
Expand All @@ -516,88 +414,6 @@ public static WorkflowExecutionCloseStatus getCloseStatus(HistoryEvent event) {
}
}

/**
* Like {@link #waitForWorkflowInstanceCompletion(IWorkflowService, String, WorkflowExecution,
* long, TimeUnit)} , except will wait for continued generations of the original workflow
* execution too.
*
* @see #waitForWorkflowInstanceCompletion(IWorkflowService, String, WorkflowExecution, long,
* TimeUnit)
*/
public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletionAcrossGenerations(
IWorkflowService service,
String domain,
WorkflowExecution workflowExecution,
long timeout,
TimeUnit unit)
throws TimeoutException, EntityNotExistsError {

WorkflowExecution lastExecutionToRun = workflowExecution;
long millisecondsAtFirstWait = System.currentTimeMillis();
WorkflowExecutionCloseStatus lastExecutionToRunCloseStatus =
waitForWorkflowInstanceCompletion(service, domain, lastExecutionToRun, timeout, unit);

// keep waiting if the instance continued as new
while (lastExecutionToRunCloseStatus == WorkflowExecutionCloseStatus.CONTINUED_AS_NEW) {
// get the new execution's information
HistoryEvent closeEvent =
getInstanceCloseEvent(service, domain, lastExecutionToRun, timeout, unit);
WorkflowExecutionContinuedAsNewEventAttributes continuedAsNewAttributes =
closeEvent.getWorkflowExecutionContinuedAsNewEventAttributes();

WorkflowExecution newGenerationExecution = new WorkflowExecution();
newGenerationExecution.setRunId(continuedAsNewAttributes.getNewExecutionRunId());
newGenerationExecution.setWorkflowId(lastExecutionToRun.getWorkflowId());

// and wait for it
long currentTime = System.currentTimeMillis();
long millisecondsSinceFirstWait = currentTime - millisecondsAtFirstWait;
long timeoutInSecondsForNextWait =
unit.toMillis(timeout) - (millisecondsSinceFirstWait / 1000L);

lastExecutionToRunCloseStatus =
waitForWorkflowInstanceCompletion(
service,
domain,
newGenerationExecution,
timeoutInSecondsForNextWait,
TimeUnit.MILLISECONDS);
lastExecutionToRun = newGenerationExecution;
}

return lastExecutionToRunCloseStatus;
}

/**
* Like {@link #waitForWorkflowInstanceCompletion(IWorkflowService, String, WorkflowExecution,
* long, TimeUnit)} , but with no timeout.*
*/
public static WorkflowExecutionCloseStatus waitForWorkflowInstanceCompletionAcrossGenerations(
IWorkflowService service, String domain, WorkflowExecution workflowExecution)
throws InterruptedException, EntityNotExistsError {
try {
return waitForWorkflowInstanceCompletionAcrossGenerations(
service, domain, workflowExecution, 0L, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new Error("should never happen", e);
}
}

public static WorkflowExecutionInfo describeWorkflowInstance(
IWorkflowService service, String domain, WorkflowExecution workflowExecution) {
DescribeWorkflowExecutionRequest describeRequest = new DescribeWorkflowExecutionRequest();
describeRequest.setDomain(domain);
describeRequest.setExecution(workflowExecution);
DescribeWorkflowExecutionResponse executionDetail = null;
try {
executionDetail = service.DescribeWorkflowExecution(describeRequest);
} catch (TException e) {
throw new RuntimeException(e);
}
WorkflowExecutionInfo instanceMetadata = executionDetail.getWorkflowExecutionInfo();
return instanceMetadata;
}

public static GetWorkflowExecutionHistoryResponse getHistoryPage(
byte[] nextPageToken,
IWorkflowService service,
Expand All @@ -621,26 +437,6 @@ public static GetWorkflowExecutionHistoryResponse getHistoryPage(
return history;
}

/** Returns workflow instance history in a human readable format. */
public static String prettyPrintHistory(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be useful; I wonder why have you decided to drop this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's potentially useful but it's long unused and untested. If we need similar functionality we can rewrite it.

IWorkflowService service, String domain, WorkflowExecution workflowExecution) {
return prettyPrintHistory(service, domain, workflowExecution, true);
}
/**
* Returns workflow instance history in a human readable format.
*
* @param showWorkflowTasks when set to false workflow task events (decider events) are not
* included
*/
public static String prettyPrintHistory(
IWorkflowService service,
String domain,
WorkflowExecution workflowExecution,
boolean showWorkflowTasks) {
Iterator<HistoryEvent> events = getHistory(service, domain, workflowExecution);
return prettyPrintHistory(events, showWorkflowTasks);
}

public static Iterator<HistoryEvent> getHistory(
IWorkflowService service, String domain, WorkflowExecution workflowExecution) {
return new Iterator<HistoryEvent>() {
Expand Down Expand Up @@ -1012,38 +808,6 @@ public static boolean isDecisionEvent(HistoryEvent event) {
return result;
}

public static EventType getEventTypeForDecision(DecisionType decisionType) {
switch (decisionType) {
case ScheduleActivityTask:
return EventType.ActivityTaskScheduled;
case RequestCancelActivityTask:
return EventType.ActivityTaskCancelRequested;
case StartTimer:
return EventType.TimerStarted;
case CompleteWorkflowExecution:
return EventType.WorkflowExecutionCompleted;
case FailWorkflowExecution:
return EventType.WorkflowExecutionFailed;
case CancelTimer:
return EventType.TimerCanceled;
case CancelWorkflowExecution:
return EventType.WorkflowExecutionCanceled;
case RequestCancelExternalWorkflowExecution:
return EventType.ExternalWorkflowExecutionCancelRequested;
case RecordMarker:
return EventType.MarkerRecorded;
case ContinueAsNewWorkflowExecution:
return EventType.WorkflowExecutionContinuedAsNew;
case StartChildWorkflowExecution:
return EventType.StartChildWorkflowExecutionInitiated;
case SignalExternalWorkflowExecution:
return EventType.SignalExternalWorkflowExecutionInitiated;
case UpsertWorkflowSearchAttributes:
return EventType.UpsertWorkflowSearchAttributes;
}
throw new IllegalArgumentException("Unknown decisionType");
}

public static WorkflowExecutionHistory readHistoryFromResource(String resourceFileName)
throws IOException {
ClassLoader classLoader = WorkflowExecutionUtils.class.getClassLoader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ public ActivityDecisionStateMachine(
this.scheduleAttributes = scheduleAttributes;
}

/** Used for unit testing */
ActivityDecisionStateMachine(
DecisionId id,
ScheduleActivityTaskDecisionAttributes scheduleAttributes,
DecisionState state) {
super(id, state);
this.scheduleAttributes = scheduleAttributes;
}

@Override
public Decision getDecision() {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ public ChildWorkflowDecisionStateMachine(
this.startAttributes = startAttributes;
}

/** Used for unit testing */
ChildWorkflowDecisionStateMachine(
DecisionId id,
StartChildWorkflowExecutionDecisionAttributes startAttributes,
DecisionState state) {
super(id, state);
this.startAttributes = startAttributes;
}

@Override
public Decision getDecision() {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ public SignalDecisionStateMachine(
this.attributes = attributes;
}

/** Used for unit testing */
SignalDecisionStateMachine(
DecisionId id,
SignalExternalWorkflowExecutionDecisionAttributes attributes,
DecisionState state) {
super(id, state);
this.attributes = attributes;
}

@Override
public Decision getDecision() {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ public TimerDecisionStateMachine(DecisionId id, StartTimerDecisionAttributes att
this.attributes = attributes;
}

/** Used for unit testing */
TimerDecisionStateMachine(
DecisionId id, StartTimerDecisionAttributes attributes, DecisionState state) {
super(id, state);
this.attributes = attributes;
}

@Override
public Decision getDecision() {
switch (state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ final class SimulatedTimeoutExceptionInternal extends RuntimeException {
this.details = details;
}

SimulatedTimeoutExceptionInternal(TimeoutType timeoutType) {
this.timeoutType = timeoutType;
this.details = null;
}

TimeoutType getTimeoutType() {
return timeoutType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.uber.cadence.internal.sync;

import com.uber.cadence.PollForDecisionTaskResponse;
import com.uber.cadence.WorkflowExecution;
import com.uber.cadence.common.WorkflowExecutionHistory;
import com.uber.cadence.converter.DataConverter;
import com.uber.cadence.internal.common.InternalUtils;
Expand Down Expand Up @@ -224,18 +223,6 @@ public boolean isSuspended() {
return workflowWorker.isSuspended() && laWorker.isSuspended() && ldaWorker.isSuspended();
}

public <R> R queryWorkflowExecution(
WorkflowExecution execution,
String queryType,
Class<R> resultClass,
Type resultType,
Object[] args)
throws Exception {
byte[] serializedArgs = dataConverter.toData(args);
byte[] result = workflowWorker.queryWorkflowExecution(execution, queryType, serializedArgs);
return dataConverter.fromData(result, resultClass, resultType);
}

public <R> R queryWorkflowExecution(
WorkflowExecutionHistory history,
String queryType,
Expand Down
Loading