Skip to content

Sdk 4131 #2625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ plugins {

allprojects {
repositories {
mavenLocal()
mavenCentral()
}
}
Expand All @@ -30,7 +31,7 @@ ext {
// Platforms
grpcVersion = '1.58.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.4.0-alpha'
nexusVersion = '0.5.0-SNAPSHOT'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.temporal.opentracing;

import io.temporal.common.interceptors.NexusServiceClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientCallsInterceptor;
import io.temporal.common.interceptors.WorkflowClientInterceptorBase;
import io.temporal.opentracing.internal.ContextAccessor;
import io.temporal.opentracing.internal.OpenTracingNexusServiceClientCallsInterceptor;
import io.temporal.opentracing.internal.OpenTracingWorkflowClientCallsInterceptor;
import io.temporal.opentracing.internal.SpanFactory;

Expand All @@ -27,4 +29,11 @@ public WorkflowClientCallsInterceptor workflowClientCallsInterceptor(
return new OpenTracingWorkflowClientCallsInterceptor(
next, options, spanFactory, contextAccessor);
}

@Override
public NexusServiceClientCallsInterceptor nexusServiceClientCallsInterceptor(
NexusServiceClientCallsInterceptor next) {
return new OpenTracingNexusServiceClientCallsInterceptor(
next, options, spanFactory, contextAccessor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ public enum SpanOperationType {
HANDLE_UPDATE("HandleUpdate"),
START_NEXUS_OPERATION("StartNexusOperation"),
RUN_START_NEXUS_OPERATION("RunStartNexusOperationHandler"),
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler");
RUN_CANCEL_NEXUS_OPERATION("RunCancelNexusOperationHandler"),
RUN_FETCH_NEXUS_OPERATION_INFO("RunFetchNexusOperationInfoHandler"),
RUN_FETCH_NEXUS_OPERATION_RESULT("RunFetchNexusOperationResultHandler"),
CLIENT_START_NEXUS_OPERATION("ClientStartNexusOperation"),
CLIENT_CANCEL_NEXUS_OPERATION("ClientCancelNexusOperation"),
CLIENT_FETCH_NEXUS_OPERATION_INFO("ClientFetchNexusOperationInfo"),
CLIENT_FETCH_NEXUS_OPERATION_RESULT("ClientFetchNexusOperationResult");

private final String defaultPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
StandardTagNames.RUN_ID, context.getRunId());
case RUN_START_NEXUS_OPERATION:
case RUN_CANCEL_NEXUS_OPERATION:
case RUN_FETCH_NEXUS_OPERATION_INFO:
case RUN_FETCH_NEXUS_OPERATION_RESULT:
case HANDLE_QUERY:
case CLIENT_START_NEXUS_OPERATION:
case CLIENT_CANCEL_NEXUS_OPERATION:
case CLIENT_FETCH_NEXUS_OPERATION_INFO:
case CLIENT_FETCH_NEXUS_OPERATION_RESULT:
return ImmutableMap.of();
}
throw new IllegalArgumentException("Unknown span operation type provided");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.temporal.opentracing.internal;

import io.nexusrpc.OperationException;
import io.nexusrpc.OperationStillRunningException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.SpanContext;
Expand Down Expand Up @@ -73,4 +74,48 @@ public CancelOperationOutput cancelOperation(CancelOperationInput input) {
operationCancelSpan.finish();
}
}

@Override
public FetchOperationResultOutput fetchOperationResult(FetchOperationResultInput input)
throws OperationException, OperationStillRunningException {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationFetchResultSpan =
spanFactory
.createFetchNexusOperationResultSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationFetchResultSpan)) {
return super.fetchOperationResult(input);
} catch (Throwable t) {
spanFactory.logFail(operationFetchResultSpan, t);
throw t;
} finally {
operationFetchResultSpan.finish();
}
}

@Override
public FetchOperationInfoResponse fetchOperationInfo(FetchOperationInfoInput input) {
SpanContext rootSpanContext =
contextAccessor.readSpanContextFromHeader(input.getOperationContext().getHeaders(), tracer);

Span operationFetchInfoSpan =
spanFactory
.createFetchNexusOperationInfoSpan(
tracer,
input.getOperationContext().getService(),
input.getOperationContext().getOperation(),
rootSpanContext)
.start();
try (Scope scope = tracer.scopeManager().activate(operationFetchInfoSpan)) {
return super.fetchOperationInfo(input);
} finally {
operationFetchInfoSpan.finish();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package io.temporal.opentracing.internal;

import io.nexusrpc.OperationException;
import io.nexusrpc.OperationStillRunningException;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.temporal.common.interceptors.NexusServiceClientCallsInterceptor;
import io.temporal.common.interceptors.NexusServiceClientCallsInterceptorBase;
import io.temporal.opentracing.OpenTracingOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Nexus service client interceptor that creates OpenTracing spans and propagates the active span
* context.
*/
public class OpenTracingNexusServiceClientCallsInterceptor
extends NexusServiceClientCallsInterceptorBase {
private final SpanFactory spanFactory;
private final Tracer tracer;
private final ContextAccessor contextAccessor;

public OpenTracingNexusServiceClientCallsInterceptor(
NexusServiceClientCallsInterceptor next,
OpenTracingOptions options,
SpanFactory spanFactory,
ContextAccessor contextAccessor) {
super(next);
this.spanFactory = spanFactory;
this.tracer = options.getTracer();
this.contextAccessor = contextAccessor;
}

@Override
public StartOperationOutput startOperation(StartOperationInput input) throws OperationException {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientStartNexusOperationSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.startOperation(input);
} catch (Throwable t) {
spanFactory.logFail(span, t);
throw t;
} finally {
span.finish();
}
}

@Override
public CompletableFuture<StartOperationOutput> startOperationAsync(StartOperationInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientStartNexusOperationSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.startOperationAsync(input)
.whenComplete(
(r, e) -> {
if (e != null) {
spanFactory.logFail(span, e);
}
span.finish();
});
}
}

@Override
public CancelOperationOutput cancelOperation(CancelOperationInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientCancelNexusOperationSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.cancelOperation(input);
} catch (Throwable t) {
spanFactory.logFail(span, t);
throw t;
} finally {
span.finish();
}
}

@Override
public CompletableFuture<CancelOperationOutput> cancelOperationAsync(CancelOperationInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientCancelNexusOperationSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.cancelOperationAsync(input)
.whenComplete(
(r, e) -> {
if (e != null) {
spanFactory.logFail(span, e);
}
span.finish();
});
}
}

@Override
public FetchOperationResultOutput fetchOperationResult(FetchOperationResultInput input)
throws OperationException, OperationStillRunningException {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientFetchNexusOperationResultSpan(
tracer,
input.getServiceName(),
input.getOperationName(),
input.getOperationToken())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.fetchOperationResult(input);
} catch (Throwable t) {
spanFactory.logFail(span, t);
throw t;
} finally {
span.finish();
}
}

@Override
public FetchOperationInfoOutput fetchOperationInfo(FetchOperationInfoInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientFetchNexusOperationInfoSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.fetchOperationInfo(input);
} catch (Throwable t) {
spanFactory.logFail(span, t);
throw t;
} finally {
span.finish();
}
}

@Override
public CompleteOperationOutput completeOperation(CompleteOperationInput input) {
propagate(input.getOptions().getHeaders());
return super.completeOperation(input);
}

@Override
public CompletableFuture<FetchOperationResultOutput> fetchOperationResultAsync(
FetchOperationResultInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientFetchNexusOperationResultSpan(
tracer,
input.getServiceName(),
input.getOperationName(),
input.getOperationToken())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.fetchOperationResultAsync(input)
.whenComplete(
(r, e) -> {
if (e != null) {
spanFactory.logFail(span, e);
}
span.finish();
});
}
}

@Override
public CompletableFuture<FetchOperationInfoOutput> fetchOperationInfoAsync(
FetchOperationInfoInput input) {
Span span =
contextAccessor.writeSpanContextToHeader(
() ->
spanFactory
.createClientFetchNexusOperationInfoSpan(
tracer, input.getServiceName(), input.getOperationName())
.start(),
input.getOptions().getHeaders(),
tracer);
try (Scope ignored = tracer.scopeManager().activate(span)) {
return super.fetchOperationInfoAsync(input)
.whenComplete(
(r, e) -> {
if (e != null) {
spanFactory.logFail(span, e);
}
span.finish();
});
}
}

@Override
public CompletableFuture<CompleteOperationOutput> completeOperationAsync(
CompleteOperationAsyncInput input) {
propagate(input.getOptions().getHeaders());
return super.completeOperationAsync(input);
}

private void propagate(Map<String, String> headers) {
Span activeSpan = tracer.scopeManager().activeSpan();
if (activeSpan != null) {
contextAccessor.writeSpanContextToHeader(activeSpan.context(), headers, tracer);
}
}
}
Loading