4040import com .google .cloud .monitoring .v3 .MetricServiceClient ;
4141import com .google .cloud .monitoring .v3 .MetricServiceSettings ;
4242import com .google .common .annotations .VisibleForTesting ;
43+ import com .google .common .base .Preconditions ;
4344import com .google .common .base .Supplier ;
44- import com .google .common .base .Suppliers ;
4545import com .google .common .collect .ImmutableList ;
46+ import com .google .common .collect .ImmutableMap ;
4647import com .google .common .collect .ImmutableSet ;
4748import com .google .common .collect .Iterables ;
4849import com .google .common .util .concurrent .MoreExecutors ;
5657import io .opentelemetry .sdk .metrics .data .MetricData ;
5758import io .opentelemetry .sdk .metrics .export .MetricExporter ;
5859import java .io .IOException ;
60+ import java .time .Duration ;
5961import java .util .ArrayList ;
6062import java .util .Arrays ;
6163import java .util .Collection ;
@@ -94,43 +96,25 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
9496 // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
9597 private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
9698
97- private final MetricServiceClient client ;
99+ private final String exporterName ;
98100
99- private final String taskId ;
101+ private final MetricServiceClient client ;
100102
101- // Application resource is initialized on the first export, which runs on a background thread
102- // to avoid slowness when starting the client.
103- private final Supplier <MonitoredResource > applicationResource ;
103+ private final TimeSeriesConverter timeSeriesConverter ;
104104
105105 private final AtomicBoolean isShutdown = new AtomicBoolean (false );
106106
107107 private CompletableResultCode lastExportCode ;
108108
109- private final AtomicBoolean bigtableExportFailureLogged = new AtomicBoolean (false );
110- private final AtomicBoolean applicationExportFailureLogged = new AtomicBoolean (false );
111-
112- private static final ImmutableList <String > BIGTABLE_TABLE_METRICS =
113- ImmutableSet .of (
114- OPERATION_LATENCIES_NAME ,
115- ATTEMPT_LATENCIES_NAME ,
116- SERVER_LATENCIES_NAME ,
117- FIRST_RESPONSE_LATENCIES_NAME ,
118- CLIENT_BLOCKING_LATENCIES_NAME ,
119- APPLICATION_BLOCKING_LATENCIES_NAME ,
120- RETRY_COUNT_NAME ,
121- CONNECTIVITY_ERROR_COUNT_NAME ,
122- REMAINING_DEADLINE_NAME )
123- .stream ()
124- .map (m -> METER_NAME + m )
125- .collect (ImmutableList .toImmutableList ());
126-
127- private static final ImmutableList <String > APPLICATION_METRICS =
128- ImmutableSet .of (PER_CONNECTION_ERROR_COUNT_NAME ).stream ()
129- .map (m -> METER_NAME + m )
130- .collect (ImmutableList .toImmutableList ());
131-
132- public static BigtableCloudMonitoringExporter create (
133- @ Nullable Credentials credentials , @ Nullable String endpoint ) throws IOException {
109+ private final AtomicBoolean exportFailureLogged = new AtomicBoolean (false );
110+
111+ static BigtableCloudMonitoringExporter create (
112+ String exporterName ,
113+ @ Nullable Credentials credentials ,
114+ @ Nullable String endpoint ,
115+ TimeSeriesConverter converter )
116+ throws IOException {
117+
134118 MetricServiceSettings .Builder settingsBuilder = MetricServiceSettings .newBuilder ();
135119 CredentialsProvider credentialsProvider =
136120 Optional .ofNullable (credentials )
@@ -146,79 +130,64 @@ public static BigtableCloudMonitoringExporter create(
146130 settingsBuilder .setEndpoint (endpoint );
147131 }
148132
149- java . time . Duration timeout = java . time . Duration .ofMinutes (1 );
133+ Duration timeout = Duration .ofMinutes (1 );
150134 // TODO: createServiceTimeSeries needs special handling if the request failed. Leaving
151135 // it as not retried for now.
152136 settingsBuilder .createServiceTimeSeriesSettings ().setSimpleTimeoutNoRetriesDuration (timeout );
153137
154138 return new BigtableCloudMonitoringExporter (
155- MetricServiceClient .create (settingsBuilder .build ()),
156- Suppliers .memoize (BigtableExporterUtils ::detectResourceSafe ),
157- BigtableExporterUtils .getDefaultTaskValue ());
139+ exporterName , MetricServiceClient .create (settingsBuilder .build ()), converter );
158140 }
159141
160142 @ VisibleForTesting
161143 BigtableCloudMonitoringExporter (
162- MetricServiceClient client , Supplier <MonitoredResource > applicationResource , String taskId ) {
144+ String exporterName , MetricServiceClient client , TimeSeriesConverter converter ) {
145+ this .exporterName = exporterName ;
163146 this .client = client ;
164- this .taskId = taskId ;
165- this .applicationResource = applicationResource ;
147+ this .timeSeriesConverter = converter ;
166148 }
167149
168150 @ Override
169- public CompletableResultCode export (Collection <MetricData > collection ) {
170- if (isShutdown .get ()) {
171- logger .log (Level .WARNING , "Exporter is shutting down" );
172- return CompletableResultCode .ofFailure ();
173- }
174-
175- CompletableResultCode bigtableExportCode = exportBigtableResourceMetrics (collection );
176- CompletableResultCode applicationExportCode = exportApplicationResourceMetrics (collection );
177-
178- lastExportCode =
179- CompletableResultCode .ofAll (ImmutableList .of (applicationExportCode , bigtableExportCode ));
151+ public CompletableResultCode export (Collection <MetricData > metricData ) {
152+ Preconditions .checkState (!isShutdown .get (), "Exporter is shutting down" );
180153
154+ lastExportCode = doExport (metricData );
181155 return lastExportCode ;
182156 }
183157
184158 /** Export metrics associated with a BigtableTable resource. */
185- private CompletableResultCode exportBigtableResourceMetrics (Collection <MetricData > collection ) {
186- // Filter bigtable table metrics
187- List <MetricData > bigtableMetricData =
188- collection .stream ()
189- .filter (md -> BIGTABLE_TABLE_METRICS .contains (md .getName ()))
190- .collect (Collectors .toList ());
159+ private CompletableResultCode doExport (Collection <MetricData > metricData ) {
160+ Map <ProjectName , List <TimeSeries >> bigtableTimeSeries ;
191161
192- // Skips exporting if there's none
193- if (bigtableMetricData .isEmpty ()) {
194- return CompletableResultCode .ofSuccess ();
195- }
196-
197- // List of timeseries by project id
198- Map <String , List <TimeSeries >> bigtableTimeSeries ;
199162 try {
200- bigtableTimeSeries =
201- BigtableExporterUtils .convertToBigtableTimeSeries (bigtableMetricData , taskId );
202- } catch (Throwable e ) {
163+ bigtableTimeSeries = timeSeriesConverter .convert (metricData );
164+ } catch (Throwable t ) {
203165 logger .log (
204166 Level .WARNING ,
205- "Failed to convert bigtable table metric data to cloud monitoring timeseries." ,
206- e );
167+ String .format (
168+ "Failed to convert %s metric data to cloud monitoring timeseries." , exporterName ),
169+ t );
207170 return CompletableResultCode .ofFailure ();
208171 }
209172
210- CompletableResultCode bigtableExportCode = new CompletableResultCode ();
173+ // Skips exporting if there's none
174+ if (bigtableTimeSeries .isEmpty ()) {
175+ return CompletableResultCode .ofSuccess ();
176+ }
177+
178+ CompletableResultCode exportCode = new CompletableResultCode ();
211179 bigtableTimeSeries .forEach (
212- (projectId , ts ) -> {
213- ProjectName projectName = ProjectName .of (projectId );
180+ (projectName , ts ) -> {
214181 ApiFuture <List <Empty >> future = exportTimeSeries (projectName , ts );
215182 ApiFutures .addCallback (
216183 future ,
217184 new ApiFutureCallback <List <Empty >>() {
218185 @ Override
219186 public void onFailure (Throwable throwable ) {
220- if (bigtableExportFailureLogged .compareAndSet (false , true )) {
221- String msg = "createServiceTimeSeries request failed for bigtable metrics." ;
187+ if (exportFailureLogged .compareAndSet (false , true )) {
188+ String msg =
189+ String .format (
190+ "createServiceTimeSeries request failed for %s." , exporterName );
222191 if (throwable instanceof PermissionDeniedException ) {
223192 msg +=
224193 String .format (
@@ -227,100 +196,20 @@ public void onFailure(Throwable throwable) {
227196 }
228197 logger .log (Level .WARNING , msg , throwable );
229198 }
230- bigtableExportCode .fail ();
199+ exportCode .fail ();
231200 }
232201
233202 @ Override
234203 public void onSuccess (List <Empty > emptyList ) {
235204 // When an export succeeded reset the export failure flag to false so if there's a
236205 // transient failure it'll be logged.
237- bigtableExportFailureLogged .set (false );
238- bigtableExportCode .succeed ();
206+ exportFailureLogged .set (false );
207+ exportCode .succeed ();
239208 }
240209 },
241210 MoreExecutors .directExecutor ());
242211 });
243212
244- return bigtableExportCode ;
245- }
246-
247- /** Export metrics associated with the resource the Application is running on. */
248- private CompletableResultCode exportApplicationResourceMetrics (
249- Collection <MetricData > collection ) {
250- if (applicationResource .get () == null ) {
251- return CompletableResultCode .ofSuccess ();
252- }
253-
254- // Filter application level metrics
255- List <MetricData > metricData =
256- collection .stream ()
257- .filter (md -> APPLICATION_METRICS .contains (md .getName ()))
258- .collect (Collectors .toList ());
259-
260- // Skip exporting if there's none
261- if (metricData .isEmpty ()) {
262- return CompletableResultCode .ofSuccess ();
263- }
264-
265- List <TimeSeries > timeSeries ;
266- try {
267- timeSeries =
268- BigtableExporterUtils .convertToApplicationResourceTimeSeries (
269- metricData , taskId , applicationResource .get ());
270- } catch (Throwable e ) {
271- logger .log (
272- Level .WARNING ,
273- "Failed to convert application metric data to cloud monitoring timeseries." ,
274- e );
275- return CompletableResultCode .ofFailure ();
276- }
277-
278- // Construct the request. The project id will be the project id of the detected monitored
279- // resource.
280- ApiFuture <List <Empty >> gceOrGkeFuture ;
281- CompletableResultCode exportCode = new CompletableResultCode ();
282- try {
283- ProjectName projectName =
284- ProjectName .of (
285- applicationResource .get ().getLabelsOrThrow (APPLICATION_RESOURCE_PROJECT_ID ));
286-
287- gceOrGkeFuture = exportTimeSeries (projectName , timeSeries );
288-
289- ApiFutures .addCallback (
290- gceOrGkeFuture ,
291- new ApiFutureCallback <List <Empty >>() {
292- @ Override
293- public void onFailure (Throwable throwable ) {
294- if (applicationExportFailureLogged .compareAndSet (false , true )) {
295- String msg = "createServiceTimeSeries request failed for bigtable metrics." ;
296- if (throwable instanceof PermissionDeniedException ) {
297- msg +=
298- String .format (
299- " Need monitoring metric writer permission on project=%s. Follow https://cloud.google.com/bigtable/docs/client-side-metrics-setup to set up permissions." ,
300- projectName .getProject ());
301- }
302- logger .log (Level .WARNING , msg , throwable );
303- }
304- exportCode .fail ();
305- }
306-
307- @ Override
308- public void onSuccess (List <Empty > emptyList ) {
309- // When an export succeeded reset the export failure flag to false so if there's a
310- // transient failure it'll be logged.
311- applicationExportFailureLogged .set (false );
312- exportCode .succeed ();
313- }
314- },
315- MoreExecutors .directExecutor ());
316-
317- } catch (Exception e ) {
318- logger .log (
319- Level .WARNING ,
320- "Failed to get projectName for application resource " + applicationResource );
321- return CompletableResultCode .ofFailure ();
322- }
323-
324213 return exportCode ;
325214 }
326215
@@ -383,4 +272,87 @@ public CompletableResultCode shutdown() {
383272 public AggregationTemporality getAggregationTemporality (InstrumentType instrumentType ) {
384273 return AggregationTemporality .CUMULATIVE ;
385274 }
275+
276+ interface TimeSeriesConverter {
277+ Map <ProjectName , List <TimeSeries >> convert (Collection <MetricData > metricData );
278+ }
279+
280+ static class PublicTimeSeriesConverter implements TimeSeriesConverter {
281+ private static final ImmutableList <String > BIGTABLE_TABLE_METRICS =
282+ ImmutableSet .of (
283+ OPERATION_LATENCIES_NAME ,
284+ ATTEMPT_LATENCIES_NAME ,
285+ SERVER_LATENCIES_NAME ,
286+ FIRST_RESPONSE_LATENCIES_NAME ,
287+ CLIENT_BLOCKING_LATENCIES_NAME ,
288+ APPLICATION_BLOCKING_LATENCIES_NAME ,
289+ RETRY_COUNT_NAME ,
290+ CONNECTIVITY_ERROR_COUNT_NAME ,
291+ REMAINING_DEADLINE_NAME )
292+ .stream ()
293+ .map (m -> METER_NAME + m )
294+ .collect (ImmutableList .toImmutableList ());
295+
296+ private final String taskId ;
297+
298+ PublicTimeSeriesConverter () {
299+ this (BigtableExporterUtils .DEFAULT_TABLE_VALUE .get ());
300+ }
301+
302+ PublicTimeSeriesConverter (String taskId ) {
303+ this .taskId = taskId ;
304+ }
305+
306+ @ Override
307+ public Map <ProjectName , List <TimeSeries >> convert (Collection <MetricData > metricData ) {
308+ List <MetricData > relevantData =
309+ metricData .stream ()
310+ .filter (md -> BIGTABLE_TABLE_METRICS .contains (md .getName ()))
311+ .collect (Collectors .toList ());
312+ if (relevantData .isEmpty ()) {
313+ return ImmutableMap .of ();
314+ }
315+ return BigtableExporterUtils .convertToBigtableTimeSeries (relevantData , taskId );
316+ }
317+ }
318+
319+ static class InternalTimeSeriesConverter implements TimeSeriesConverter {
320+ private static final ImmutableList <String > APPLICATION_METRICS =
321+ ImmutableSet .of (PER_CONNECTION_ERROR_COUNT_NAME ).stream ()
322+ .map (m -> METER_NAME + m )
323+ .collect (ImmutableList .toImmutableList ());
324+
325+ private final String taskId ;
326+ private final Supplier <MonitoredResource > monitoredResource ;
327+
328+ InternalTimeSeriesConverter (Supplier <MonitoredResource > monitoredResource ) {
329+ this (monitoredResource , BigtableExporterUtils .DEFAULT_TABLE_VALUE .get ());
330+ }
331+
332+ InternalTimeSeriesConverter (Supplier <MonitoredResource > monitoredResource , String taskId ) {
333+ this .monitoredResource = monitoredResource ;
334+ this .taskId = taskId ;
335+ }
336+
337+ @ Override
338+ public Map <ProjectName , List <TimeSeries >> convert (Collection <MetricData > metricData ) {
339+ MonitoredResource monitoredResource = this .monitoredResource .get ();
340+ if (monitoredResource == null ) {
341+ return ImmutableMap .of ();
342+ }
343+
344+ List <MetricData > relevantData =
345+ metricData .stream ()
346+ .filter (md -> APPLICATION_METRICS .contains (md .getName ()))
347+ .collect (Collectors .toList ());
348+ if (relevantData .isEmpty ()) {
349+ return ImmutableMap .of ();
350+ }
351+
352+ return ImmutableMap .of (
353+ ProjectName .of (monitoredResource .getLabelsOrThrow (APPLICATION_RESOURCE_PROJECT_ID )),
354+ BigtableExporterUtils .convertToApplicationResourceTimeSeries (
355+ relevantData , taskId , monitoredResource ));
356+ }
357+ }
386358}
0 commit comments