3939import io .opentelemetry .sdk .metrics .InstrumentType ;
4040import io .opentelemetry .sdk .metrics .data .AggregationTemporality ;
4141import io .opentelemetry .sdk .metrics .data .MetricData ;
42+ import io .opentelemetry .sdk .metrics .data .PointData ;
4243import io .opentelemetry .sdk .metrics .export .MetricExporter ;
4344import java .io .IOException ;
4445import java .time .Duration ;
4950import java .util .logging .Level ;
5051import java .util .logging .Logger ;
5152import java .util .stream .Collectors ;
53+ import javax .annotation .Nonnull ;
5254import javax .annotation .Nullable ;
5355
5456/**
@@ -66,7 +68,7 @@ class SpannerCloudMonitoringExporter implements MetricExporter {
6668 // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
6769 private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
6870 private final AtomicBoolean spannerExportFailureLogged = new AtomicBoolean (false );
69- private CompletableResultCode lastExportCode ;
71+ private final AtomicBoolean lastExportSkippedData = new AtomicBoolean ( false ) ;
7072 private final MetricServiceClient client ;
7173 private final String spannerProjectId ;
7274
@@ -101,44 +103,49 @@ static SpannerCloudMonitoringExporter create(
101103 }
102104
103105 @ Override
104- public CompletableResultCode export (Collection <MetricData > collection ) {
106+ public CompletableResultCode export (@ Nonnull Collection <MetricData > collection ) {
105107 if (client .isShutdown ()) {
106108 logger .log (Level .WARNING , "Exporter is shut down" );
107109 return CompletableResultCode .ofFailure ();
108110 }
109111
110- this .lastExportCode = exportSpannerClientMetrics (collection );
111- return lastExportCode ;
112+ return exportSpannerClientMetrics (collection );
112113 }
113114
114115 /** Export client built in metrics */
115116 private CompletableResultCode exportSpannerClientMetrics (Collection <MetricData > collection ) {
116- // Filter spanner metrics
117+ // Filter spanner metrics. Only include metrics that contain a project and instance ID.
117118 List <MetricData > spannerMetricData =
118119 collection .stream ()
119120 .filter (md -> SPANNER_METRICS .contains (md .getName ()))
120121 .collect (Collectors .toList ());
121122
122- // Skips exporting if there's none
123- if (spannerMetricData .isEmpty ()) {
124- return CompletableResultCode .ofSuccess ();
125- }
126-
127- // Verifies metrics project id is the same as the spanner project id set on this client
128- if (!spannerMetricData .stream ()
123+ // Log warnings for metrics that will be skipped.
124+ boolean mustFilter = false ;
125+ if (spannerMetricData .stream ()
129126 .flatMap (metricData -> metricData .getData ().getPoints ().stream ())
130- .allMatch (
131- pd -> spannerProjectId . equals ( SpannerCloudMonitoringExporterUtils . getProjectId ( pd )))) {
132- logger . log ( Level .WARNING , "Metric data has a different projectId. Skipping export ." );
133- return CompletableResultCode . ofFailure () ;
127+ .anyMatch ( this :: shouldSkipPointDataDueToProjectId )) {
128+ logger . log (
129+ Level .WARNING , "Some metric data contain a different projectId. These will be skipped ." );
130+ mustFilter = true ;
134131 }
135-
136- // Verifies if metrics data has missing instance id.
137132 if (spannerMetricData .stream ()
138133 .flatMap (metricData -> metricData .getData ().getPoints ().stream ())
139- .anyMatch (pd -> SpannerCloudMonitoringExporterUtils .getInstanceId (pd ) == null )) {
140- logger .log (Level .WARNING , "Metric data has missing instanceId. Skipping export." );
141- return CompletableResultCode .ofFailure ();
134+ .anyMatch (this ::shouldSkipPointDataDueToMissingInstanceId )) {
135+ logger .log (Level .WARNING , "Some metric data miss instanceId. These will be skipped." );
136+ mustFilter = true ;
137+ }
138+ if (mustFilter ) {
139+ spannerMetricData =
140+ spannerMetricData .stream ()
141+ .filter (this ::shouldSkipMetricData )
142+ .collect (Collectors .toList ());
143+ }
144+ lastExportSkippedData .set (mustFilter );
145+
146+ // Skips exporting if there's none
147+ if (spannerMetricData .isEmpty ()) {
148+ return CompletableResultCode .ofSuccess ();
142149 }
143150
144151 List <TimeSeries > spannerTimeSeries ;
@@ -190,6 +197,26 @@ public void onSuccess(List<Empty> empty) {
190197 return spannerExportCode ;
191198 }
192199
200+ private boolean shouldSkipMetricData (MetricData metricData ) {
201+ return metricData .getData ().getPoints ().stream ()
202+ .anyMatch (
203+ pd ->
204+ shouldSkipPointDataDueToProjectId (pd )
205+ || shouldSkipPointDataDueToMissingInstanceId (pd ));
206+ }
207+
208+ private boolean shouldSkipPointDataDueToProjectId (PointData pointData ) {
209+ return !spannerProjectId .equals (SpannerCloudMonitoringExporterUtils .getProjectId (pointData ));
210+ }
211+
212+ private boolean shouldSkipPointDataDueToMissingInstanceId (PointData pointData ) {
213+ return SpannerCloudMonitoringExporterUtils .getInstanceId (pointData ) == null ;
214+ }
215+
216+ boolean lastExportSkippedData () {
217+ return this .lastExportSkippedData .get ();
218+ }
219+
193220 private ApiFuture <List <Empty >> exportTimeSeriesInBatch (
194221 ProjectName projectName , List <TimeSeries > timeSeries ) {
195222 List <ApiFuture <Empty >> batchResults = new ArrayList <>();
@@ -233,7 +260,7 @@ public CompletableResultCode shutdown() {
233260 * metric over time.
234261 */
235262 @ Override
236- public AggregationTemporality getAggregationTemporality (InstrumentType instrumentType ) {
263+ public AggregationTemporality getAggregationTemporality (@ Nonnull InstrumentType instrumentType ) {
237264 return AggregationTemporality .CUMULATIVE ;
238265 }
239266}
0 commit comments