Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions kafka-connect-bigtable-sink/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
<maven.clean.version>3.4.1</maven.clean.version>

<integration.test.plugin.path>${project.basedir}/integration_test_plugins</integration.test.plugin.path>
<google.sink.package.path>${project.basedir}/../sink/target/sink-${project.version}-package</google.sink.package.path>
<google.sink.package.plugin.dir>${integration.test.plugin.path}/google-sink</google.sink.package.plugin.dir>
<google.sink.jar.path>${project.basedir}/../sink/target/sink-${project.version}-jar-with-dependencies.jar</google.sink.jar.path>
<google.sink.jar.plugin.dir>${integration.test.plugin.path}/google-sink</google.sink.jar.plugin.dir>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -217,10 +217,8 @@
</goals>
<configuration>
<target>
<mkdir dir="${google.sink.package.plugin.dir}"/>
<copy todir="${google.sink.package.plugin.dir}">
<fileset dir="${google.sink.package.path}"/>
</copy>
<mkdir dir="${google.sink.jar.plugin.dir}"/>
<copy file="${google.sink.jar.path}" tofile="${google.sink.jar.plugin.dir}/sink.jar"/>
</target>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientInterface;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.storage.StringConverter;

public abstract class BaseIT {
public static final String CREDENTIALS_PATH_ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";
public static final String CONNECTOR_CLASS_NAME =
"com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector";

Expand Down Expand Up @@ -62,10 +60,6 @@ public Map<String, String> baseConnectorProps() {
// TODO: get it from environment variables after migrating to kokoro.
result.put(GCP_PROJECT_ID_CONFIG, "todotodo");
result.put(BIGTABLE_INSTANCE_ID_CONFIG, "todotodo");
// TODO: fix it when transitioning to kokoro.
result.put(
BigtableSinkConfig.GCP_CREDENTIALS_PATH_CONFIG,
Objects.requireNonNull(System.getenv(CREDENTIALS_PATH_ENV_VAR)));

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientInterface;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -142,8 +141,7 @@ public void waitUntilBigtableTableHasExactSetOfColumnFamilies(
waitForCondition(
testConditionIgnoringTransientErrors(
() ->
bigtableAdmin.getTable(tableId).getColumnFamilies()
.stream()
bigtableAdmin.getTable(tableId).getColumnFamilies().stream()
.map(ColumnFamily::getId)
.collect(Collectors.toSet())
.equals(columnFamilies)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ public void assertSingleDlqEntry(
h ->
h.key().equals(DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION)
&& Arrays.equals(
h.value(),
exceptionClass.getName().getBytes(StandardCharsets.UTF_8))));
h.value(),
exceptionClass.getName().getBytes(StandardCharsets.UTF_8))));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.curator.shaded.com.google.common.util.concurrent.Futures;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -117,8 +115,7 @@ public void testDisabledResourceAutoCreation()
.anyMatch(r -> Arrays.equals(KEY3.getBytes(StandardCharsets.UTF_8), r.key())));
assertConnectorAndAllTasksAreRunning(testId);
// With the column family created.
bigtableAdmin.modifyFamilies(
ModifyColumnFamiliesRequest.of(testId).addFamily(COLUMN_FAMILY2));
bigtableAdmin.modifyFamilies(ModifyColumnFamiliesRequest.of(testId).addFamily(COLUMN_FAMILY2));
connect.kafka().produce(testId, KEY4, serializedValue2);
waitUntilBigtableContainsNumberOfRows(testId, 2);
assertTrue(
Expand Down Expand Up @@ -148,8 +145,7 @@ public void testTableAutoCreationDisabledColumnFamilyAutoCreationEnabled()
assertSingleDlqEntry(dlqTopic, KEY1, value, null);

createTablesAndColumnFamilies(Map.of(testId, Set.of()));
assertTrue(
bigtableAdmin.getTable(testId).getColumnFamilies().isEmpty());
assertTrue(bigtableAdmin.getTable(testId).getColumnFamilies().isEmpty());
connect.kafka().produce(testId, KEY2, value);
waitUntilBigtableTableHasExactSetOfColumnFamilies(testId, Set.of(testId));
waitUntilBigtableContainsNumberOfRows(testId, 1);
Expand Down Expand Up @@ -223,8 +219,7 @@ public void testRowDeletionCreatesTableWhenAutoCreationEnabled() throws Interrup
assertFalse(bigtableAdmin.listTables().contains(testId));
connect.kafka().produce(testId, KEY1, rowDeletionValue);
waitUntilBigtableTableExists(testId);
assertTrue(
bigtableAdmin.getTable(testId).getColumnFamilies().isEmpty());
assertTrue(bigtableAdmin.getTable(testId).getColumnFamilies().isEmpty());

assertSingleDlqEntry(dlqTopic, KEY1, null, null);
assertConnectorAndAllTasksAreRunning(testId);
Expand Down Expand Up @@ -294,10 +289,9 @@ public void testColumnDeletionCreatesTableAndColumnFamilyWhenAutoCreationEnabled
}

/**
* This test checks consequences of design choices described in comments in
* {@link com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder#deleteCells(String,
* ByteString, Range.TimestampRange)} and
* {@link
* This test checks consequences of design choices described in comments in {@link
* com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder#deleteCells(String,
* ByteString, Range.TimestampRange)} and {@link
* com.google.cloud.kafka.connect.bigtable.mapping.MutationDataBuilder#deleteFamily(String)}.
*/
@Test
Expand Down
7 changes: 7 additions & 0 deletions kafka-connect-bigtable-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<surefire.version>3.5.2</surefire.version>
<failsafe.version>3.5.2</failsafe.version>
<maven-jar.version>3.4.2</maven-jar.version>
<maven-build-helper.version>3.3.0</maven-build-helper.version>

<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
Expand Down Expand Up @@ -63,6 +64,9 @@
<includes>
<include>.gitignore</include>
</includes>
<excludes>
<exclude>third_party</exclude>
</excludes>
<trimTrailingWhitespace/>
<endWithNewline/>
<indent>
Expand All @@ -72,6 +76,9 @@
</format>
</formats>
<java>
<excludes>
<exclude>third_party</exclude>
</excludes>
<googleJavaFormat>
<version>${google.java.format.version}</version>
<style>GOOGLE</style>
Expand Down
12 changes: 7 additions & 5 deletions kafka-connect-bigtable-sink/sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.3.0</version>
<version>${maven-build-helper.version}</version>
<executions>
<execution>
<id>add-source</id>
Expand All @@ -91,7 +91,7 @@
<sources>
<!-- We need to put third-party code inside the `third_party` directory
for licensing compliance, including them in the project here. -->
<source>./third_party/apache_hbase_common/src/main/java/com/google/cloud/kafka/connect/bigtable/utils/ByteUtils.java</source>
<source>./third_party/apache_hbase_common/src/main/java</source>
</sources>
</configuration>
</execution>
Expand All @@ -106,12 +106,14 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
<executions>
<execution>
Expand Down
18 changes: 0 additions & 18 deletions kafka-connect-bigtable-sink/sink/src/main/assembly/package.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings;
import com.google.cloud.kafka.connect.bigtable.version.PackageMetadata;
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientWrapper;
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientInterface;
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -440,8 +440,8 @@ public InsertMode getInsertMode() {
}

/**
* @return {@link BigtableTableAdminClientInterface} connected to a Cloud Bigtable instance configured as
* described in {@link BigtableSinkConfig#getDefinition()}.
* @return {@link BigtableTableAdminClientInterface} connected to a Cloud Bigtable instance
* configured as described in {@link BigtableSinkConfig#getDefinition()}.
*/
public BigtableTableAdminClientInterface getBigtableAdminClient() {
Duration totalTimeout = getTotalRetryTimeout();
Expand Down Expand Up @@ -497,7 +497,8 @@ BigtableTableAdminClientInterface getBigtableAdminClient(
StatusCode.Code.FAILED_PRECONDITION)));

try {
return new BigtableTableAdminClientWrapper(BigtableTableAdminClient.create(adminSettingsBuilder.build()));
return new BigtableTableAdminClientWrapper(
BigtableTableAdminClient.create(adminSettingsBuilder.build()));
} catch (IOException e) {
throw new RetriableException(e);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.bigtable.data.v2.models.Range;
import com.google.cloud.kafka.connect.bigtable.config.ConfigInterpolation;
import com.google.cloud.kafka.connect.bigtable.config.NullValueMode;
import com.google.cloud.kafka.connect.bigtable.utils.ByteUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
Expand Down
Loading