@@ -664,6 +664,72 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
664664 methodName , new MutateRowCallable (retrying , requestContext ));
665665 }
666666
667+ /**
668+ * Internal helper to create the base MutateRows callable chain. The chain is responsible for
669+ * retrying individual entry in case of error.
670+ *
671+ * <p>NOTE: the caller is responsible for adding tracing & metrics.
672+ *
673+ * @see MutateRowsRetryingCallable for more details
674+ */
675+ private UnaryCallable <MutateRowsRequest , Void > createMutateRowsBaseCallable () {
676+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > base =
677+ GrpcRawCallableFactory .createServerStreamingCallable (
678+ GrpcCallSettings .<MutateRowsRequest , MutateRowsResponse >newBuilder ()
679+ .setMethodDescriptor (BigtableGrpc .getMutateRowsMethod ())
680+ .setParamsExtractor (
681+ new RequestParamsExtractor <MutateRowsRequest >() {
682+ @ Override
683+ public Map <String , String > extract (MutateRowsRequest mutateRowsRequest ) {
684+ return ImmutableMap .of (
685+ "table_name" , mutateRowsRequest .getTableName (),
686+ "app_profile_id" , mutateRowsRequest .getAppProfileId ());
687+ }
688+ })
689+ .build (),
690+ settings .bulkMutateRowsSettings ().getRetryableCodes ());
691+
692+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > callable =
693+ new StatsHeadersServerStreamingCallable <>(base );
694+
695+ if (settings .bulkMutateRowsSettings ().isServerInitiatedFlowControlEnabled ()) {
696+ callable = new RateLimitingServerStreamingCallable (callable );
697+ }
698+
699+ // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
700+ // and
701+ // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
702+ // which by default is not retryable. Convert the exception so it can be retried in the client.
703+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > convertException =
704+ new ConvertExceptionCallable <>(callable );
705+
706+ ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > withBigtableTracer =
707+ new BigtableTracerStreamingCallable <>(convertException );
708+
709+ BasicResultRetryAlgorithm <Void > resultRetryAlgorithm ;
710+ if (settings .getEnableRetryInfo ()) {
711+ resultRetryAlgorithm = new RetryInfoRetryAlgorithm <>();
712+ } else {
713+ resultRetryAlgorithm = new ApiResultRetryAlgorithm <>();
714+ }
715+
716+ RetryAlgorithm <Void > retryAlgorithm =
717+ new RetryAlgorithm <>(
718+ resultRetryAlgorithm ,
719+ new ExponentialRetryAlgorithm (
720+ settings .bulkMutateRowsSettings ().getRetrySettings (), clientContext .getClock ()));
721+
722+ RetryingExecutorWithContext <Void > retryingExecutor =
723+ new ScheduledRetryingExecutor <>(retryAlgorithm , clientContext .getExecutor ());
724+
725+ return new MutateRowsRetryingCallable (
726+ clientContext .getDefaultCallContext (),
727+ withBigtableTracer ,
728+ retryingExecutor ,
729+ settings .bulkMutateRowsSettings ().getRetryableCodes (),
730+ retryAlgorithm );
731+ }
732+
667733 /**
668734 * Creates a callable chain to handle MutatesRows RPCs. This is meant to be used for manual
669735 * batching. The chain will:
@@ -773,72 +839,6 @@ public Batcher<ByteString, Row> newBulkReadRowsBatcher(
773839 MoreObjects .firstNonNull (ctx , clientContext .getDefaultCallContext ()));
774840 }
775841
776- /**
777- * Internal helper to create the base MutateRows callable chain. The chain is responsible for
778- * retrying individual entry in case of error.
779- *
780- * <p>NOTE: the caller is responsible for adding tracing & metrics.
781- *
782- * @see MutateRowsRetryingCallable for more details
783- */
784- private UnaryCallable <MutateRowsRequest , Void > createMutateRowsBaseCallable () {
785- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > base =
786- GrpcRawCallableFactory .createServerStreamingCallable (
787- GrpcCallSettings .<MutateRowsRequest , MutateRowsResponse >newBuilder ()
788- .setMethodDescriptor (BigtableGrpc .getMutateRowsMethod ())
789- .setParamsExtractor (
790- new RequestParamsExtractor <MutateRowsRequest >() {
791- @ Override
792- public Map <String , String > extract (MutateRowsRequest mutateRowsRequest ) {
793- return ImmutableMap .of (
794- "table_name" , mutateRowsRequest .getTableName (),
795- "app_profile_id" , mutateRowsRequest .getAppProfileId ());
796- }
797- })
798- .build (),
799- settings .bulkMutateRowsSettings ().getRetryableCodes ());
800-
801- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > callable =
802- new StatsHeadersServerStreamingCallable <>(base );
803-
804- if (settings .bulkMutateRowsSettings ().isServerInitiatedFlowControlEnabled ()) {
805- callable = new RateLimitingServerStreamingCallable (callable );
806- }
807-
808- // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
809- // and
810- // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
811- // which by default is not retryable. Convert the exception so it can be retried in the client.
812- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > convertException =
813- new ConvertExceptionCallable <>(callable );
814-
815- ServerStreamingCallable <MutateRowsRequest , MutateRowsResponse > withBigtableTracer =
816- new BigtableTracerStreamingCallable <>(convertException );
817-
818- BasicResultRetryAlgorithm <Void > resultRetryAlgorithm ;
819- if (settings .getEnableRetryInfo ()) {
820- resultRetryAlgorithm = new RetryInfoRetryAlgorithm <>();
821- } else {
822- resultRetryAlgorithm = new ApiResultRetryAlgorithm <>();
823- }
824-
825- RetryAlgorithm <Void > retryAlgorithm =
826- new RetryAlgorithm <>(
827- resultRetryAlgorithm ,
828- new ExponentialRetryAlgorithm (
829- settings .bulkMutateRowsSettings ().getRetrySettings (), clientContext .getClock ()));
830-
831- RetryingExecutorWithContext <Void > retryingExecutor =
832- new ScheduledRetryingExecutor <>(retryAlgorithm , clientContext .getExecutor ());
833-
834- return new MutateRowsRetryingCallable (
835- clientContext .getDefaultCallContext (),
836- withBigtableTracer ,
837- retryingExecutor ,
838- settings .bulkMutateRowsSettings ().getRetryableCodes (),
839- retryAlgorithm );
840- }
841-
842842 /**
843843 * Creates a callable chain to handle CheckAndMutateRow RPCs. THe chain will:
844844 *
0 commit comments