diff --git a/README.md b/README.md
index dd992d2c..751724ed 100644
--- a/README.md
+++ b/README.md
@@ -267,7 +267,7 @@ Not all methods have been implemented. Please check [store types method support
 ##### keyValueStore
 Using defaults, for a state store named "my-kv-store" following CQL Schema applies:
 ```sql
-CREATE TABLE IF NOT EXISTS my_kv_store_kstreams_store (
+CREATE TABLE IF NOT EXISTS my_kv_kstreams_store (
     partition int,
     key blob,
     time timestamp,
@@ -279,7 +279,7 @@ CREATE TABLE IF NOT EXISTS my_kv_store_kstreams_store (
 ##### globalKeyValueStore
 Using defaults, for a state store named "global-kv-store" following CQL Schema applies:
 ```sql
-CREATE TABLE IF NOT EXISTS global_kv_store_kstreams_store (
+CREATE TABLE IF NOT EXISTS global_kv_kstreams_store (
     key blob,
     time timestamp,
     value blob,
@@ -287,6 +287,20 @@ CREATE TABLE IF NOT EXISTS global_kv_store_kstreams_store (
 ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }
 ```
 
+##### windowStore
+Using defaults, for a state store named "some-window-store" following CQL Schema applies:
+```sql
+CREATE TABLE IF NOT EXISTS some_window_kstreams_store (
+    partition int,
+    start_time bigint,
+    key blob,
+    seq_num bigint,
+    time timestamp,
+    value blob,
+    PRIMARY KEY ((partition, start_time), key, seq_num)
+) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }
+```
+
 #### Feat: Cassandra table with default TTL
 💡 **Tip:** Cassandra has a table option `default_time_to_live` (default expiration time (“TTL”) in seconds for a table) which can be useful for certain use cases where data (state) can or should expire.
 
diff --git a/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/StreamToStreamJoinIntegrationTest.java b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/StreamToStreamJoinIntegrationTest.java
new file mode 100644
index 00000000..2b67d9dc
--- /dev/null
+++ b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/StreamToStreamJoinIntegrationTest.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package dev.thriving.oss.kafka.streams.cassandra.state.store; + +import com.datastax.oss.driver.api.core.CqlSession; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.StreamJoined; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.rnorth.ducttape.unreliables.Unreliables; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * End-to-end integration test that demonstrates how to perform a join between two KStreams. + * + * Note: This example uses lambda expressions and thus works with Java 8+ only. + */ +public class StreamToStreamJoinIntegrationTest extends AbstractIntegrationTest { + + private static final String adImpressionsTopic = "adImpressions"; + private static final String adClicksTopic = "adClicks"; + private static final String outputTopic = "output-topic"; + + @Test + public void shouldJoinTwoStreams() throws ExecutionException, InterruptedException, TimeoutException { + // Input 1: Ad impressions + final List> inputAdImpressions = Arrays.asList( + new KeyValue<>("car-advertisement", "shown"), + new KeyValue<>("newspaper-advertisement", "shown"), + new KeyValue<>("gadget-advertisement", "shown") + ); + + // Input 2: Ad clicks + final List> inputAdClicks = Arrays.asList( + new KeyValue<>("newspaper-advertisement", "clicked"), + new KeyValue<>("gadget-advertisement", "clicked"), + new KeyValue<>("newspaper-advertisement", "clicked") + ); + + final List> expectedResults = Arrays.asList( + new KeyValue<>("car-advertisement", "shown/not-clicked-yet"), + new KeyValue<>("newspaper-advertisement", "shown/not-clicked-yet"), + new KeyValue<>("gadget-advertisement", "shown/not-clicked-yet"), + new KeyValue<>("newspaper-advertisement", "shown/clicked"), + new KeyValue<>("gadget-advertisement", "shown/clicked"), + new KeyValue<>("newspaper-advertisement", "shown/clicked") + ); + + // configure and start the processor topology. + final Serde stringSerde = Serdes.String(); + final Properties props = getStreamsProperties(); + + // when + try ( + final AdminClient adminClient = initAdminClient(); + final KafkaProducer producer = initProducer(stringSerde, stringSerde); + final KafkaConsumer consumer = initConsumer(stringSerde, stringSerde); + final CqlSession session = initSession(); + final KafkaStreams streams = initStreams(props, session) + ) { + // setup input and output topics. 
+ Collection topics = Arrays.asList( + new NewTopic(adImpressionsTopic, 6, (short) 1), + new NewTopic(adClicksTopic, 6, (short) 1), + new NewTopic(outputTopic, 3, (short) 1) + ); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + consumer.subscribe(Collections.singletonList(outputTopic)); + + // start streams. + streams.start(); + + // produce some input data to the input topics. + inputAdImpressions.forEach(it -> { + try { + producer.send(new ProducerRecord<>(adImpressionsTopic, it.key, it.value)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + inputAdClicks.forEach(it -> { + try { + producer.send(new ProducerRecord<>(adClicksTopic, it.key, it.value)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + // consume and collect streams output + final List> results = new ArrayList<>(); + Unreliables.retryUntilTrue(600, TimeUnit.SECONDS, () -> { + ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); + records.iterator().forEachRemaining(record -> results.add(KeyValue.pair(record.key(), record.value()))); + + return results.size() >= expectedResults.size(); + } + ); + + // then verify the application's output data. + assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults); + } + } + + // note: adapted from https://github.com/confluentinc/kafka-streams-examples/blob/v7.5.0-148/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java + @NotNull + private KafkaStreams initStreams(Properties streamsConfiguration, CqlSession session) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream alerts = builder.stream(adImpressionsTopic); + final KStream incidents = builder.stream(adClicksTopic); + + // In this example, we opt to perform an OUTER JOIN between the two streams. We picked this + // join type to show how the Streams API will send further join updates downstream whenever, + // for the same join key (e.g. "newspaper-advertisement"), we receive an update from either of + // the two joined streams during the defined join window. + Duration joinTimeDifference = Duration.ofSeconds(5); + Duration windowDuration = joinTimeDifference.multipliedBy(2); + long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L; + long retentionPeriod = DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD; + final KStream impressionsAndClicks = alerts.outerJoin( + incidents, + (impressionValue, clickValue) -> + (clickValue == null)? impressionValue + "/not-clicked-yet": impressionValue + "/" + clickValue, + // KStream-KStream joins are always windowed joins, hence we must provide a join window. + JoinWindows.of(joinTimeDifference), + // In this specific example, we don't need to define join serdes explicitly because the key, left value, and + // right value are all of type String, which matches our default serdes configured for the application. However, + // we want to showcase the use of `StreamJoined.with(...)` in case your code needs a different type setup. 
+ StreamJoined.with( + Serdes.String(), /* key */ + Serdes.String(), /* left value */ + Serdes.String() /* right value */ + ) + .withThisStoreSupplier( + CassandraStores.builder(session, "store-this") + .withKeyspace(CASSANDRA_KEYSPACE) + .windowBytesStore(retentionPeriod, -1, windowDuration.toMillis(), true) + ) + .withOtherStoreSupplier( + CassandraStores.builder(session, "store-other") + .withKeyspace(CASSANDRA_KEYSPACE) + .windowBytesStore(retentionPeriod, -1, windowDuration.toMillis(), true) + ) + .withLoggingDisabled() + ); + + // Write the results to the output topic. + impressionsAndClicks.to(outputTopic); + + return new KafkaStreams(builder.build(), streamsConfiguration); + } +} diff --git a/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java new file mode 100644 index 00000000..0b65f4b0 --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java @@ -0,0 +1,156 @@ +/* + * Copyright Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.thriving.oss.kafka.streams.cassandra.state.store; + +import com.datastax.oss.driver.api.core.CqlSession; +import dev.thriving.oss.kafka.streams.cassandra.state.store.utils.IntegrationTestUtils; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.WindowStore; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType; + +/** + * Demonstrates how to validate an application's expected state through interactive queries. + *

+ * Note: This example uses lambda expressions and thus works with Java 8+ only. + */ +public class ValidateStateWithInteractiveQueriesLambdaIntegrationTest extends AbstractIntegrationTest { + + private static final String inputTopic = "inputTopic"; + private static final String STORE_NAME_MAX = "max-store"; + private static final String STORE_NAME_MAX_WINDOW = "max-window-store"; + + @Test + public void shouldCalculateMaxClicksPerUser() throws ExecutionException, InterruptedException, TimeoutException { + // input: A user may be listed multiple times. + final List> inputUserClicks = Arrays.asList( + new KeyValue<>("alice", 13L), + new KeyValue<>("bob", 4L), + new KeyValue<>("chao", 25L), + new KeyValue<>("bob", 19L), + new KeyValue<>("chao", 56L), + new KeyValue<>("alice", 78L), + new KeyValue<>("alice", 40L), + new KeyValue<>("bob", 3L) + ); + + final Map expectedMaxClicksPerUser = new HashMap() { + { + put("alice", 78L); + put("bob", 19L); + put("chao", 56L); + } + }; + + // configure and start the processor topology. + final Serde stringSerde = Serdes.String(); + final Serde longSerde = Serdes.Long(); + final Properties props = getStreamsProperties(); + + // The commit interval for flushing records to state stores and downstream must be lower than + // this integration test's timeout (30 secs) to ensure we observe the expected processing results. + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2 * 1000); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + + // when + try ( + final AdminClient adminClient = initAdminClient(); + final KafkaProducer producer = initProducer(stringSerde, longSerde); + final CqlSession session = initSession(); + final KafkaStreams streams = initStreams(props, session) + ) { + // setup input and output topics. + Collection topics = List.of( + new NewTopic(inputTopic, 6, (short) 1) + ); + adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS); + + // start streams. + streams.start(); + + // produce some input data to the input topics. + inputUserClicks.forEach(it -> { + try { + producer.send(new ProducerRecord<>(inputTopic, it.key, it.value)).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + + // then verify the application's output data. 
+ final ReadOnlyKeyValueStore keyValueStore = + streams.store(fromNameAndType(STORE_NAME_MAX, QueryableStoreTypes.keyValueStore())); + + final ReadOnlyWindowStore windowStore = + streams.store(fromNameAndType(STORE_NAME_MAX_WINDOW, QueryableStoreTypes.windowStore())); + + IntegrationTestUtils.assertThatKeyValueStoreContains(keyValueStore, expectedMaxClicksPerUser); + IntegrationTestUtils.assertThatOldestWindowContains(windowStore, expectedMaxClicksPerUser); + } + } + + // note: adapted from https://github.com/confluentinc/kafka-streams-examples/blob/v7.5.0-148/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java + @NotNull + private KafkaStreams initStreams(Properties streamsConfiguration, CqlSession session) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream stream = builder.stream(inputTopic); + + // rolling MAX() aggregation + final String maxStore = STORE_NAME_MAX; + stream.groupByKey().aggregate( + () -> Long.MIN_VALUE, + (aggKey, value, aggregate) -> Math.max(value, aggregate), + Materialized.as(maxStore) + ); + + // windowed MAX() aggregation + final String maxWindowStore = STORE_NAME_MAX_WINDOW; + stream.groupByKey() + .windowedBy(TimeWindows.of(Duration.ofMinutes(1L)).grace(Duration.ZERO)) + .aggregate( + () -> Long.MIN_VALUE, + (aggKey, value, aggregate) -> Math.max(value, aggregate), + Materialized.>as(maxWindowStore).withRetention(Duration.ofMinutes(5L))); + + return new KafkaStreams(builder.build(), streamsConfiguration); + } +} diff --git a/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/IntegrationTestUtils.java b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/IntegrationTestUtils.java new file mode 100644 index 00000000..fe7be55c --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/IntegrationTestUtils.java @@ -0,0 +1,362 @@ +/* + * Copyright Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package dev.thriving.oss.kafka.streams.cassandra.state.store.utils; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.ReadOnlyWindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.rnorth.ducttape.unreliables.Unreliables; + +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Utility functions to make integration testing more convenient. + */ +public class IntegrationTestUtils { + + private static final int UNLIMITED_MESSAGES = -1; + private static final long DEFAULT_TIMEOUT = 30 * 1000L; + + /** + * Returns up to `maxMessages` message-values from the topic. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer. + * @return The values retrieved via the consumer. + */ + public static List readValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final List> kvs = readKeyValues(topic, consumerConfig, maxMessages); + return kvs.stream().map(kv -> kv.value).collect(Collectors.toList()); + } + + /** + * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is + * reached. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @return The KeyValue elements retrieved via the consumer. + */ + public static List> readKeyValues(final String topic, final Properties consumerConfig) { + return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES); + } + + /** + * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from are + * already configured in the consumer). 
+ * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer + * @return The KeyValue elements retrieved via the consumer + */ + public static List> readKeyValues(final String topic, final Properties consumerConfig, final int maxMessages) { + final KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig); + consumer.subscribe(Collections.singletonList(topic)); + final Duration pollInterval = Duration.ofMillis(100L); + final long maxTotalPollTimeMs = 10000L; + long totalPollTimeMs = 0; + final List> consumedValues = new ArrayList<>(); + while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) { + final long pollStart = System.currentTimeMillis(); + final ConsumerRecords records = consumer.poll(pollInterval); + final long pollEnd = System.currentTimeMillis(); + totalPollTimeMs += (pollEnd - pollStart); + for (final ConsumerRecord record : records) { + consumedValues.add(new KeyValue<>(record.key(), record.value())); + } + } + consumer.close(); + return consumedValues; + } + + private static boolean continueConsuming(final int messagesConsumed, final int maxMessages) { + return maxMessages <= 0 || messagesConsumed < maxMessages; + } + + /** + * Write a collection of KeyValueWithTimestamp pairs, with explicitly defined timestamps, to Kafka + * and wait until the writes are acknowledged. + * + * @param topic Kafka topic to write the data records to + * @param records Data records to write to Kafka + * @param producerConfig Kafka producer configuration + * @param Key type of the data records + * @param Value type of the data records + */ + public static void produceKeyValuesWithTimestampsSynchronously( + final String topic, + final Collection> records, + final Properties producerConfig) + throws ExecutionException, InterruptedException { + final Producer producer = new KafkaProducer<>(producerConfig); + for (final KeyValueWithTimestamp record : records) { + final Future f = producer.send( + new ProducerRecord<>(topic, null, record.timestamp, record.key, record.value)); + f.get(); + } + producer.flush(); + producer.close(); + } + + /** + * @param topic Kafka topic to write the data records to + * @param records Data records to write to Kafka + * @param producerConfig Kafka producer configuration + * @param Key type of the data records + * @param Value type of the data records + */ + public static void produceKeyValuesSynchronously( + final String topic, + final Collection> records, + final Properties producerConfig) + throws ExecutionException, InterruptedException { + final Collection> keyedRecordsWithTimestamp = + records + .stream() + .map(record -> new KeyValueWithTimestamp<>(record.key, record.value, System.currentTimeMillis())) + .collect(Collectors.toList()); + produceKeyValuesWithTimestampsSynchronously(topic, keyedRecordsWithTimestamp, producerConfig); + } + + public static void produceValuesSynchronously( + final String topic, final Collection records, final Properties producerConfig) + throws ExecutionException, InterruptedException { + final Collection> keyedRecords = + records + .stream() + .map(record -> new KeyValue<>(null, record)) + .collect(Collectors.toList()); + produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); + } + + public static List> waitUntilMinKeyValueRecordsReceived( + final Properties consumerConfig, + final String topic, + final int expectedNumRecords) + throws 
InterruptedException { + return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); + } + + /** + * Wait until enough data (key-value records) has been consumed. + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from + * @param expectedNumRecords Minimum number of expected records + * @param waitTime Upper bound in waiting time in milliseconds + * @return All the records consumed, or null if no records are consumed + * @throws AssertionError if the given wait time elapses + */ + public static List> waitUntilMinKeyValueRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime) throws InterruptedException { + final List> accumData = new ArrayList<>(); + final long startTime = System.currentTimeMillis(); + while (true) { + final List> readData = readKeyValues(topic, consumerConfig); + accumData.addAll(readData); + if (accumData.size() >= expectedNumRecords) + return accumData; + if (System.currentTimeMillis() > startTime + waitTime) + throw new AssertionError("Expected " + expectedNumRecords + + " but received only " + accumData.size() + + " records before timeout " + waitTime + " ms"); + Thread.sleep(Math.min(waitTime, 100L)); + } + } + + public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords) throws InterruptedException { + + return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); + } + + /** + * Wait until enough data (value records) has been consumed. + * + * @param consumerConfig Kafka Consumer configuration + * @param topic Topic to consume from + * @param expectedNumRecords Minimum number of expected records + * @param waitTime Upper bound in waiting time in milliseconds + * @return All the records consumed, or null if no records are consumed + * @throws AssertionError if the given wait time elapses + */ + public static List waitUntilMinValuesRecordsReceived(final Properties consumerConfig, + final String topic, + final int expectedNumRecords, + final long waitTime) throws InterruptedException { + final List accumData = new ArrayList<>(); + final long startTime = System.currentTimeMillis(); + while (true) { + final List readData = readValues(topic, consumerConfig, expectedNumRecords); + accumData.addAll(readData); + if (accumData.size() >= expectedNumRecords) + return accumData; + if (System.currentTimeMillis() > startTime + waitTime) + throw new AssertionError("Expected " + expectedNumRecords + + " but received only " + accumData.size() + + " records before timeout " + waitTime + " ms"); + Thread.sleep(Math.min(waitTime, 100L)); + } + } + + /** + * Asserts that the key-value store contains exactly the expected content and nothing more. + * + * @param store the store to be validated + * @param expected the expected contents of the store + * @param the store's key type + * @param the store's value type + */ + public static void assertThatKeyValueStoreContains(final ReadOnlyKeyValueStore store, final Map expected) + throws InterruptedException { + Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> + expected.keySet() + .stream() + .allMatch(k -> expected.get(k).equals(store.get(k)))); + } + + /** + * Asserts that the oldest available window in the window store contains the expected content. 
+ * + * @param store the store to be validated + * @param expected the expected contents of the store + * @param the store's key type + * @param the store's value type + */ + public static void assertThatOldestWindowContains(final ReadOnlyWindowStore store, final Map expected) + throws InterruptedException { + final Instant fromBeginningOfTime = Instant.EPOCH; + final Instant toNowInProcessingTime = Instant.now(); + Unreliables.retryUntilTrue(30, TimeUnit.SECONDS, () -> + expected.keySet().stream().allMatch(k -> { + try (final WindowStoreIterator iterator = store.fetch(k, fromBeginningOfTime, toNowInProcessingTime)) { + if (iterator.hasNext()) { + return expected.get(k).equals(iterator.next().value); + } + return false; + } + })); + } + + /** + * Creates a map entry (for use with {@link IntegrationTestUtils#mkMap(Map.Entry[])}) + * + * @param k The key + * @param v The value + * @param The key type + * @param The value type + * @return An entry + */ + static Map.Entry mkEntry(final K k, final V v) { + return new Map.Entry() { + @Override + public K getKey() { + return k; + } + + @Override + public V getValue() { + return v; + } + + @Override + public V setValue(final V value) { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Creates a map from a sequence of entries + * + * @param entries The entries to map + * @param The key type + * @param The value type + * @return A map + */ + @SafeVarargs + static Map mkMap(final Map.Entry... entries) { + final Map result = new LinkedHashMap<>(); + for (final Map.Entry entry : entries) { + result.put(entry.getKey(), entry.getValue()); + } + return result; + } + + /** + * A Serializer/Deserializer/Serde implementation for use when you know the data is always null + * @param The type of the stream (you can parameterize this with any type, + * since we throw an exception if you attempt to use it with non-null data) + */ + static class NothingSerde implements Serializer, Deserializer, Serde { + + @Override + public void configure(final Map configuration, final boolean isKey) {} + + @Override + public T deserialize(final String topic, final byte[] bytes) { + if (bytes != null) { + throw new IllegalArgumentException("Expected [" + Arrays.toString(bytes) + "] to be null."); + } else { + return null; + } + } + + @Override + public byte[] serialize(final String topic, final T data) { + if (data != null) { + throw new IllegalArgumentException("Expected [" + data + "] to be null."); + } else { + return null; + } + } + + @Override + public void close() {} + + @Override + public Serializer serializer() { + return this; + } + + @Override + public Deserializer deserializer() { + return this; + } + } +} diff --git a/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/KeyValueWithTimestamp.java b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/KeyValueWithTimestamp.java new file mode 100644 index 00000000..ff0f8784 --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/intTest/java/dev/thriving/oss/kafka/streams/cassandra/state/store/utils/KeyValueWithTimestamp.java @@ -0,0 +1,36 @@ +/* + * Copyright Confluent Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.thriving.oss.kafka.streams.cassandra.state.store.utils; + +import org.apache.kafka.streams.KeyValue; + +public class KeyValueWithTimestamp extends KeyValue { + + /** + * Timestamp of Kafka message (milliseconds since the epoch). + */ + public final long timestamp; + + public KeyValueWithTimestamp(final K key, final V value, final long timestamp) { + super(key, value); + this.timestamp = timestamp; + } + + public KeyValueWithTimestamp(final K key, final V value) { + this(key, value, System.currentTimeMillis()); + } + +} diff --git a/kafka-streams-cassandra-state-store/src/intTest/resources/logback-test.xml b/kafka-streams-cassandra-state-store/src/intTest/resources/logback-test.xml index 83ef7a1a..703dbefc 100644 --- a/kafka-streams-cassandra-state-store/src/intTest/resources/logback-test.xml +++ b/kafka-streams-cassandra-state-store/src/intTest/resources/logback-test.xml @@ -12,5 +12,6 @@ + diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueIterator.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueIterator.java index b9dbd268..bfb84c3d 100644 --- a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueIterator.java +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueIterator.java @@ -38,8 +38,8 @@ public boolean hasNext() { @Override public KeyValue next() { Row row = iter.next(); - Bytes key = Bytes.wrap(row.getByteBuffer(0).array()); - ByteBuffer byteBuffer = row.getByteBuffer(1); + Bytes key = Bytes.wrap(row.getByteBuffer("key").array()); + ByteBuffer byteBuffer = row.getByteBuffer("value"); byte[] value = byteBuffer == null ? 
null : byteBuffer.array(); return KeyValue.pair(key, value); } diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueStore.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueStore.java index 1d19fd65..29410710 100644 --- a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueStore.java +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraKeyValueStore.java @@ -26,10 +26,11 @@ public class CassandraKeyValueStore implements KeyValueStore { private final CassandraKeyValueStoreRepository repo; protected StateStoreContext context; protected int partition; - protected Position position = Position.emptyPosition(); + protected final Position position = Position.emptyPosition(); private volatile boolean open = false; - public CassandraKeyValueStore(String name, CassandraKeyValueStoreRepository repo) { + public CassandraKeyValueStore(final String name, + final CassandraKeyValueStoreRepository repo) { this.name = name; this.repo = repo; } diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraStores.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraStores.java index 51ac0037..e381ae76 100644 --- a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraStores.java +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraStores.java @@ -3,11 +3,9 @@ import com.datastax.oss.driver.api.core.CqlSession; import dev.thriving.oss.kafka.streams.cassandra.state.store.repo.GlobalCassandraKeyValueStoreRepository; import dev.thriving.oss.kafka.streams.cassandra.state.store.repo.PartitionedCassandraKeyValueStoreRepository; +import dev.thriving.oss.kafka.streams.cassandra.state.store.repo.PartitionedCassandraWindowStoreRepository; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.StoreSupplier; +import org.apache.kafka.streams.state.*; import java.util.Objects; import java.util.function.Function; @@ -269,4 +267,59 @@ private String resolveTableName() { return keyspace != null ? keyspace + "." + resolvedTableName : resolvedTableName; } + public WindowBytesStoreSupplier windowBytesStore( + final long retentionPeriod, + final long segmentIntervalMs, + final long windowSize, + final boolean retainDuplicates + ) { + return new WindowBytesStoreSupplier() { + +// // TODO: would this be useful? 
+// final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L); + + @Override + public long segmentIntervalMs() { + return segmentIntervalMs; + } + + @Override + public long windowSize() { + return windowSize; + } + + @Override + public boolean retainDuplicates() { + return retainDuplicates; + } + + @Override + public long retentionPeriod() { + return retentionPeriod; + } + + @Override + public String name() { + return name; + } + + @Override + public WindowStore get() { + return new CassandraWindowStore(name, + retentionPeriod, + windowSize, + retainDuplicates, + new PartitionedCassandraWindowStoreRepository( + session, + resolveTableName(), + tableOptions + )); + } + + @Override + public String metricsScope() { + return "cassandra-window"; + } + }; + } } diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStore.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStore.java new file mode 100644 index 00000000..ac088846 --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStore.java @@ -0,0 +1,278 @@ +package dev.thriving.oss.kafka.streams.cassandra.state.store; + +import dev.thriving.oss.kafka.streams.cassandra.state.store.repo.CassandraWindowStoreRepository; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.query.*; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.apache.kafka.streams.state.internals.StoreQueryUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Objects; + +public class CassandraWindowStore implements WindowStore { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowStore.class); + public static final int SEQNUM_SIZE = 4; + + private final String name; + private final long retentionPeriod; + private final long windowSize; + private final boolean retainDuplicates; + private final CassandraWindowStoreRepository repo; + protected final Position position; + private StateStoreContext stateStoreContext; + private int partition; + private long seqnum = 0; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + private long lastCleanupTime; + + private volatile boolean open = false; + + public CassandraWindowStore(final String name, + final long retentionPeriod, + final long windowSize, + final boolean retainDuplicates, + final CassandraWindowStoreRepository repo) { + this.name = name; + this.retentionPeriod = retentionPeriod; + this.windowSize = windowSize; + this.retainDuplicates = retainDuplicates; + this.repo = repo; + this.position = Position.emptyPosition(); + } + + @Deprecated + @Override + public void init(final ProcessorContext context, final StateStore root) { + if (context instanceof StateStoreContext) { + init((StateStoreContext) context, root); + } else { + throw new 
UnsupportedOperationException( + "Use CassandraWindowStore#init(StateStoreContext, StateStore) instead." + ); + } + } + + @Override + public void init(final StateStoreContext context, final StateStore root) { + this.stateStoreContext = context; + this.partition = context.taskId().partition(); + this.lastCleanupTime = 0; + + if (root != null) { + // register the store + context.register( + root, + (RecordBatchingStateRestoreCallback) records -> { } + ); + } + + open = true; + } + + @Override + public void close() { + this.open = false; + } + + @Override + public String name() { + return name; + } + + @Override + public void flush() { + // do-nothing + } + + @Override + public boolean persistent() { + return true; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public Position getPosition() { + return position; + } + + @Override + public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) { + LOG.trace("put {}::{}", key, value); + maybeRemoveExpiredWindows(); + observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp); + + if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { + // expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context)); + LOG.warn("Skipping record for expired segment."); + } else { + if (value != null) { + maybeUpdateSeqnumForDups(); + repo.save(partition, windowStartTimestamp, key, value, seqnum); + StoreQueryUtils.updatePosition(position, stateStoreContext); + } else if (!retainDuplicates) { + // Skip if value is null and duplicates are allowed since this delete is a no-op + repo.delete(partition, windowStartTimestamp, key, seqnum); + } + } + + StoreQueryUtils.updatePosition(position, stateStoreContext); + } + + @Override + public WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo) { + return fetch(key, timeFrom, timeTo, true); + } + + @Override + public WindowStoreIterator backwardFetch(final Bytes key, final long timeFrom, final long timeTo) { + return fetch(key, timeFrom, timeTo, false); + } + + private WindowStoreIterator fetch(final Bytes key, final long timeFrom, final long timeTo, final boolean forward) { + Objects.requireNonNull(key, "key cannot be null"); + + maybeRemoveExpiredWindows(); + + // add one b/c records expire exactly retentionPeriod ms after created + final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1); + + if (timeTo < minTime) { + return new CassandraWindowStoreIteratorProvider(Collections.emptyIterator(), windowSize).valuesIterator(); + } + + return repo.fetch(partition, key, timeFrom, timeTo, forward, windowSize).valuesIterator(); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) { + return fetch(keyFrom, keyTo, timeFrom, timeTo, true); + } + + @Override + public KeyValueIterator, byte[]> backwardFetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo) { + return fetch(keyFrom, keyTo, timeFrom, timeTo, false); + } + + private KeyValueIterator, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long timeFrom, final long timeTo, final boolean forward) { + maybeRemoveExpiredWindows(); + + if (keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0) { + LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. 
" + + "This may be due to range arguments set in the wrong order, " + + "or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. " + + "Note that the built-in numerical serdes do not follow this for negative numbers"); + return KeyValueIterators.emptyIterator(); + } + + // add one b/c records expire exactly retentionPeriod ms after created + final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1); + + if (timeTo < minTime) { + return KeyValueIterators.emptyIterator(); + } + + return repo.fetch(partition, keyFrom, keyTo, timeFrom, timeTo, forward, windowSize).keyValueIterator(); + } + + @Override + public KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo) { + return fetchAll(timeFrom, timeTo, true); + } + + @Override + public KeyValueIterator, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) { + return fetchAll(timeFrom, timeTo, false); + } + + private KeyValueIterator, byte[]> fetchAll(final long timeFrom, final long timeTo, final boolean forward) { + maybeRemoveExpiredWindows(); + + // add one b/c records expire exactly retentionPeriod ms after created + final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1); + + if (timeTo < minTime) { + return KeyValueIterators.emptyIterator(); + } + + return repo.fetchAll(partition, timeFrom, timeTo, forward); + } + + @Override + public byte[] fetch(final Bytes key, final long windowStartTimestamp) { + Objects.requireNonNull(key, "key cannot be null"); + + maybeRemoveExpiredWindows(); + + if (windowStartTimestamp <= observedStreamTime - retentionPeriod) { + return null; + } + + return repo.get(partition, windowStartTimestamp, key, seqnum); + } + + @Override + public KeyValueIterator, byte[]> all() { + return all(true); + } + + @Override + public KeyValueIterator, byte[]> backwardAll() { + return all(false); + } + + private KeyValueIterator, byte[]> all(final boolean forward) { + maybeRemoveExpiredWindows(); + + final long minTime = observedStreamTime - retentionPeriod; + + return repo.getAllWindowsFrom(partition, minTime, forward); + } + + @Override + public QueryResult query(final Query query, + final PositionBound positionBound, + final QueryConfig config) { + + return StoreQueryUtils.handleBasicQueries( + query, + positionBound, + config, + this, + position, + stateStoreContext + ); + } + + private void maybeRemoveExpiredWindows() { + // exec in a buffered fashion, based on currentTimeMillis & retentionPeriod -> keep last time executed % retentionPeriod + long currentTimeMillis = System.currentTimeMillis(); + long bufferedCleanupTime = currentTimeMillis - (currentTimeMillis % retentionPeriod); + + if (lastCleanupTime < bufferedCleanupTime) { + long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1); + repo.deleteWindowsOlderThan(partition, minLiveTime); + lastCleanupTime = bufferedCleanupTime; + } + } + + private void maybeUpdateSeqnumForDups() { + if (retainDuplicates) { + seqnum = (seqnum + 1) & 0x7FFFFFFF; + } + } +} diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStoreIteratorProvider.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStoreIteratorProvider.java new file mode 100644 index 00000000..76ea9d5c --- /dev/null +++ 
b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/CassandraWindowStoreIteratorProvider.java @@ -0,0 +1,127 @@ +package dev.thriving.oss.kafka.streams.cassandra.state.store; + +import com.datastax.oss.driver.api.core.cql.Row; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.time.Instant; +import java.util.Iterator; + +public class CassandraWindowStoreIteratorProvider { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraWindowStoreIteratorProvider.class); + + private final Iterator iter; + private final long windowSize; + + public CassandraWindowStoreIteratorProvider(Iterator iter, + final long windowSize) { + this.iter = iter; + this.windowSize = windowSize; + } + + public WindowStoreIterator valuesIterator() { + return new WrappedWindowStoreIterator(iter); + } + + public KeyValueIterator, byte[]> keyValueIterator() { + return new WrappedKeyValueIterator(iter, windowSize); + } + + private static class WrappedWindowStoreIterator implements WindowStoreIterator { + private final Iterator iter; + + WrappedWindowStoreIterator(final Iterator iter) { + this.iter = iter; + } + + @Override + public Long peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue next() { + Row row = iter.next(); + + ByteBuffer byteBuffer = row.getByteBuffer("value"); + byte[] value = byteBuffer == null ? null : byteBuffer.array(); + + Instant timestamp = row.getInstant("time"); + final long timestampMs = timestamp.toEpochMilli(); + return KeyValue.pair(timestampMs, value); + } + + @Override + public void close() { + // do nothing + } + } + + private static class WrappedKeyValueIterator implements KeyValueIterator, byte[]> { + private final Iterator iter; + private final long windowSize; + + WrappedKeyValueIterator(final Iterator iter, + final long windowSize) { + this.iter = iter; + this.windowSize = windowSize; + } + + @Override + public Windowed peekNextKey() { + throw new UnsupportedOperationException("peekNextKey() not supported in " + getClass().getName()); + } + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public KeyValue, byte[]> next() { + Row row = iter.next(); + + Bytes key = Bytes.wrap(row.getByteBuffer("key").array()); + + ByteBuffer byteBuffer = row.getByteBuffer("value"); + byte[] value = byteBuffer == null ? 
null : byteBuffer.array(); + + long startTime = row.getLong("start_time"); + TimeWindow timeWindow = timeWindowForSize(startTime); + + return KeyValue.pair(new Windowed<>(key, timeWindow), value); + } + + @Override + public void close() { + // do nothing + } + + /** + * Safely construct a time window of the given size, + * taking care of bounding endMs to Long.MAX_VALUE if necessary + */ + private TimeWindow timeWindowForSize(final long startMs) { + long endMs = startMs + windowSize; + + if (endMs < 0) { + LOG.warn("Warning: window end time was truncated to Long.MAX"); + endMs = Long.MAX_VALUE; + } + return new TimeWindow(startMs, endMs); + } + } +} diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/KeyValueIterators.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/KeyValueIterators.java new file mode 100644 index 00000000..6737f6cc --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/KeyValueIterators.java @@ -0,0 +1,51 @@ +package dev.thriving.oss.kafka.streams.cassandra.state.store; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.util.NoSuchElementException; + +class KeyValueIterators { + + private static class EmptyKeyValueIterator implements KeyValueIterator { + + @Override + public void close() { + } + + @Override + public K peekNextKey() { + throw new NoSuchElementException(); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public KeyValue next() { + throw new NoSuchElementException(); + } + + } + + private static class EmptyWindowStoreIterator extends KeyValueIterators.EmptyKeyValueIterator + implements WindowStoreIterator { + } + + private static final KeyValueIterator EMPTY_ITERATOR = new KeyValueIterators.EmptyKeyValueIterator(); + private static final WindowStoreIterator EMPTY_WINDOW_STORE_ITERATOR = new KeyValueIterators.EmptyWindowStoreIterator(); + + + @SuppressWarnings("unchecked") + static KeyValueIterator emptyIterator() { + return (KeyValueIterator) EMPTY_ITERATOR; + } + + @SuppressWarnings("unchecked") + static WindowStoreIterator emptyWindowStoreIterator() { + return (WindowStoreIterator) EMPTY_WINDOW_STORE_ITERATOR; + } +} diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/CassandraWindowStoreRepository.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/CassandraWindowStoreRepository.java new file mode 100644 index 00000000..49aac417 --- /dev/null +++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/CassandraWindowStoreRepository.java @@ -0,0 +1,23 @@ +package dev.thriving.oss.kafka.streams.cassandra.state.store.repo; + +import dev.thriving.oss.kafka.streams.cassandra.state.store.CassandraWindowStoreIteratorProvider; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.KeyValueIterator; + +public interface CassandraWindowStoreRepository { + void save(int partition, long windowStartTimestamp, Bytes key, byte[] value, long seqnum); + + void delete(int partition, long windowStartTimestamp, Bytes key, long seqnum); + + 
CassandraWindowStoreIteratorProvider fetch(int partition, Bytes key, long timeFrom, long timeTo, boolean forward, long windowSize);
+    CassandraWindowStoreIteratorProvider fetch(int partition, Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo, boolean forward, long windowSize);
+
+    KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(int partition, long timeFrom, long timeTo, boolean forward);
+
+    byte[] get(int partition, long windowStartTimestamp, Bytes key, long seqnum);
+
+    KeyValueIterator<Windowed<Bytes>, byte[]> getAllWindowsFrom(int partition, long minTime, boolean forward);
+
+    void deleteWindowsOlderThan(int partition, long minLiveTime);
+}
diff --git a/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/PartitionedCassandraWindowStoreRepository.java b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/PartitionedCassandraWindowStoreRepository.java
new file mode 100644
index 00000000..b60fb2dc
--- /dev/null
+++ b/kafka-streams-cassandra-state-store/src/main/java/dev/thriving/oss/kafka/streams/cassandra/state/store/repo/PartitionedCassandraWindowStoreRepository.java
@@ -0,0 +1,107 @@
+package dev.thriving.oss.kafka.streams.cassandra.state.store.repo;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import dev.thriving.oss.kafka.streams.cassandra.state.store.CassandraWindowStoreIteratorProvider;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+
+public class PartitionedCassandraWindowStoreRepository extends AbstractCassandraKeyValueStoreRepository implements CassandraWindowStoreRepository {
+
+    protected PreparedStatement insert;
+    protected PreparedStatement delete;
+    protected PreparedStatement select;
+    protected PreparedStatement selectByPartitionAndKeyAndStartTimeRangeAsc;
+
+    public PartitionedCassandraWindowStoreRepository(CqlSession session, String tableName, String tableOptions) {
+        super(session, tableName, tableOptions);
+    }
+
+    @Override
+    protected void createTable(String tableName, String tableOptions) {
+        PreparedStatement prepare = session.prepare("""
+                CREATE TABLE IF NOT EXISTS %s (
+                    partition int,
+                    start_time bigint,
+                    key blob,
+                    seq_num bigint,
+                    time timestamp,
+                    value blob,
+                    PRIMARY KEY ((partition, start_time), key, seq_num)
+                ) %s
+                """.formatted(tableName, tableOptions.isBlank() ? "" : "WITH " + tableOptions));
+        session.execute(prepare.bind());
+    }
+
+    @Override
+    protected void initPreparedStatements(String tableName) {
+        insert = session.prepare("INSERT INTO " + tableName + " (partition, start_time, key, seq_num, time, value) VALUES (?, ?, ?, ?, ?, ?)");
+        delete = session.prepare("DELETE FROM " + tableName + " WHERE partition=? AND start_time=? AND key=? AND seq_num=?");
+        select = session.prepare("SELECT value FROM " + tableName + " WHERE partition=? AND start_time=? AND key=? AND seq_num=?");
+
+        selectByPartitionAndKeyAndStartTimeRangeAsc = session.prepare("SELECT key, value, start_time, time FROM " + tableName + " WHERE partition=? AND start_time>=? AND start_time<=? AND key=? ALLOW FILTERING");
+    }
+
+    @Override
+    public void save(int partition, long windowStartTimestamp, Bytes key, byte[] value, long seqnum) {
+        BoundStatement prepared = insert.bind(partition, windowStartTimestamp, ByteBuffer.wrap(key.get()), seqnum, Instant.now(), ByteBuffer.wrap(value));
+        session.execute(prepared);
+    }
+
+    @Override
+    public void delete(int partition, long windowStartTimestamp, Bytes key, long seqnum) {
+        BoundStatement prepared = delete.bind(partition, windowStartTimestamp, ByteBuffer.wrap(key.get()), seqnum);
+        session.execute(prepared);
+    }
+
+    @Override
+    public CassandraWindowStoreIteratorProvider fetch(int partition, Bytes key, long timeFrom, long timeTo, boolean forward, long windowSize) {
+        // note: the 'forward' flag is not applied to this query (yet)
+        BoundStatement prepared = selectByPartitionAndKeyAndStartTimeRangeAsc.bind(partition, timeFrom, timeTo, ByteBuffer.wrap(key.get()));
+        ResultSet rs = session.execute(prepared);
+        return new CassandraWindowStoreIteratorProvider(rs.iterator(), windowSize);
+    }
+
+    @Override
+    public CassandraWindowStoreIteratorProvider fetch(int partition, Bytes keyFrom, Bytes keyTo, long timeFrom, long timeTo, boolean forward, long windowSize) {
+        // TODO
+        return null;
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(int partition, long timeFrom, long timeTo, boolean forward) {
+        // TODO
+        return null;
+    }
+
+    @Override
+    public byte[] get(int partition, long windowStartTimestamp, Bytes key, long seqnum) {
+        BoundStatement prepared = select.bind(partition, windowStartTimestamp, ByteBuffer.wrap(key.get()), seqnum);
+        ResultSet rs = session.execute(prepared);
+        Row result = rs.one();
+        if (result == null) {
+            return null;
+        } else {
+            ByteBuffer byteBuffer = result.getByteBuffer(0);
+            return byteBuffer == null ? null : byteBuffer.array();
+        }
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> getAllWindowsFrom(int partition, long minTime, boolean forward) {
+        // TODO
+        return null;
+    }
+
+    @Override
+    public void deleteWindowsOlderThan(int partition, long minLiveTime) {
+        // TODO
+    }
+}