@@ -141,7 +141,8 @@ private CompletableResultCode exportSpannerClientMetrics(Collection<MetricData>
List<TimeSeries> spannerTimeSeries;
try {
spannerTimeSeries =
SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(spannerMetricData);
SpannerCloudMonitoringExporterUtils.convertToSpannerTimeSeries(
spannerMetricData, this.spannerProjectId);
} catch (Throwable e) {
logger.log(
Level.WARNING,
@@ -36,17 +36,24 @@
import com.google.api.MetricDescriptor.MetricKind;
import com.google.api.MetricDescriptor.ValueType;
import com.google.api.MonitoredResource;
import com.google.monitoring.v3.DroppedLabels;
import com.google.monitoring.v3.Point;
import com.google.monitoring.v3.SpanContext;
import com.google.monitoring.v3.TimeInterval;
import com.google.monitoring.v3.TimeSeries;
import com.google.monitoring.v3.TypedValue;
import com.google.protobuf.Any;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.HistogramData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
@@ -57,6 +64,7 @@
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

class SpannerCloudMonitoringExporterUtils {

@@ -69,7 +77,8 @@ static String getProjectId(Resource resource) {
return resource.getAttributes().get(PROJECT_ID_KEY);
}

static List<TimeSeries> convertToSpannerTimeSeries(List<MetricData> collection) {
static List<TimeSeries> convertToSpannerTimeSeries(
List<MetricData> collection, String projectId) {
List<TimeSeries> allTimeSeries = new ArrayList<>();

for (MetricData metricData : collection) {
@@ -94,7 +103,8 @@ static List<TimeSeries> convertToSpannerTimeSeries(List<MetricData> collection)
metricData.getData().getPoints().stream()
.map(
pointData ->
convertPointToSpannerTimeSeries(metricData, pointData, monitoredResourceBuilder))
convertPointToSpannerTimeSeries(
metricData, pointData, monitoredResourceBuilder, projectId))
.forEach(allTimeSeries::add);
}
return allTimeSeries;
@@ -103,7 +113,8 @@ static List<TimeSeries> convertToSpannerTimeSeries(List<MetricData> collection)
private static TimeSeries convertPointToSpannerTimeSeries(
MetricData metricData,
PointData pointData,
MonitoredResource.Builder monitoredResourceBuilder) {
MonitoredResource.Builder monitoredResourceBuilder,
String projectId) {
TimeSeries.Builder builder =
TimeSeries.newBuilder()
.setMetricKind(convertMetricKind(metricData))
@@ -135,7 +146,7 @@ private static TimeSeries convertPointToSpannerTimeSeries(
.setEndTime(Timestamps.fromNanos(pointData.getEpochNanos()))
.build();

builder.addPoints(createPoint(metricData.getType(), pointData, timeInterval));
builder.addPoints(createPoint(metricData.getType(), pointData, timeInterval, projectId));

return builder.build();
}
@@ -191,15 +202,16 @@ private static ValueType convertValueType(MetricDataType metricDataType) {
}

private static Point createPoint(
MetricDataType type, PointData pointData, TimeInterval timeInterval) {
MetricDataType type, PointData pointData, TimeInterval timeInterval, String projectId) {
Point.Builder builder = Point.newBuilder().setInterval(timeInterval);
switch (type) {
case HISTOGRAM:
case EXPONENTIAL_HISTOGRAM:
return builder
.setValue(
TypedValue.newBuilder()
.setDistributionValue(convertHistogramData((HistogramPointData) pointData))
.setDistributionValue(
convertHistogramData((HistogramPointData) pointData, projectId))
.build())
.build();
case DOUBLE_GAUGE:
@@ -221,14 +233,73 @@ private static Point createPoint(
}
}

private static Distribution convertHistogramData(HistogramPointData pointData) {
private static Distribution convertHistogramData(HistogramPointData pointData, String projectId) {
return Distribution.newBuilder()
.setCount(pointData.getCount())
.setMean(pointData.getCount() == 0L ? 0.0D : pointData.getSum() / pointData.getCount())
.setBucketOptions(
BucketOptions.newBuilder()
.setExplicitBuckets(Explicit.newBuilder().addAllBounds(pointData.getBoundaries())))
.addAllBucketCounts(pointData.getCounts())
.addAllExemplars(
pointData.getExemplars().stream()
.map(e -> mapExemplar(e, projectId))
.collect(Collectors.toList()))
.build();
}

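// Maps an OpenTelemetry exemplar (double or long) onto a Cloud Monitoring Distribution.Exemplar,
// attaching the span context (when valid) and any filtered attributes as Any-packed attachments.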
private static Distribution.Exemplar mapExemplar(ExemplarData exemplar, String projectId) {
double value = 0;
if (exemplar instanceof DoubleExemplarData) {
value = ((DoubleExemplarData) exemplar).getValue();
} else if (exemplar instanceof LongExemplarData) {
value = ((LongExemplarData) exemplar).getValue();
}

Distribution.Exemplar.Builder exemplarBuilder =
Distribution.Exemplar.newBuilder()
.setValue(value)
.setTimestamp(mapTimestamp(exemplar.getEpochNanos()));
if (exemplar.getSpanContext().isValid()) {
exemplarBuilder.addAttachments(
Any.pack(
SpanContext.newBuilder()
.setSpanName(
makeSpanName(
projectId,
exemplar.getSpanContext().getTraceId(),
exemplar.getSpanContext().getSpanId()))
.build()));
}
if (!exemplar.getFilteredAttributes().isEmpty()) {
exemplarBuilder.addAttachments(
Any.pack(mapFilteredAttributes(exemplar.getFilteredAttributes())));
}
return exemplarBuilder.build();
}

static final long NANO_PER_SECOND = (long) 1e9;

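// Splits epoch nanoseconds into the seconds and nanos fields of a protobuf Timestamp.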
private static Timestamp mapTimestamp(long epochNanos) {
return Timestamp.newBuilder()
.setSeconds(epochNanos / NANO_PER_SECOND)
.setNanos((int) (epochNanos % NANO_PER_SECOND))
.build();
}

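// Builds the fully qualified span name: projects/{project}/traces/{trace}/spans/{span}.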
private static String makeSpanName(String projectId, String traceId, String spanId) {
return String.format("projects/%s/traces/%s/spans/%s", projectId, traceId, spanId);
}

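// Packs the exemplar's filtered (dropped) attributes into a DroppedLabels message,
// sanitizing keys to satisfy Cloud Monitoring label-name rules.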
private static DroppedLabels mapFilteredAttributes(Attributes attributes) {
DroppedLabels.Builder labels = DroppedLabels.newBuilder();
attributes.forEach((k, v) -> labels.putLabel(cleanAttributeKey(k.getKey()), v.toString()));
return labels.build();
}

private static String cleanAttributeKey(String key) {
// . is commonly used in OTel but disallowed in GCM label names,
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/LabelDescriptor#:~:text=Matches%20the%20following%20regular%20expression%3A
return key.replace('.', '_');
}
}
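Not part of the change above, but for orientation: a minimal sketch of how the Any-packed attachments produced by mapExemplar could be unpacked again on the consumer side. It assumes a hypothetical variable exportedExemplar of type com.google.api.Distribution.Exemplar that was read back from an exported TimeSeries; everything else is standard protobuf Any handling.

// Sketch only (assumes `exportedExemplar` was read back from Cloud Monitoring).
for (com.google.protobuf.Any attachment : exportedExemplar.getAttachmentsList()) {
  try {
    if (attachment.is(com.google.monitoring.v3.SpanContext.class)) {
      // Span name has the form projects/{project}/traces/{trace}/spans/{span}.
      com.google.monitoring.v3.SpanContext span =
          attachment.unpack(com.google.monitoring.v3.SpanContext.class);
      System.out.println("exemplar span: " + span.getSpanName());
    } else if (attachment.is(com.google.monitoring.v3.DroppedLabels.class)) {
      // Filtered attributes exported as string labels (keys had '.' replaced by '_').
      com.google.monitoring.v3.DroppedLabels labels =
          attachment.unpack(com.google.monitoring.v3.DroppedLabels.class);
      System.out.println("dropped labels: " + labels.getLabelMap());
    }
  } catch (com.google.protobuf.InvalidProtocolBufferException e) {
    throw new IllegalStateException("Unexpected attachment payload", e);
  }
}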
@@ -42,15 +42,21 @@
import com.google.cloud.monitoring.v3.stub.MetricServiceStub;
import com.google.common.collect.ImmutableList;
import com.google.monitoring.v3.CreateTimeSeriesRequest;
import com.google.monitoring.v3.DroppedLabels;
import com.google.monitoring.v3.TimeSeries;
import com.google.protobuf.Empty;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.HistogramPointData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
@@ -347,6 +353,86 @@ public void testExportingSumDataInBatches() {
}
}

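// Verifies that an exported histogram point carries its exemplar: value, timestamp,
// span-context attachment, and dropped-labels (filtered attributes) attachment.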
@Test
public void testExportingHistogramDataWithExemplars() {
ArgumentCaptor<CreateTimeSeriesRequest> argumentCaptor =
ArgumentCaptor.forClass(CreateTimeSeriesRequest.class);

UnaryCallable<CreateTimeSeriesRequest, Empty> mockCallable = mock(UnaryCallable.class);
when(mockMetricServiceStub.createServiceTimeSeriesCallable()).thenReturn(mockCallable);
ApiFuture<Empty> future = ApiFutures.immediateFuture(Empty.getDefaultInstance());
when(mockCallable.futureCall(argumentCaptor.capture())).thenReturn(future);

long startEpoch = 10 * 1_000_000_000L;
long endEpoch = 15 * 1_000_000_000L;
long recordTimeEpoch = 12_123_456_789L;

DoubleExemplarData exemplar =
ImmutableDoubleExemplarData.create(
Attributes.builder().put("request_id", "test").build(),
recordTimeEpoch,
SpanContext.create(
"0123456789abcdef0123456789abcdef",
"0123456789abcdef",
TraceFlags.getSampled(),
TraceState.getDefault()),
1.5);

HistogramPointData histogramPointData =
ImmutableHistogramPointData.create(
startEpoch,
endEpoch,
attributes,
3d,
true,
1d,
true,
2d,
Collections.singletonList(1.0),
Arrays.asList(1L, 2L),
Collections.singletonList(exemplar) // ← add exemplar
);

MetricData histogramData =
ImmutableMetricData.createDoubleHistogram(
resource,
scope,
"spanner.googleapis.com/internal/client/" + OPERATION_LATENCIES_NAME,
"description",
"ms",
ImmutableHistogramData.create(
AggregationTemporality.CUMULATIVE, ImmutableList.of(histogramPointData)));

exporter.export(Collections.singletonList(histogramData));
assertFalse(exporter.lastExportSkippedData());

CreateTimeSeriesRequest request = argumentCaptor.getValue();
TimeSeries timeSeries = request.getTimeSeriesList().get(0);
Distribution distribution = timeSeries.getPoints(0).getValue().getDistributionValue();

// Assert exemplar exists and has expected value
assertThat(distribution.getExemplarsCount()).isEqualTo(1);
Distribution.Exemplar exportedExemplar = distribution.getExemplars(0);
assertThat(exportedExemplar.getValue()).isEqualTo(1.5);

// Assert timestamp mapping
assertThat(exportedExemplar.getTimestamp().getSeconds())
.isEqualTo(recordTimeEpoch / 1_000_000_000L);
assertThat(exportedExemplar.getTimestamp().getNanos())
.isEqualTo((int) (recordTimeEpoch % 1_000_000_000L));

// Assert attachments: SpanContext
boolean hasSpanAttachment =
exportedExemplar.getAttachmentsList().stream()
.anyMatch(any -> any.is(com.google.monitoring.v3.SpanContext.class));
assertThat(hasSpanAttachment).isTrue();

// Assert attachments: DroppedLabels (filtered attributes)
boolean hasFilteredAttrs =
exportedExemplar.getAttachmentsList().stream().anyMatch(any -> any.is(DroppedLabels.class));
assertThat(hasFilteredAttrs).isTrue();
}

@Test
public void getAggregationTemporality() throws IOException {
SpannerCloudMonitoringExporter actualExporter =