diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java index 2f8a27429d..21fc62f73b 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java @@ -3,6 +3,7 @@ public enum SpanOperationType { START_WORKFLOW("StartWorkflow"), SIGNAL_WITH_START_WORKFLOW("SignalWithStartWorkflow"), + UPDATE_WITH_START_WORKFLOW("UpdateWithStartWorkflow"), RUN_WORKFLOW("RunWorkflow"), START_CHILD_WORKFLOW("StartChildWorkflow"), START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"), diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java index 1734f3a36d..8e089bf09a 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java @@ -54,6 +54,7 @@ protected Map getSpanTags(SpanCreationContext context) { SpanOperationType operationType = context.getSpanOperationType(); switch (operationType) { case START_WORKFLOW: + case UPDATE_WITH_START_WORKFLOW: case SIGNAL_WITH_START_WORKFLOW: return ImmutableMap.of(StandardTagNames.WORKFLOW_ID, context.getWorkflowId()); case START_CHILD_WORKFLOW: diff --git a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java index 79a7c1c21a..c5e36db480 100644 --- a/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java +++ b/temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java @@ -78,6 +78,24 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu } } + @Override + public WorkflowUpdateWithStartOutput updateWithStart(WorkflowUpdateWithStartInput input) { + WorkflowStartInput workflowStartInput = input.getWorkflowStartInput(); + Span workflowStartSpan = + contextAccessor.writeSpanContextToHeader( + () -> + createWorkflowStartSpanBuilder( + workflowStartInput, SpanOperationType.UPDATE_WITH_START_WORKFLOW) + .start(), + workflowStartInput.getHeader(), + tracer); + try (Scope ignored = tracer.scopeManager().activate(workflowStartSpan)) { + return super.updateWithStart(input); + } finally { + workflowStartSpan.finish(); + } + } + @Override public QueryOutput query(QueryInput input) { Span workflowQuerySpan = diff --git a/temporal-opentracing/src/test/java/io/temporal/opentracing/UpdateWithStartTest.java b/temporal-opentracing/src/test/java/io/temporal/opentracing/UpdateWithStartTest.java new file mode 100644 index 0000000000..4553ece6af --- /dev/null +++ b/temporal-opentracing/src/test/java/io/temporal/opentracing/UpdateWithStartTest.java @@ -0,0 +1,128 @@ +package io.temporal.opentracing; + +import static org.junit.Assert.assertEquals; + +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.util.ThreadLocalScopeManager; +import io.temporal.api.enums.v1.WorkflowIdConflictPolicy; +import io.temporal.client.*; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.WorkerFactoryOptions; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.List; +import org.junit.After; +import org.junit.Rule; +import org.junit.Test; + +public class UpdateWithStartTest { + + private static final MockTracer mockTracer = + new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP); + + private final OpenTracingOptions OT_OPTIONS = + OpenTracingOptions.newBuilder().setTracer(mockTracer).build(); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS)) + .validateAndBuildWithDefaults()) + .setWorkflowTypes(WorkflowImpl.class) + .build(); + + @After + public void tearDown() { + mockTracer.reset(); + } + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String run(String input); + + @UpdateMethod + void update(String update); + } + + public static class WorkflowImpl implements TestWorkflow { + + private String update; + + @Override + public String run(String input) { + return update; + } + + @Override + public void update(String update) { + this.update = update; + } + } + + @Test + public void updateWithStart() { + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow workflow = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowIdConflictPolicy( + WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING) + .validateBuildWithDefaults()); + + Span span = mockTracer.buildSpan("ClientFunction").start(); + + try (Scope scope = mockTracer.scopeManager().activate(span)) { + WorkflowClient.executeUpdateWithStart( + workflow::update, + "input", + UpdateOptions.newBuilder().build(), + new WithStartWorkflowOperation<>(workflow::run, "updateInput")); + } finally { + span.finish(); + } + + // wait for the workflow completion + WorkflowStub.fromTyped(workflow).getResult(String.class); + + OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans()); + + MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction"); + + MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0); + assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId()); + assertEquals("UpdateWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName()); + + if (SDKTestWorkflowRule.useExternalService) { + List workflowSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(2, workflowSpans.size()); + + MockSpan workflowUpdateSpan = workflowSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowUpdateSpan.parentId()); + assertEquals("HandleUpdate:update", workflowUpdateSpan.operationName()); + + MockSpan workflowRunSpan = workflowSpans.get(1); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } else { + List workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan); + assertEquals(1, workflowRunSpans.size()); + + MockSpan workflowRunSpan = workflowRunSpans.get(0); + assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId()); + assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName()); + } + } +}