Skip to content
Open
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.google.cloud.pubsub.v1;

import io.opentelemetry.api.common.AttributesBuilder;

public class AckHandler implements RpcOperationHandler{
@Override
public String getCodeFunction() {
return "sendAckOperations";
}

@Override
public String getOperationName() {
return "ack";
}

@Override
public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) {
// No extra attributes for "ack"
}

@Override
public void addEvent(PubsubMessageWrapper message) {
message.addAckStartEvent();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.google.cloud.pubsub.v1;

import io.opentelemetry.api.common.AttributesBuilder;

public class ModAckHandler implements RpcOperationHandler{

private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";

@Override
public String getCodeFunction() {
return "sendModAckOperations";
}

@Override
public String getOperationName() {
return "modack";
}

@Override
public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) {
builder.put("messaging.gcp_pubsub.message.ack_deadline", ackDeadline)
.put("messaging.gcp_pubsub.is_receipt_modack", isReceiptModack);
}

@Override
public void addEvent(PubsubMessageWrapper message) {
message.addModAckStartEvent();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package com.google.cloud.pubsub.v1;

import io.opentelemetry.api.common.AttributesBuilder;

public class NackHandler implements RpcOperationHandler{
@Override
public String getCodeFunction() {
return "sendModAckOperations";
}

@Override
public String getOperationName() {
return "nack";
}

@Override
public void addAttributes(AttributesBuilder builder, int ackDeadline, boolean isReceiptModack) {
// No extra attributes for "nack"
}

@Override
public void addEvent(PubsubMessageWrapper message) {
message.addNackStartEvent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,25 @@
*/
public class OpenCensusUtil {
private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName());

private static final OpenCensusUtil INSTANCE = new OpenCensusUtil();
public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey";
public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey";
@VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver";
private static final String TRACEPARENT_KEY = "traceparent";

private static final Tagger tagger = Tags.getTagger();
private static final TagContextBinarySerializer serializer =
Tags.getTagPropagationComponent().getBinarySerializer();

private static final Tracer tracer = Tracing.getTracer();
private static final TextFormat traceContextTextFormat =
Tracing.getPropagationComponent().getTraceContextFormat();
private final Tagger tagger;
private final TagContextBinarySerializer serializer ;

private final Tracer tracer;
private final TextFormat traceContextTextFormat ;

public OpenCensusUtil() {
this.tagger = Tags.getTagger();
this.serializer = Tags.getTagPropagationComponent().getBinarySerializer();
this.tracer = Tracing.getTracer();
this.traceContextTextFormat = Tracing.getPropagationComponent().getTraceContextFormat();
}

/**
* Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as
Expand All @@ -66,8 +72,8 @@ public class OpenCensusUtil {
@Override
public PubsubMessage apply(PubsubMessage message) {
PubsubMessage.Builder builder = PubsubMessage.newBuilder(message);
String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext());
String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext());
String encodedSpanContext = encodeSpanContext(INSTANCE.tracer.getCurrentSpan().getContext());
String encodedTagContext = encodeTagContext(INSTANCE.tagger.getCurrentTagContext());
if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) {
return message;
}
Expand Down Expand Up @@ -102,7 +108,7 @@ public String get(String carrier, String key) {
@VisibleForTesting
static String encodeSpanContext(SpanContext ctxt) {
StringBuilder builder = new StringBuilder();
traceContextTextFormat.inject(ctxt, builder, setter);
INSTANCE.traceContextTextFormat.inject(ctxt, builder, setter);
return builder.toString();
}

Expand All @@ -115,14 +121,14 @@ private static String encodeTagContext(TagContext tags) {
// TODO: update this code once the text encoding of tags has been resolved
// (https://github.com/census-instrumentation/opencensus-specs/issues/65).
private static Scope createScopedTagContext(String encodedTags) {
return tagger.withTagContext(tagger.getCurrentTagContext());
return INSTANCE.tagger.withTagContext(INSTANCE.tagger.getCurrentTagContext());
}

@VisibleForTesting
@MustBeClosed
static Scope createScopedSpan(String name) {
return tracer
.spanBuilderWithExplicitParent(name, tracer.getCurrentSpan())
return INSTANCE.tracer
.spanBuilderWithExplicitParent(name, INSTANCE.tracer.getCurrentSpan())
.setRecordEvents(true)
// Note: we preserve the sampling decision from the publisher.
.setSampler(Samplers.alwaysSample())
Expand All @@ -131,8 +137,8 @@ static Scope createScopedSpan(String name) {

private static void addParentLink(String encodedParentSpanContext) {
try {
SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter);
tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
SpanContext ctxt = INSTANCE.traceContextTextFormat.extract(encodedParentSpanContext, getter);
INSTANCE.tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN));
} catch (SpanContextParseException exn) {
logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public class OpenTelemetryPubsubTracer {
"messaging.gcp_pubsub.message.exactly_once_delivery";
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
"messaging.gcp_pubsub.message.delivery_attempt";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";

Expand Down Expand Up @@ -366,20 +364,21 @@ Span startSubscribeRpcSpan(
if (!enabled) {
return null;
}
String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations";

// Get the appropriate handler for the operation
RpcOperationHandler handler = getHandler(rpcOperation);

AttributesBuilder attributesBuilder =
createCommonSpanAttributesBuilder(
subscriptionName.getSubscription(),
subscriptionName.getProject(),
codeFunction,
handler.getCodeFunction(),
rpcOperation)
.put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size());

// Ack deadline and receipt modack are specific to the modack operation
if (rpcOperation == "modack") {
attributesBuilder
.put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
handler.addAttributes(attributesBuilder, ackDeadline, isReceiptModack);
}

SpanBuilder rpcSpanBuilder =
Expand All @@ -399,22 +398,25 @@ Span startSubscribeRpcSpan(
for (PubsubMessageWrapper message : messages) {
if (rpcSpan.getSpanContext().isSampled()) {
message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes);
switch (rpcOperation) {
case "ack":
message.addAckStartEvent();
break;
case "modack":
message.addModAckStartEvent();
break;
case "nack":
message.addNackStartEvent();
break;
}
handler.addEvent(message);
}
}
return rpcSpan;
}

private RpcOperationHandler getHandler(String rpcOperation) {
switch (rpcOperation) {
case "ack":
return new AckHandler();
case "modack":
return new ModAckHandler();
case "nack":
return new NackHandler();
default:
throw new IllegalArgumentException("Unknown RPC operation: " + rpcOperation);
}
}

/** Ends the given subscribe RPC span if it exists. */
void endSubscribeRpcSpan(Span rpcSpan) {
if (!enabled) {
Expand Down
Loading