Draft
Changes from all commits
45 commits
7a21043
key source config
brandtnewton Jan 8, 2026
27821c9
test fixes
brandtnewton Jan 8, 2026
5cf5c99
Merge branch 'kafka/it-refac' into kafka/key-source
brandtnewton Jan 8, 2026
498597e
imports
brandtnewton Jan 8, 2026
3bedb96
fixed config
brandtnewton Jan 8, 2026
2bb369d
tests
brandtnewton Jan 12, 2026
7ee8b2d
cleaner test assertions
brandtnewton Jan 12, 2026
7051753
adding IT
brandtnewton Jan 12, 2026
8b5caa8
docs: fix typo
brandtnewton Jan 12, 2026
fd54851
Merge branch 'GoogleCloudPlatform:main' into kafka/key-source
brandtnewton Jan 12, 2026
8329db5
Merge branch 'main' of github.com:brandtnewton/cloud-bigtable-ecosystem
brandtnewton Jan 12, 2026
9921427
adding tests
brandtnewton Jan 14, 2026
b4cc9d4
Merge branch 'kafka/key-source' of github.com:brandtnewton/cloud-bigt…
brandtnewton Jan 14, 2026
ea094fb
json schema with value based key test
brandtnewton Jan 14, 2026
b573a42
more tests
brandtnewton Jan 14, 2026
15f99fd
Merge branch 'main' into kafka/key-source
brandtnewton Jan 14, 2026
46ffc97
Merge branch 'kafka/key-source' of github.com:brandtnewton/cloud-bigt…
brandtnewton Jan 14, 2026
06e69ae
removed debug logs
brandtnewton Jan 14, 2026
2e0e841
docs
brandtnewton Jan 14, 2026
5c8c2df
pom fix
brandtnewton Jan 14, 2026
36e1d3c
added schemaless json test
brandtnewton Jan 14, 2026
a426063
config clean up
brandtnewton Jan 14, 2026
c7c3a8f
feat: flatten array element SMT
brandtnewton Jan 15, 2026
3a6b2d9
adding config
brandtnewton Jan 16, 2026
ad67c53
smt test
brandtnewton Jan 16, 2026
6d92880
flatten array works
brandtnewton Jan 16, 2026
d00bfa1
fixed smt test
brandtnewton Jan 16, 2026
b17caee
imports
brandtnewton Jan 16, 2026
9ae60a3
removing key source
brandtnewton Jan 16, 2026
1768237
removed key source
brandtnewton Jan 16, 2026
76dafca
Merge branch 'kafka/root-array' into kafka/root-array-to-cf
brandtnewton Jan 16, 2026
6c1c9e0
tdd
brandtnewton Jan 16, 2026
bee1b57
removed old test
brandtnewton Jan 16, 2026
3c9cb72
Merge branch 'kafka/root-array' into kafka/root-array-to-cf
brandtnewton Jan 16, 2026
5d905fd
unit test passes
brandtnewton Jan 16, 2026
be626b7
clearing root level array
brandtnewton Jan 20, 2026
5e90739
cleaner tests
brandtnewton Jan 20, 2026
50cc8d4
docs
brandtnewton Jan 20, 2026
eb34ec7
Merge branch 'kafka/root-array' into kafka/root-array-to-cf
brandtnewton Jan 20, 2026
f7e2e37
test passes
brandtnewton Jan 20, 2026
9808203
fixed products test
brandtnewton Jan 20, 2026
78af59b
tests
brandtnewton Jan 20, 2026
db63b52
Merge branch 'kafka/root-array' into kafka/root-array-to-cf
brandtnewton Jan 20, 2026
958f3a2
fixed null element error
brandtnewton Jan 20, 2026
698090f
Merge branch 'kafka/root-array' into kafka/root-array-to-cf
brandtnewton Jan 20, 2026
1 change: 1 addition & 0 deletions kafka-connect-bigtable-sink/.gitignore
@@ -1,3 +1,4 @@
target
.idea
*.iml
dependency-reduced-pom.xml
88 changes: 87 additions & 1 deletion kafka-connect-bigtable-sink/README.md
@@ -3,7 +3,8 @@
This [Kafka Connect Sink](https://kafka.apache.org/documentation/#connect) is
for writing Kafka data to the Google Bigtable database.
This was designed to stream data into Bigtable with as little
latency as possible.
latency as possible. This project also includes some [custom SMTs](#smt) to
conveniently prepare your data for Bigtable without any extra plugins.

## Features

@@ -13,6 +14,7 @@ latency as possible.
* Deletes
* At least once delivery
* Dead Letter Queue
* Bundled SMTs for convenience

### Flexible Key Mapping

@@ -31,6 +33,18 @@ type:
complicated delimiters, and string constants are required in your Row Key,
consider configuring an SMT to add relevant fields to the Kafka Record key.

#### Using Message Values for Row Keys

If you need to build the row key from fields of the message value rather than the message key, use the `org.apache.kafka.connect.transforms.ValueToKey` SMT to copy those fields into the key:

```properties
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=orderId,userId
row.key.definition=userId,orderId
row.key.delimiter=#
```
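
With this configuration, a record whose value contains, say, `userId=42` and `orderId=99`
(illustrative values) should end up with the row key `42#99`: `row.key.definition` selects
and orders the key fields, and `row.key.delimiter` joins them.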

### Dead Letter Queue

A DLQ can be enabled by setting `errors.tolerance` to `all` and by
@@ -52,6 +66,78 @@ optimizing configs for latency will reduce throughput and efficiency.
When `value.null.mode` is set to `delete`, Kafka messages with a null value will
result in the corresponding row being deleted.
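
For example, a minimal sketch of a configuration enabling this behavior:

```properties
# Kafka tombstones (records with a null value) delete the corresponding Bigtable row
value.null.mode=delete
```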

## SMT

This project includes SMTs that may be useful for preparing your data for Bigtable.

### Flatten Array Element

This SMT flattens the wrapper records that some serialization libraries place around arrays and their elements, leaving a plain array field in the value.

#### Configuration

`array.field`

The name of the root-level array field. Note: this field is not the direct parent of the array; see the example below.

* Type: string
* Default:
* Valid Values: non-empty string
* Importance: high

`array.inner.wrapper`

The name of the intermediate field that directly contains the array (for example, `list` below).

* Type: string
* Default:
* Valid Values: non-empty string
* Importance: high

`array.element.wrapper`

The name of the field wrapping individual elements within the array.

* Type: string
* Default:
* Valid Values: non-empty string
* Importance: high
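
Putting these together, a sketch of how this transform could be declared in a connector
configuration (the `flattenProducts` alias is illustrative):

```properties
transforms=flattenProducts
transforms.flattenProducts.type=com.google.cloud.kafka.connect.bigtable.transformations.FlattenArrayElement
transforms.flattenProducts.array.field=products
transforms.flattenProducts.array.inner.wrapper=list
transforms.flattenProducts.array.element.wrapper=element
```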

#### Example

Given the following input message value, with `array.field="products"`, `array.inner.wrapper="list"`, and `array.element.wrapper="element"`:

```json
{
  "orderId": "order1",
  "products": {
    "list": [
      {
        "element": {
          "name": "Ball",
          "value": "24"
        }
      }
    ]
  }
}
```

The resulting output would be:

```json
{
  "orderId": "order1",
  "products": [
    {
      "name": "Ball",
      "value": "24"
    }
  ]
}
```

## Configuration

See [config/](./config/bigtable-kafka-sink-connector.properties) for a sample
6 changes: 6 additions & 0 deletions kafka-connect-bigtable-sink/doc/tests.md
@@ -113,3 +113,9 @@ cbt -project "$PROJECT" -instance "$INSTANCE" ls | xargs -P 0 -I {} cbt -project
```bash
mvn clean verify -DskipUnitTests
```

#### To run a specific integration test

```bash
mvn clean verify -DskipUnitTests -Dit.test=InsertUpsertIT#testUpsert
```
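
The `-Dit.test=ClassName#methodName` filter runs a single test method (assuming the
integration tests are executed by the Maven Failsafe plugin, which supports this property).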
2 changes: 1 addition & 1 deletion kafka-connect-bigtable-sink/integration-tests/pom.xml
@@ -33,7 +33,7 @@
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<integration.test.plugin.path>${project.basedir}/integration_test_plugins</integration.test.plugin.path>
<google.sink.package.path>${project.basedir}/../sink/target/sink-${project.version}-package</google.sink.package.path>
<google.sink.package.path>${project.basedir}/../sink/target/</google.sink.package.path>
<google.sink.package.plugin.dir>${integration.test.plugin.path}/google-sink</google.sink.package.plugin.dir>
</properties>
<dependencies>
@@ -23,15 +23,20 @@
import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest;
import com.google.cloud.bigtable.admin.v2.models.Table;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.models.Filters;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.TableId;
import com.google.cloud.kafka.connect.bigtable.wrappers.BigtableTableAdminClientInterface;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.ByteString;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;
@@ -94,7 +99,7 @@ public void createTablesAndColumnFamilies(Map<String, Set<String>> tablesAndColu
public Map<ByteString, Row> readAllRows(BigtableDataClient bigtable, String table) {
Integer numRecords = null;
try {
Query query = Query.create(table);
Query query = Query.create(TableId.of(table)).filter(Filters.FILTERS.limit().cellsPerColumn(1));
Map<ByteString, Row> result =
bigtable.readRows(query).stream().collect(Collectors.toMap(Row::getKey, r -> r));
numRecords = result.size();
@@ -106,6 +111,17 @@ public Map<ByteString, Row> readAllRows(BigtableDataClient bigtable, String tabl
}
}

/** Reads every row in the given table and returns the row keys decoded as UTF-8 strings. */
public String[] readAllRowKeys(BigtableDataClient bigtable, String table) {
  Query query = Query.create(TableId.of(table));
  return bigtable.readRows(query).stream()
      .map(r -> r.getKey().toString(StandardCharsets.UTF_8))
      .toArray(String[]::new);
}

public long cellCount(Map<ByteString, Row> rows) {
return rows.values().stream().mapToLong(r -> r.getCells().size()).sum();
}
@@ -119,6 +135,18 @@ public void waitUntilBigtableContainsNumberOfRows(String tableId, long numberOfR
"Records not consumed in time.");
}

/**
 * Blocks until some cell in {@code tableId} has a write timestamp at or after {@code time}.
 * Bigtable cell timestamps are in microseconds, hence the millisecond-to-microsecond conversion.
 */
public void waitUntilBigtableWriteTime(String tableId, Instant time)
    throws InterruptedException {
  long start = System.currentTimeMillis();
  waitForCondition(
      testConditionIgnoringTransientErrors(
          () ->
              readAllRows(bigtableData, tableId).values().stream()
                  .anyMatch(
                      r ->
                          r.getCells().stream()
                              .anyMatch(c -> c.getTimestamp() >= time.toEpochMilli() * 1000))),
      DEFAULT_BIGTABLE_RETRY_TIMEOUT_MILLIS,
      "No cell with a write timestamp at or after " + time + " was found in time.");
  long elapsed = System.currentTimeMillis() - start;
  System.out.printf("Bigtable rows found in table %s after %dms%n", tableId, elapsed);
}

public void waitUntilBigtableContainsNumberOfCells(String tableId, long numberOfCells)
throws InterruptedException {
waitForCondition(
@@ -169,6 +169,7 @@ private String startConnector(Map<String, String> configProps, Set<String> topic
if (topicNameSuffixes.isEmpty()) {
configProps.put(SinkConnectorConfig.TOPICS_CONFIG, id);
connect.kafka().createTopic(id, numTasks);
logger.info("created topic: " + id);
} else {
configProps.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, id + ".*");
for (String suffix : topicNameSuffixes) {
@@ -0,0 +1,143 @@
package com.google.cloud.kafka.connect.bigtable.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.cloud.kafka.connect.bigtable.config.BigtableErrorMode;
import com.google.cloud.kafka.connect.bigtable.config.InsertMode;
import com.google.cloud.kafka.connect.bigtable.transformations.FlattenArrayElement;
import com.google.protobuf.ByteString;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutionException;

import static com.google.cloud.kafka.connect.bigtable.config.BigtableSinkConfig.*;
import static org.apache.kafka.connect.runtime.WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

@RunWith(JUnit4.class)
public class FlattenArrayElementIT extends BaseKafkaConnectBigtableIT {
private static final String KEY1 = "key1";

@Test
public void testFlattenArrayElementSmt() throws InterruptedException, ExecutionException, JsonProcessingException {
Map<String, String> props = baseConnectorProps();
props.put(INSERT_MODE_CONFIG, InsertMode.UPSERT.name());
props.put("transforms", "flattenElements");
props.put("transforms.flattenElements.type", FlattenArrayElement.class.getName());
props.put("transforms.flattenElements." + FlattenArrayElement.ARRAY_FIELD_NAME, "products");
props.put("transforms.flattenElements." + FlattenArrayElement.ARRAY_INNER_WRAPPER_FIELD_NAME, "list");
props.put("transforms.flattenElements." + FlattenArrayElement.ARRAY_ELEMENT_WRAPPER_FIELD_NAME, "element");
props.put(DEFAULT_COLUMN_FAMILY_CONFIG, "cf");
props.put(ERROR_MODE_CONFIG, BigtableErrorMode.FAIL.name());
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());

String testId = startSingleTopicConnector(props);
createTablesAndColumnFamilies(Map.of(testId, Set.of(testId, "cf", "products")));

Schema productSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
.field("id", Schema.STRING_SCHEMA)
.field("quantity", Schema.INT32_SCHEMA)
.build();

Schema elementSchema = SchemaBuilder.struct().field("element", productSchema).build();

Schema schema = SchemaBuilder.struct().optional()
.field("orderId", Schema.STRING_SCHEMA)
.field("userId", Schema.STRING_SCHEMA)
.field("products",
SchemaBuilder.struct().field("list", SchemaBuilder.array(elementSchema)).build()
)
.build();

JsonConverter converter = new JsonConverter();
converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

Struct productElement1 = new Struct(elementSchema).put("element", new Struct(productSchema)
.put("name", "Ball")
.put("id", "PROD-123")
.put("quantity", 5)
);
Struct productElement2 = new Struct(elementSchema).put("element", new Struct(productSchema)
.put("name", "Car")
.put("id", "PROD-456")
.put("quantity", 1)
);
Struct productElement3 = new Struct(elementSchema).put("element", new Struct(productSchema)
.put("name", "Tambourine")
.put("id", "PROD-789")
.put("quantity", 2)
);

List<Struct> productList = List.of(productElement1, productElement2, productElement3);

Struct productsWrapper = new Struct(schema.field("products").schema())
.put("list", productList);

Struct value = new Struct(schema)
.put("orderId", "ORD-999")
.put("userId", "USER-42")
.put("products", productsWrapper);


byte[] serializedValue = converter.fromConnectData(testId, schema, value);
System.out.println(new String(serializedValue, StandardCharsets.UTF_8));

connect.kafka().produce(testId, KEY1, new String(serializedValue, StandardCharsets.UTF_8));

waitUntilBigtableContainsNumberOfRows(testId, 1);
Map<ByteString, Row> rows = readAllRows(bigtableData, testId);
ByteString key = ByteString.copyFrom(KEY1.getBytes(StandardCharsets.UTF_8));
Row row1 = rows.get(key);
assertNotNull(row1);
assertEquals(3, row1.getCells().size());

List<RowCell> orderIdCells = row1.getCells("cf", "orderId");
assertEquals(1, orderIdCells.size());
assertEquals("ORD-999", orderIdCells.get(0).getValue().toString(StandardCharsets.UTF_8));

List<RowCell> userIdCells = row1.getCells("cf", "userId");
assertEquals(1, userIdCells.size());
assertEquals("USER-42", userIdCells.get(0).getValue().toString(StandardCharsets.UTF_8));

List<RowCell> productCells = row1.getCells("cf", "products");
assertEquals(1, productCells.size());
String rawProductsJson = productCells.get(0).getValue().toString(StandardCharsets.UTF_8);

ObjectMapper mapper = new ObjectMapper();
ArrayNode productsJson = (ArrayNode) mapper.readTree(rawProductsJson);
assertEquals(3, productsJson.size());

// product 1
assertEquals("Ball", productsJson.get(0).get("name").asText());
assertEquals("PROD-123", productsJson.get(0).get("id").asText());
assertEquals(5, productsJson.get(0).get("quantity").asInt());

// product 2
assertEquals("Car", productsJson.get(1).get("name").asText());
assertEquals("PROD-456", productsJson.get(1).get("id").asText());
assertEquals(1, productsJson.get(1).get("quantity").asInt());

// product 3
assertEquals("Tambourine", productsJson.get(2).get("name").asText());
assertEquals("PROD-789", productsJson.get(2).get("id").asText());
assertEquals(2, productsJson.get(2).get("quantity").asInt());
}
}