4242import com .google .common .base .MoreObjects ;
4343import com .google .common .collect .ImmutableList ;
4444import com .google .common .collect .ImmutableSet ;
45+ import com .google .common .collect .Iterables ;
4546import com .google .common .util .concurrent .MoreExecutors ;
4647import com .google .monitoring .v3 .CreateTimeSeriesRequest ;
4748import com .google .monitoring .v3 .ProjectName ;
5354import io .opentelemetry .sdk .metrics .data .MetricData ;
5455import io .opentelemetry .sdk .metrics .export .MetricExporter ;
5556import java .io .IOException ;
57+ import java .util .ArrayList ;
5658import java .util .Arrays ;
5759import java .util .Collection ;
5860import java .util .List ;
@@ -85,6 +87,10 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
8587
8688 private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id" ;
8789
90+ // This the quota limit from Cloud Monitoring. More details in
91+ // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
92+ private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
93+
8894 private final MetricServiceClient client ;
8995
9096 private final String bigtableProjectId ;
@@ -216,19 +222,12 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
216222 }
217223
218224 ProjectName projectName = ProjectName .of (bigtableProjectId );
219- CreateTimeSeriesRequest bigtableRequest =
220- CreateTimeSeriesRequest .newBuilder ()
221- .setName (projectName .toString ())
222- .addAllTimeSeries (bigtableTimeSeries )
223- .build ();
224-
225- ApiFuture <Empty > future =
226- this .client .createServiceTimeSeriesCallable ().futureCall (bigtableRequest );
225+ ApiFuture <List <Empty >> future = exportTimeSeries (projectName , bigtableTimeSeries );
227226
228227 CompletableResultCode bigtableExportCode = new CompletableResultCode ();
229228 ApiFutures .addCallback (
230229 future ,
231- new ApiFutureCallback <Empty >() {
230+ new ApiFutureCallback <List < Empty > >() {
232231 @ Override
233232 public void onFailure (Throwable throwable ) {
234233 if (bigtableExportFailureLogged .compareAndSet (false , true )) {
@@ -245,7 +244,7 @@ public void onFailure(Throwable throwable) {
245244 }
246245
247246 @ Override
248- public void onSuccess (Empty empty ) {
247+ public void onSuccess (List < Empty > emptyList ) {
249248 // When an export succeeded reset the export failure flag to false so if there's a
250249 // transient failure it'll be logged.
251250 bigtableExportFailureLogged .set (false );
@@ -290,22 +289,17 @@ private CompletableResultCode exportApplicationResourceMetrics(
290289
291290 // Construct the request. The project id will be the project id of the detected monitored
292291 // resource.
293- ApiFuture <Empty > gceOrGkeFuture ;
292+ ApiFuture <List < Empty > > gceOrGkeFuture ;
294293 CompletableResultCode exportCode = new CompletableResultCode ();
295294 try {
296295 ProjectName projectName =
297296 ProjectName .of (applicationResource .getLabelsOrThrow (APPLICATION_RESOURCE_PROJECT_ID ));
298- CreateTimeSeriesRequest request =
299- CreateTimeSeriesRequest .newBuilder ()
300- .setName (projectName .toString ())
301- .addAllTimeSeries (timeSeries )
302- .build ();
303297
304- gceOrGkeFuture = this . client . createServiceTimeSeriesCallable (). futureCall ( request );
298+ gceOrGkeFuture = exportTimeSeries ( projectName , timeSeries );
305299
306300 ApiFutures .addCallback (
307301 gceOrGkeFuture ,
308- new ApiFutureCallback <Empty >() {
302+ new ApiFutureCallback <List < Empty > >() {
309303 @ Override
310304 public void onFailure (Throwable throwable ) {
311305 if (applicationExportFailureLogged .compareAndSet (false , true )) {
@@ -322,7 +316,7 @@ public void onFailure(Throwable throwable) {
322316 }
323317
324318 @ Override
325- public void onSuccess (Empty empty ) {
319+ public void onSuccess (List < Empty > emptyList ) {
326320 // When an export succeeded reset the export failure flag to false so if there's a
327321 // transient failure it'll be logged.
328322 applicationExportFailureLogged .set (false );
@@ -341,6 +335,23 @@ public void onSuccess(Empty empty) {
341335 return exportCode ;
342336 }
343337
338+ private ApiFuture <List <Empty >> exportTimeSeries (
339+ ProjectName projectName , List <TimeSeries > timeSeries ) {
340+ List <ApiFuture <Empty >> batchResults = new ArrayList <>();
341+
342+ for (List <TimeSeries > batch : Iterables .partition (timeSeries , EXPORT_BATCH_SIZE_LIMIT )) {
343+ CreateTimeSeriesRequest req =
344+ CreateTimeSeriesRequest .newBuilder ()
345+ .setName (projectName .toString ())
346+ .addAllTimeSeries (batch )
347+ .build ();
348+ ApiFuture <Empty > f = this .client .createServiceTimeSeriesCallable ().futureCall (req );
349+ batchResults .add (f );
350+ }
351+
352+ return ApiFutures .allAsList (batchResults );
353+ }
354+
344355 @ Override
345356 public CompletableResultCode flush () {
346357 if (lastExportCode != null ) {
0 commit comments