diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml
index 756c0d6693..264497b28c 100644
--- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml
+++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-mapreduce/pom.xml
@@ -188,6 +188,11 @@ limitations under the License.
org.apache.hbase:hbase-mapreduce
+
+ javax.activation:activation
+ javax.xml.bind:jaxb-api
+ javax.xml.stream:stax-api
+
diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
index 661c34de58..a5be6d570b 100644
--- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
+++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x-shaded/pom.xml
@@ -291,6 +291,10 @@ limitations under the License.
io.opentelemetry
com.google.bigtable.repackaged.io.opentelemetry
+
+ javax.activation
+ com.google.bigtable.repackaged.javax.activation
+
META-INF/versions/9/io/opentelemetry
META-INF/versions/9/com/google/cloud/bigtable/repackaged/io/opentelemetry
diff --git a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java
index 0a7b948026..83f6eb5529 100644
--- a/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java
+++ b/bigtable-hbase-2.x-parent/bigtable-hbase-2.x/src/main/java/com/google/cloud/bigtable/hbase2_x/BigtableAsyncAdmin.java
@@ -695,6 +695,11 @@ public int getStoreFileCount() {
public Size getStoreFileSize() {
return new Size(size, Unit.BYTE);
}
+
+ @Override
+ public Size getMemStoreSize() {
+ return new Size(size, Unit.BYTE);
+ }
}
/** Handler for unsupported operations for generating Admin class at runtime. */
public static class UnsupportedOperationsHandler implements InvocationHandler {
diff --git a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml
index 0c29be6832..3de383b0ba 100644
--- a/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml
+++ b/bigtable-test/bigtable-build-helper/src/it/verify-shaded-jar-entries-ok/pom.xml
@@ -57,6 +57,12 @@ limitations under the License.
com.google.bigtable.repackaged.javax.annotation
+
+ javax.activation
+
+ com.google.bigtable.repackaged.javax.activation
+
+
org.checkerframework
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java
index a91652823a..1623afdaa3 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
public class MirroringConnection
@@ -41,4 +42,22 @@ public MirroringConnection(MirroringConfiguration mirroringConfiguration, Execut
throws IOException {
super(mirroringConfiguration, pool);
}
+
+ @Override
+ protected Table getMirroringTable(Table primaryTable, Table secondaryTable) {
+ return new MirroringTable(
+ primaryTable,
+ secondaryTable,
+ executorService,
+ this.mismatchDetector,
+ this.flowController,
+ this.secondaryWriteErrorConsumer,
+ this.readSampler,
+ this.timestamper,
+ this.performWritesConcurrently,
+ this.waitForSecondaryWrites,
+ this.mirroringTracer,
+ this.referenceCounter,
+ this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
+ }
}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java
new file mode 100644
index 0000000000..8af7776ff2
--- /dev/null
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.mirroring.hbase1_x;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
+import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer;
+import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
+import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
+import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
+import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
+import io.opencensus.common.Scope;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+
+@InternalApi("For internal usage only")
+public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable
+ implements Table {
+ /**
+ * @param executorService ExecutorService is used to perform operations on secondaryTable and
+ * verification tasks.
+ * @param mismatchDetector Detects mismatches in results from operations preformed on both
+ * databases.
+ * @param secondaryWriteErrorConsumer Consumer secondary write errors.
+ */
+ public MirroringTable(
+ Table primaryTable,
+ Table secondaryTable,
+ ExecutorService executorService,
+ MismatchDetector mismatchDetector,
+ FlowController flowController,
+ SecondaryWriteErrorConsumer secondaryWriteErrorConsumer,
+ ReadSampler readSampler,
+ Timestamper timestamper,
+ boolean performWritesConcurrently,
+ boolean waitForSecondaryWrites,
+ MirroringTracer mirroringTracer,
+ ReferenceCounter parentReferenceCounter,
+ int resultScannerBufferedMismatchedResults) {
+ super(
+ primaryTable,
+ secondaryTable,
+ executorService,
+ mismatchDetector,
+ flowController,
+ secondaryWriteErrorConsumer,
+ readSampler,
+ timestamper,
+ performWritesConcurrently,
+ waitForSecondaryWrites,
+ mirroringTracer,
+ parentReferenceCounter,
+ resultScannerBufferedMismatchedResults);
+ }
+
+ @Override
+ public void mutateRow(final RowMutations rowMutations) throws IOException {
+ try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) {
+ Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations);
+ this.batcher.batchSingleWriteOperation(rowMutations);
+ }
+ }
+}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java
index 86216141cb..caf4effc89 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnection.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2022 Google LLC
+ * Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY;
import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.google.cloud.bigtable.mirroring.core.TestConnection;
import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection;
@@ -27,12 +29,23 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class TestMirroringConnection {
+ private Connection connection;
+
+ @Before
+ public void setUp() throws IOException {
+ TestConnection.reset();
+ Configuration configuration = createConfiguration();
+ connection = ConnectionFactory.createConnection(configuration);
+ assertThat(TestConnection.connectionMocks.size()).isEqualTo(2);
+ }
+
private Configuration createConfiguration() {
Configuration configuration = new Configuration();
configuration.set("hbase.client.connection.impl", MirroringConnection.class.getCanonicalName());
@@ -40,8 +53,10 @@ private Configuration createConfiguration() {
MIRRORING_PRIMARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
configuration.set(
MIRRORING_SECONDARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
- configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "1");
- configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "2");
+ // Prefix keys have to be set because we are using the same class as primary and secondary
+ // connection class.
+ configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "primary-connection");
+ configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "secondary-connection");
configuration.set(
"google.bigtable.mirroring.write-error-log.appender.prefix-path", "/tmp/test-");
configuration.set("google.bigtable.mirroring.write-error-log.appender.max-buffer-size", "1024");
@@ -52,12 +67,29 @@ private Configuration createConfiguration() {
@Test
public void testConnectionFactoryCreatesMirroringConnection() throws IOException {
- Configuration configuration = createConfiguration();
- Connection connection = ConnectionFactory.createConnection(configuration);
assertThat(connection).isInstanceOf(MirroringConnection.class);
assertThat(((MirroringConnection) connection).getPrimaryConnection())
.isInstanceOf(TestConnection.class);
assertThat(((MirroringConnection) connection).getSecondaryConnection())
.isInstanceOf(TestConnection.class);
}
+
+ @Test
+ public void testCloseClosesUnderlyingConnections() throws IOException {
+ connection.close();
+ assertThat(connection.isClosed()).isTrue();
+ verify(TestConnection.connectionMocks.get(0), times(1)).close();
+ verify(TestConnection.connectionMocks.get(1), times(1)).close();
+ }
+
+ @Test
+ public void testAbortAbortsUnderlyingConnections() throws IOException {
+ String expectedString = "expected";
+ Throwable expectedThrowable = new Exception();
+ connection.abort(expectedString, expectedThrowable);
+ verify(TestConnection.connectionMocks.get(0), times(1))
+ .abort(expectedString, expectedThrowable);
+ verify(TestConnection.connectionMocks.get(1), times(1))
+ .abort(expectedString, expectedThrowable);
+ }
}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java
new file mode 100644
index 0000000000..83e3ffcaad
--- /dev/null
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringConnectionClosing.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase1_x;
+
+import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY;
+import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY;
+import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONFIG_PREFIX_KEY;
+import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_SECONDARY_CONNECTION_CLASS_KEY;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule;
+import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner;
+import com.google.cloud.bigtable.mirroring.core.TestConnection;
+import com.google.cloud.bigtable.mirroring.core.TestHelpers;
+import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection;
+import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringTable;
+import com.google.common.util.concurrent.SettableFuture;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class TestMirroringConnectionClosing {
+ @Rule
+ public final ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.spyedSingleThreadedExecutor();
+
+ private Configuration createConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set("hbase.client.connection.impl", MirroringConnection.class.getCanonicalName());
+ configuration.set(
+ MIRRORING_PRIMARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
+ configuration.set(
+ MIRRORING_SECONDARY_CONNECTION_CLASS_KEY, TestConnection.class.getCanonicalName());
+ // Prefix keys have to be set because we are using the same class as primary and secondary
+ // connection class.
+ configuration.set(MIRRORING_PRIMARY_CONFIG_PREFIX_KEY, "primary-connection");
+ configuration.set(MIRRORING_SECONDARY_CONFIG_PREFIX_KEY, "secondary-connection");
+ configuration.set(
+ "google.bigtable.mirroring.write-error-log.appender.prefix-path", "/tmp/test-");
+ configuration.set("google.bigtable.mirroring.write-error-log.appender.max-buffer-size", "1024");
+ configuration.set(
+ "google.bigtable.mirroring.write-error-log.appender.drop-on-overflow", "false");
+ return configuration;
+ }
+
+ MirroringConnection mirroringConnection;
+ MirroringTable mirroringTable;
+ MirroringResultScanner mirroringScanner;
+
+ @Before
+ public void setUp() throws IOException {
+ TestConnection.reset();
+ Configuration configuration = createConfiguration();
+
+ mirroringConnection =
+ spy(
+ (MirroringConnection)
+ ConnectionFactory.createConnection(
+ configuration, executorServiceRule.executorService));
+ assertThat(TestConnection.connectionMocks.size()).isEqualTo(2);
+
+ mirroringTable = (MirroringTable) mirroringConnection.getTable(TableName.valueOf("test"));
+ mirroringScanner = (MirroringResultScanner) mirroringTable.getScanner(new Scan());
+ }
+
+ @Test
+ public void testUnderlingObjectsAreClosedInCorrectOrder()
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ final SettableFuture unblockSecondaryScanner = SettableFuture.create();
+ final SettableFuture scannerAndTableClosed = SettableFuture.create();
+ final SettableFuture closeFinished = SettableFuture.create();
+ TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockSecondaryScanner).next();
+
+ // We expect secondary objects to be closed in correct order - from the innermost to the
+ // outermost.
+ // TestConnection object is created for each both primary and secondary, that connections,
+ // tables and scanners created using those connections are stored in static *Mocks field of
+ // TestConnection, in order of creation.
+ // Thus, `TestConnection.connectionMocks.get(1)` is secondary connection mock,
+ // `TestConnection.tableMocks.get(1)` is a table created using this connection,
+ // and `TestConnection.scannerMocks.get(1)` is a scanner created using this table.
+ InOrder inOrder =
+ Mockito.inOrder(
+ TestConnection.scannerMocks.get(1),
+ TestConnection.tableMocks.get(1),
+ TestConnection.connectionMocks.get(1));
+
+ Thread t =
+ new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ mirroringScanner.next();
+
+ mirroringScanner.close();
+ mirroringTable.close();
+
+ scannerAndTableClosed.set(null);
+ mirroringConnection.close();
+ closeFinished.set(null);
+ } catch (Exception e) {
+ closeFinished.setException(e);
+ }
+ }
+ });
+ t.start();
+
+ // Wait until secondary request is scheduled.
+ scannerAndTableClosed.get(5, TimeUnit.SECONDS);
+ // Give mirroringConnection.close() some time to run
+ Thread.sleep(3000);
+ // and verify that it was called.
+ verify(mirroringConnection).close();
+
+ // Finish async call.
+ unblockSecondaryScanner.set(null);
+ // The close() should finish.
+ closeFinished.get(5, TimeUnit.SECONDS);
+ t.join();
+
+ executorServiceRule.waitForExecutor();
+
+ inOrder.verify(TestConnection.scannerMocks.get(1)).close();
+ inOrder.verify(TestConnection.tableMocks.get(1)).close();
+ inOrder.verify(TestConnection.connectionMocks.get(1)).close();
+
+ assertThat(mirroringConnection.isClosed()).isTrue();
+ verify(TestConnection.connectionMocks.get(0), times(1)).close();
+ verify(TestConnection.tableMocks.get(0), times(1)).close();
+ verify(TestConnection.scannerMocks.get(0), times(1)).close();
+ }
+
+ @Test(timeout = 5000)
+ public void testClosingConnectionWithoutClosingUnderlyingObjectsShouldntBlock()
+ throws IOException {
+ // We have created a connection, table and scanner.
+ // They are not use asynchronously now, thus connection should be closed without delay.
+ mirroringConnection.close();
+ verify(TestConnection.connectionMocks.get(0)).close();
+ verify(TestConnection.connectionMocks.get(1)).close();
+ }
+
+ @Test
+ public void testInFlightRequestBlockClosingConnection()
+ throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ final SettableFuture unblockSecondaryScanner = SettableFuture.create();
+ final SettableFuture asyncScheduled = SettableFuture.create();
+ final SettableFuture closeFinished = SettableFuture.create();
+ TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockSecondaryScanner).next();
+
+ Thread t =
+ new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ mirroringScanner.next();
+
+ // Not calling close on scanner nor on table.
+ asyncScheduled.set(null);
+
+ mirroringConnection.close();
+ closeFinished.set(null);
+ } catch (Exception e) {
+ closeFinished.setException(e);
+ }
+ }
+ });
+ t.start();
+
+ // Wait until secondary request is scheduled.
+ asyncScheduled.get(5, TimeUnit.SECONDS);
+ // Give mirroringConnection.close() some time to run
+ Thread.sleep(3000);
+ // and verify that it was called.
+ verify(mirroringConnection).close();
+
+ // Finish async call.
+ unblockSecondaryScanner.set(null);
+ // The close() should finish even though we didn't close scanner nor table.
+ closeFinished.get(5, TimeUnit.SECONDS);
+ t.join();
+ }
+
+ @Test
+ public void testConnectionWaitsForAsynchronousClose()
+ throws IOException, InterruptedException, TimeoutException, ExecutionException {
+ final SettableFuture unblockScannerNext = SettableFuture.create();
+ final SettableFuture unblockScannerClose = SettableFuture.create();
+ final SettableFuture asyncScheduled = SettableFuture.create();
+ final SettableFuture closeFinished = SettableFuture.create();
+ TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockScannerNext).next();
+ TestHelpers.blockMethodCall(TestConnection.scannerMocks.get(1), unblockScannerClose).close();
+
+ Thread t =
+ new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ mirroringScanner.next();
+ mirroringScanner.close();
+ asyncScheduled.set(null);
+ mirroringConnection.close();
+ closeFinished.set(null);
+ } catch (Exception e) {
+ closeFinished.setException(e);
+ }
+ }
+ });
+ t.start();
+
+ // Wait until secondary request is scheduled.
+ asyncScheduled.get(5, TimeUnit.SECONDS);
+ // Unblock scanner next.
+ unblockScannerNext.set(null);
+ // Give mirroringConnection.close() and secondaryScanner.close() some time to run
+ Thread.sleep(1000);
+ // and verify that they were called.
+ verify(mirroringConnection).close();
+ verify(TestConnection.scannerMocks.get(1)).close();
+
+ // secondary.close() was not yet finished, close should be blocked.
+ try {
+ closeFinished.get(2, TimeUnit.SECONDS);
+ fail();
+ } catch (TimeoutException expected) {
+ // async operation has not finished - close should block.
+ }
+
+ // Finish secondaryScanner.close().
+ unblockScannerClose.set(null);
+ // And now connection.close() should unblock.
+ closeFinished.get(5, TimeUnit.SECONDS);
+ t.join();
+ }
+}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java
new file mode 100644
index 0000000000..d904934c12
--- /dev/null
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/hbase1_x/TestMirroringTable.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase1_x;
+
+import static com.google.cloud.bigtable.mirroring.core.TestHelpers.mockBatch;
+import static com.google.cloud.bigtable.mirroring.core.TestHelpers.setupFlowControllerMock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule;
+import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
+import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumerWithMetrics;
+import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringMetricsRecorder;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanFactory;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
+import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
+import com.google.cloud.bigtable.mirroring.core.utils.timestamper.NoopTimestamper;
+import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
+import com.google.cloud.bigtable.mirroring.core.verification.DefaultMismatchDetector;
+import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
+import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringTable;
+import io.opencensus.trace.Tracing;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class TestMirroringTable {
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Rule
+ public final ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.singleThreadedExecutor();
+
+ @Mock Table primaryTable;
+ @Mock Table secondaryTable;
+ @Mock FlowController flowController;
+ @Mock MirroringMetricsRecorder mirroringMetricsRecorder;
+ @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer;
+ @Mock ReferenceCounter referenceCounter;
+
+ Timestamper timestamper = new NoopTimestamper();
+ MismatchDetector mismatchDetector;
+ MirroringTable mirroringTable;
+ MirroringTracer mirroringTracer;
+
+ @Before
+ public void setUp() {
+ setupFlowControllerMock(flowController);
+ this.mirroringTracer =
+ new MirroringTracer(
+ new MirroringSpanFactory(Tracing.getTracer(), mirroringMetricsRecorder),
+ mirroringMetricsRecorder);
+ this.mismatchDetector = spy(new DefaultMismatchDetector(this.mirroringTracer, 100));
+ this.mirroringTable =
+ spy(
+ new MirroringTable(
+ primaryTable,
+ secondaryTable,
+ this.executorServiceRule.executorService,
+ mismatchDetector,
+ flowController,
+ secondaryWriteErrorConsumer,
+ new ReadSampler(100),
+ this.timestamper,
+ false,
+ false,
+ this.mirroringTracer,
+ this.referenceCounter,
+ 1000));
+ }
+
+ @Test
+ public void testMutateRow() throws IOException, InterruptedException {
+ RowMutations mutations = new RowMutations("r1".getBytes());
+ List extends Row> listOfMutations = Arrays.asList(mutations);
+ mockBatch(primaryTable, secondaryTable, mutations, new Result());
+ mirroringTable.mutateRow(mutations);
+ executorServiceRule.waitForExecutor();
+ verify(primaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class));
+ verify(secondaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class));
+ }
+}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java
index 75ffe6fe33..6a0eb84f75 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java
@@ -80,7 +80,7 @@ public FailingHBaseHRegion2(
}
@Override
- public HRegion.RegionScannerImpl getScanner(Scan scan, List additionalScanners)
+ public RegionScannerImpl getScanner(Scan scan, List additionalScanners)
throws IOException {
// HBase 2.x implements Gets as Scans with start row == end row == requested row.
processRowThrow(scan.getStartRow());
@@ -88,9 +88,9 @@ public HRegion.RegionScannerImpl getScanner(Scan scan, List add
}
@Override
- public void mutateRow(RowMutations rm) throws IOException {
+ public Result mutateRow(RowMutations rm) throws IOException {
processRowThrow(rm.getRow());
- super.mutateRow(rm);
+ return super.mutateRow(rm);
}
@Override
@@ -100,12 +100,6 @@ public OperationStatus[] batchMutate(
mutations, (m) -> super.batchMutate(m, atomic, nonceGroup, nonce));
}
- @Override
- public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
- throws IOException {
- return batchMutateWithFailures(mutations, (m) -> super.batchMutate(m, nonceGroup, nonce));
- }
-
@Override
public Result get(Get get) throws IOException {
processRowThrow(get.getRow());
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml
index a9d66c0b2b..9066e808f3 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-hadoop/pom.xml
@@ -42,6 +42,10 @@ limitations under the License.
log4j
log4j
+
+ ch.qos.reload4j
+ reload4j
+
@@ -202,6 +206,7 @@ limitations under the License.
commons-logging:commons-logging
log4j:log4j
+ ch.qos.reload4j:reload4j
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml
index 9293d1123e..3cdc16de8c 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-shaded/pom.xml
@@ -126,6 +126,18 @@ limitations under the License.
io.opencensus
${shading-prefix}.io.opencensus
+
+ io.opentelemetry
+ com.google.cloud.bigtable.mirroring.repackaged.io.opentelemetry
+
+
+ META-INF/versions/9/io/opentelemetry
+ META-INF/versions/9/com/google/cloud/bigtable/mirroring/repackaged/io/opentelemetry
+
+
+ javax.activation
+ com.google.cloud.bigtable.mirroring.repackaged.javax.activation
+
io.grpc
@@ -198,6 +210,13 @@ limitations under the License.
org.apache.hbase:hbase-shaded-client
+
+
+ io.opentelemetry:opentelemetry-api
+ io.opentelemetry:opentelemetry-context
+ io.opentelemetry:opentelemetry-semconv
+
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java
index e0e4a7e1ba..6f8b7840de 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java
@@ -370,6 +370,16 @@ public AsyncTableBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return this;
}
+ @Override
+ public AsyncTableBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit) {
+ setTimeParameter(
+ pause,
+ unit,
+ this.primaryTableBuilder::setRetryPauseForServerOverloaded,
+ this.secondaryTableBuilder::setRetryPauseForServerOverloaded);
+ return this;
+ }
+
@Override
public AsyncTableBuilder setMaxAttempts(int maxAttempts) {
setIntegerParameter(
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java
index d92fe51911..a3157f6759 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java
@@ -63,6 +63,8 @@
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -202,9 +204,9 @@ public CompletableFuture increment(Increment increment) {
}
@Override
- public CompletableFuture mutateRow(RowMutations rowMutations) {
+ public CompletableFuture mutateRow(RowMutations rowMutations) {
this.timestamper.fillTimestamp(rowMutations);
- CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations);
+ CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations);
return writeWithFlowControl(
new WriteOperationInfo(rowMutations),
primaryFuture,
@@ -534,6 +536,16 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(
throw new UnsupportedOperationException("not implemented");
}
+ @Override
+ public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
+ @Override
+ public List> checkAndMutate(List list) {
+ throw new UnsupportedOperationException("not implemented");
+ }
+
@Override
public void scan(Scan scan, C consumer) {
this.primaryTable.scan(scan, consumer);
@@ -561,7 +573,7 @@ public MirroringCheckAndMutateBuilder(CheckAndMutateBuilder primaryBuilder) {
private OperationStages> checkAndMutate(
WriteOperationInfo writeOperationInfo,
CompletableFuture primary,
- Supplier> secondary) {
+ Supplier> secondary) {
OperationStages> returnedValue =
new OperationStages<>(new CompletableFuture<>());
primary
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java
index ffe21d4871..c6cabb0294 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java
@@ -30,6 +30,24 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService
super(conf, managed, pool, user);
}
+ @Override
+ protected Table getMirroringTable(Table primaryTable, Table secondaryTable) {
+ return new MirroringTable(
+ primaryTable,
+ secondaryTable,
+ executorService,
+ this.mismatchDetector,
+ this.flowController,
+ this.secondaryWriteErrorConsumer,
+ this.readSampler,
+ this.timestamper,
+ this.performWritesConcurrently,
+ this.waitForSecondaryWrites,
+ this.mirroringTracer,
+ this.referenceCounter,
+ this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
+ }
+
public MirroringConnection(Configuration conf, ExecutorService pool, User user) throws Throwable {
this(conf, false, pool, user);
}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java
index aad985884a..47c38a7e27 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java
@@ -15,26 +15,39 @@
*/
package com.google.cloud.bigtable.mirroring.hbase2_x;
+import com.google.cloud.bigtable.mirroring.core.utils.CallableThrowingIOException;
+import com.google.cloud.bigtable.mirroring.core.utils.OperationUtils;
import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumer;
import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
+import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.WriteOperationInfo;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation;
import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
+import io.opencensus.common.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
public class MirroringTable extends com.google.cloud.bigtable.mirroring.core.MirroringTable
implements Table {
+ // We keep a reference to the secondary table to get around the
+ // api change for mutate rows (used to return void, not returns Result)
+ private Table secondaryTable;
+
public MirroringTable(
Table primaryTable,
Table secondaryTable,
@@ -63,6 +76,7 @@ public MirroringTable(
mirroringTracer,
referenceCounter,
resultScannerBufferedMismatchedResults);
+ this.secondaryTable = secondaryTable;
}
@Override
@@ -80,6 +94,54 @@ public boolean[] exists(List gets) throws IOException {
return existsAll(gets);
}
+ @Override
+ public Result mutateRow(final RowMutations rowMutations) throws IOException {
+ try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) {
+ Log.trace("[%s] mutateRow(mutateRow=%s)", this.getName(), rowMutations);
+
+ if (rowMutations.getMutations().isEmpty()) {
+ return Result.EMPTY_RESULT;
+ }
+
+ Result result =
+ this.mirroringTracer.spanFactory.wrapPrimaryOperation(
+ new CallableThrowingIOException() {
+ @Override
+ public Result call() throws IOException {
+ return primaryTable.mutateRow(rowMutations);
+ }
+ },
+ HBaseOperation.MUTATE_ROW);
+
+ Mutation firstMutation = rowMutations.getMutations().get(0);
+
+ // If it is either Append or Increment, the underlying operation is a rmw and we need
+ // the result of that operation to apply on the secondary table
+ if (firstMutation instanceof Append || firstMutation instanceof Increment) {
+ Put put = OperationUtils.makePutFromResult(result);
+
+ scheduleSequentialWriteOperation(
+ new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put));
+ } else {
+ // Async wrapper mutateRow implementation returns void. HBase 2.4+ returns result
+ // so we skip the syntatic sugar
+ scheduleSequentialWriteOperation(
+ new WriteOperationInfo(rowMutations),
+ this.secondaryAsyncWrapper.createSubmitTaskSupplier(
+ new CallableThrowingIOException() {
+ @Override
+ public Result call() throws IOException {
+ Log.trace("mutateRow(RowMutations)");
+ return secondaryTable.mutateRow(rowMutations);
+ }
+ },
+ HBaseOperation.MUTATE_ROW));
+ }
+
+ return result;
+ }
+ }
+
/**
* HBase 1.x's {@link Table#append} returns {@code null} when {@link Append#isReturnResults} is
* {@code false}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java
new file mode 100644
index 0000000000..f0c2b48205
--- /dev/null
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestConnection.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.mirroring.hbase2_x;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableBuilder;
+import org.apache.hadoop.hbase.security.User;
+
+public class TestConnection extends com.google.cloud.bigtable.mirroring.core.TestConnection
+ implements Connection {
+
+ public TestConnection(Configuration conf, boolean managed, ExecutorService pool, User user) {
+ super(conf, managed, pool, user);
+ }
+
+ public TestConnection(Configuration conf, ExecutorService pool, User user) {
+ super(conf, false, pool, user);
+ }
+
+ @Override
+ public void clearRegionLocationCache() {}
+
+ @Override
+ public TableBuilder getTableBuilder(TableName tableName, ExecutorService executorService) {
+ return null;
+ }
+}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java
index 4d2c5839d0..5252729080 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java
@@ -757,7 +757,7 @@ public void testConditionalWriteHappensWhenSecondaryErred()
when(primaryBuilder.thenMutate(mutations)).thenReturn(primaryFuture);
IOException ioe = new IOException("expected");
- CompletableFuture exceptionalFuture = new CompletableFuture<>();
+ CompletableFuture exceptionalFuture = new CompletableFuture<>();
exceptionalFuture.completeExceptionally(ioe);
when(secondaryTable.mutateRow(mutations)).thenReturn(exceptionalFuture);
@@ -841,12 +841,12 @@ public void testDelete() throws InterruptedException, ExecutionException {
@Test
public void testMutateRow() throws ExecutionException, InterruptedException {
RowMutations mutations = new RowMutations("r1".getBytes());
- CompletableFuture primaryFuture = new CompletableFuture<>();
- CompletableFuture secondaryFuture = new CompletableFuture<>();
+ CompletableFuture primaryFuture = new CompletableFuture<>();
+ CompletableFuture secondaryFuture = new CompletableFuture<>();
when(primaryTable.mutateRow(mutations)).thenReturn(primaryFuture);
when(secondaryTable.mutateRow(mutations)).thenReturn(secondaryFuture);
- CompletableFuture resultFuture = mirroringTable.mutateRow(mutations);
+ CompletableFuture resultFuture = mirroringTable.mutateRow(mutations);
primaryFuture.complete(null);
secondaryFuture.complete(null);
resultFuture.get();
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java
similarity index 94%
rename from hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java
rename to hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java
index 9c75fd0e1b..363795e59e 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnection.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Google LLC
+ * Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.mirroring.core;
+package com.google.cloud.bigtable.mirroring.hbase2_x;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY;
@@ -90,9 +90,4 @@ public void testAbortAbortsUnderlyingConnections() throws IOException {
verify(TestConnection.connectionMocks.get(1), times(1))
.abort(expectedString, expectedThrowable);
}
-
- @Test
- public void testConstructorTakingMirroringConfiguration() throws IOException {
- new MirroringConnection(new MirroringConfiguration(createConfiguration()), null);
- }
}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java
similarity index 97%
rename from hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java
rename to hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java
index 1a82fe38f1..525d16347d 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/test/java/com/google/cloud/bigtable/mirroring/core/TestMirroringConnectionClosing.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringConnectionClosing.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.mirroring.core;
+package com.google.cloud.bigtable.mirroring.hbase2_x;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONFIG_PREFIX_KEY;
import static com.google.cloud.bigtable.mirroring.core.utils.MirroringConfigurationHelper.MIRRORING_PRIMARY_CONNECTION_CLASS_KEY;
@@ -25,6 +25,9 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule;
+import com.google.cloud.bigtable.mirroring.core.MirroringResultScanner;
+import com.google.cloud.bigtable.mirroring.core.TestHelpers;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java
new file mode 100644
index 0000000000..3fc5040d54
--- /dev/null
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringTable.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.mirroring.hbase2_x;
+
+import static com.google.cloud.bigtable.mirroring.core.TestHelpers.setupFlowControllerMock;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.bigtable.mirroring.core.ExecutorServiceRule;
+import com.google.cloud.bigtable.mirroring.core.utils.ReadSampler;
+import com.google.cloud.bigtable.mirroring.core.utils.SecondaryWriteErrorConsumerWithMetrics;
+import com.google.cloud.bigtable.mirroring.core.utils.flowcontrol.FlowController;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringMetricsRecorder;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringSpanFactory;
+import com.google.cloud.bigtable.mirroring.core.utils.mirroringmetrics.MirroringTracer;
+import com.google.cloud.bigtable.mirroring.core.utils.referencecounting.ReferenceCounter;
+import com.google.cloud.bigtable.mirroring.core.utils.timestamper.NoopTimestamper;
+import com.google.cloud.bigtable.mirroring.core.utils.timestamper.Timestamper;
+import com.google.cloud.bigtable.mirroring.core.verification.DefaultMismatchDetector;
+import com.google.cloud.bigtable.mirroring.core.verification.MismatchDetector;
+import io.opencensus.trace.Tracing;
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Table;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@RunWith(JUnit4.class)
+public class TestMirroringTable {
+ @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Rule
+ public final ExecutorServiceRule executorServiceRule =
+ ExecutorServiceRule.singleThreadedExecutor();
+
+ @Mock Table primaryTable;
+ @Mock Table secondaryTable;
+ @Mock FlowController flowController;
+ @Mock MirroringMetricsRecorder mirroringMetricsRecorder;
+ @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer;
+ @Mock ReferenceCounter referenceCounter;
+
+ Timestamper timestamper = new NoopTimestamper();
+ MismatchDetector mismatchDetector;
+ MirroringTable mirroringTable;
+ MirroringTracer mirroringTracer;
+
+ @Before
+ public void setUp() {
+ setupFlowControllerMock(flowController);
+ this.mirroringTracer =
+ new MirroringTracer(
+ new MirroringSpanFactory(Tracing.getTracer(), mirroringMetricsRecorder),
+ mirroringMetricsRecorder);
+ this.mismatchDetector = spy(new DefaultMismatchDetector(this.mirroringTracer, 100));
+ this.mirroringTable =
+ spy(
+ new MirroringTable(
+ primaryTable,
+ secondaryTable,
+ this.executorServiceRule.executorService,
+ mismatchDetector,
+ flowController,
+ secondaryWriteErrorConsumer,
+ new ReadSampler(100),
+ this.timestamper,
+ false,
+ false,
+ this.mirroringTracer,
+ this.referenceCounter,
+ 1000));
+ }
+
+ @Test
+ public void testMutateRow() throws IOException, InterruptedException {
+ RowMutations mutations = new RowMutations("r1".getBytes());
+ mutations.add(new Put("r1".getBytes()));
+ when(primaryTable.mutateRow(any(RowMutations.class))).thenReturn(Result.EMPTY_RESULT);
+ mirroringTable.mutateRow(mutations);
+ executorServiceRule.waitForExecutor();
+ verify(primaryTable, times(1)).mutateRow(mutations);
+ verify(secondaryTable, times(1)).mutateRow(mutations);
+ }
+}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java
index aee0e36800..daef27e929 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringConnection.java
@@ -51,7 +51,7 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
-public class MirroringConnection implements Connection {
+public abstract class MirroringConnection implements Connection {
private static final com.google.cloud.bigtable.mirroring.core.utils.Logger Log =
new com.google.cloud.bigtable.mirroring.core.utils.Logger(MirroringConnection.class);
protected final FlowController flowController;
@@ -231,20 +231,7 @@ public Table call() throws IOException {
},
HBaseOperation.GET_TABLE);
Table secondaryTable = this.secondaryConnection.getTable(tableName);
- return new MirroringTable(
- primaryTable,
- secondaryTable,
- executorService,
- this.mismatchDetector,
- this.flowController,
- this.secondaryWriteErrorConsumer,
- this.readSampler,
- this.timestamper,
- this.performWritesConcurrently,
- this.waitForSecondaryWrites,
- this.mirroringTracer,
- this.referenceCounter,
- this.configuration.mirroringOptions.maxLoggedBinaryValueLength);
+ return getMirroringTable(primaryTable, secondaryTable);
}
}
@@ -416,4 +403,6 @@ public void run() {
// This error is not reported to the user.
}
}
+
+ protected abstract Table getMirroringTable(Table primaryTable, Table secondaryTable);
}
diff --git a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java
index 31a7abc88f..6298f35269 100644
--- a/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java
+++ b/hbase-migration-tools/mirroring-client/bigtable-hbase-mirroring-client-core-parent/bigtable-hbase-mirroring-client-core/src/main/java/com/google/cloud/bigtable/mirroring/core/MirroringTable.java
@@ -89,9 +89,9 @@
* asynchronously. Read operations are mirrored to verify that content of both databases matches.
*/
@InternalApi("For internal usage only")
-public class MirroringTable implements Table {
+public class MirroringTable {
- private static final Logger Log = new Logger(MirroringTable.class);
+ protected static final Logger Log = new Logger(MirroringTable.class);
private static final Predicate