diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java index 256eddd6f7462..fa289ca595968 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java @@ -891,7 +891,10 @@ protected static Topology getTopology(final Properties streamsConfig, rejoin, leftVersioned, rightVersioned, - value -> value.split("\\|")[1] + value -> { + final String[] tokens = value.split("\\|"); + return tokens.length == 2 ? tokens[1] : null; + } ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java index e654cd752af16..3bb3c7a9396f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java @@ -82,7 +82,8 @@ public void init(final ProcessorContext, Change> record) { - if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) { + final KRight foreignKey = record.key(); + if (foreignKey == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) { dropRecord(); return; } @@ -93,7 +94,7 @@ public void process(final Record> record) { throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version."); } context().forward( 
- record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey())) + record.withKey(new CombinedKey<>(foreignKey, record.value().primaryKey())) .withValue(inferChange(record)) .withTimestamp(record.timestamp()) ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java index b161ce092c4d9..03c8272b2e448 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java @@ -128,47 +128,105 @@ public void process(final Record> record) { } private void leftJoinInstructions(final Record> record) { - if (record.value().oldValue != null) { - final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); - final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue); - if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { + final VLeft oldValue = record.value().oldValue; + final VLeft newValue = record.value().newValue; + + if (oldValue == null && newValue == null) { + // no output for idempotent left hand side deletes + return; + } + + final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue); + final KRight newForeignKey = newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), newValue); + + final boolean maybeUnsubscribe = oldForeignKey != null; + if (maybeUnsubscribe) { + // delete old subscription only if FK changed + // + // if FK did change, we need to explicitly delete the old subscription, + // because the new subscription goes to a different partition + final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey)); + + if (foreignKeyChanged) { + // this may lead to unnecessary tombstones if the old FK did not join; + // however, we cannot avoid it as we have no means to know if the old FK joined or not forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); } - forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); - } else if (record.value().newValue != null) { - final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); - forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); } + + // for all cases (insert, update, and delete), we send a new subscription; + // we need to get a response back for all cases to always produce a left-join result + // + // note: for delete, `newForeignKey` is null, which is a "hack" + // no actual subscription will be added for null-FK on the right hand side, but we still get the response back we need + // + // this may lead to unnecessary tombstones if the old FK did not join; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); } private void defaultJoinInstructions(final Record> record) { - if (record.value().oldValue != null) { - final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue); - final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue); + final VLeft oldValue = record.value().oldValue; + final VLeft newValue = record.value().newValue; + + final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue); + final boolean needToUnsubscribe = oldForeignKey != null; - if (oldForeignKey == null && newForeignKey == null) { + // if left row is inserted or updated, subscribe to new FK (if new FK is valid) + if (newValue != null) { + final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue); + + if (newForeignKey == null) { // invalid FK logSkippedRecordDueToNullForeignKey(); - } else if (oldForeignKey == null) { - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); - } else if (newForeignKey == null) { - forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); - } else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) { - //Different Foreign Key - delete the old key value and propagate the new one. - //Delete it from the oldKey's state store - forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); - //Add to the newKey's state store. Additionally, propagate null if no FK is found there, - //since we must "unset" any output set by the previous FK-join. This is true for both INNER - //and LEFT join. 
- forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); - } else { // unchanged FK - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + if (needToUnsubscribe) { + // delete old subscription + // + // this may lead to unnecessary tombstones if the old FK did not join; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); + } + } else { // valid FK + // regular insert/update + + if (needToUnsubscribe) { + // update case + + final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey)); + + if (foreignKeyChanged) { + // if FK did change, we need to explicitly delete the old subscription, + // because the new subscription goes to a different partition + // + // we don't need any response, as we only want a response from the new subscription + forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE); + + // subscribe for new FK (note, could be on a different task/node than the old FK) + // additionally, propagate null if no FK is found so we can delete the previous result (if any) + // + // this may lead to unnecessary tombstones if the old FK did not join and the new FK key does not join either; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE); + } else { + // if FK did not change, we only need a response from the new FK subscription, if there is a join + // if there is no join, we know that the old row did not join either (as it used the same FK) + // and thus we don't need to propagate an idempotent null result + forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + } + } else { + // insert case + + // subscribe to new key + // don't propagate null if no FK is found: + // for inserts, we know that there is no need to delete any previous result + forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + } } - } else if (record.value().newValue != null) { - final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue); - if (newForeignKey == null) { - logSkippedRecordDueToNullForeignKey(); - } else { - forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE); + } else { + // left row is deleted + if (needToUnsubscribe) { + // this may lead to unnecessary tombstones if the old FK did not join; + // however, we cannot avoid it as we have no means to know if the old FK joined or not + forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE); } } }