Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,10 @@ protected static Topology getTopology(final Properties streamsConfig,
rejoin,
leftVersioned,
rightVersioned,
value -> value.split("\\|")[1]
value -> {
final String[] tokens = value.split("\\|");
return tokens.length == 2 ? tokens[1] : null;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some side cleanup... While playing with the code adding other test (which I removed later, as they were just for myself), this method failed if we did only get one token...

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void init(final ProcessorContext<CombinedKey<KRight, KLeft>, Change<Value

@Override
public void process(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
final KRight foreignKey = record.key();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor cleanup, to explain that record.key() is the FK -- it's not necessarily obvious form the existing code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a thought, what if we rename the record to something like fkRecord instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to rename, but don't think it does buy us too much, as it clear from the class name and parameter type that it's a subscription record. But renaming does not help much to explain how a subscription record built up internally.

if (foreignKey == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
dropRecord();
return;
}
Expand All @@ -93,7 +94,7 @@ public void process(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
context().forward(
record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey()))
record.withKey(new CombinedKey<>(foreignKey, record.value().primaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,47 +128,101 @@
}

private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is unfortunately a little bit messy, but if you compare new and old code, also because I added comments, it should become clear the the new code is much cleaner, especially for left-join case.

if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;

if (oldValue == null && newValue == null) {
// no output for idempotent left hand side deletes
return;
}

final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue);

final boolean unsubscribe = oldForeignKey != null;
if (unsubscribe) {
// delete old subscription if FK changed
//
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}
}

// for all cases, insert, update, and delete, we send a new subscription
// we need to get a response back for all cases to always produce a left-join result
//
// note: for delete, `newForeignKey` is null, what is a "hack"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a thought, if we split the logic into if equals null, delete(or pass null expricitly), if not do the normal stuff, will it look as hacky?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO it's still hacky, as we keep sending a null to the right hand side. The hacky part is not about the variable being null (it contributes a little bit I guess, as it might seem unintuitive that it is a valid case), but about sending null at all (and sending null is also unintuitive by itself IMHO). Thoughts?

// no actual subscription will be added for null-FK, but we still get the response back we need
//
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}

Check notice on line 166 in streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java

View workflow job for this annotation

GitHub Actions / build / Compile and Check (Merge Ref)

Checkstyle error

'method def rcurly' has incorrect indentation level 0, expected level should be 8.

private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;

final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final boolean unsubscribe = oldForeignKey != null;

// if left row is inserted or updated, subscribe to new FK (if new FK is valid)
if (newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue);

if (oldForeignKey == null && newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
} else if (oldForeignKey == null) {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
} else if (newForeignKey == null) {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
} else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else { // unchanged FK
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
if (unsubscribe) {
// delete old subscription
//
// this may lead to unnecessary tombstones if the old FK did not join
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
} else {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
// regular insert/update

if (unsubscribe) {
// update case

// delete old subscription if FK changed
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
//
// we don't need any response, as we only want a response from the new subscription
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}

// subscribe to new key (note, could be on a different task/node than old key)
// additionally, propagate null if no FK is found so we can delete the previous result (if any)
//
// this may lead to unnecessary tombstones if the old FK did not join
// and the new FK key does not join either;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
// insert

// subscribe to new key
// don't propagate null if no FK is found;
// for inserts, we know that there is no need to delete any previous result
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
}
} else {
// left row is deleted
if (unsubscribe) {
// this may lead to unnecessary tombstones, if we delete an existing key,
// which did not join previously;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,23 @@ public void innerJoinShouldDeleteOldAndPropagateNewFK() {
);
}

@Test
public void innerJoinShouldNotDeleteOldAndPropagateNewFKForUnchangedFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);

final LeftValue leftRecordValue = new LeftValue(fk1);

innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));

assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
}

@Test
public void innerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
Expand Down Expand Up @@ -332,7 +349,7 @@ public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK() {
}

@Test
public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
public void innerJoinShouldPropagateNewRecordOfUnchangedFK() {
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
innerJoinProcessor.init(context);
context.setRecordMetadata("topic", 0, 0);
Expand All @@ -344,7 +361,7 @@ public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
assertThat(context.forwarded().size(), is(1));
assertThat(
context.forwarded().get(0).record(),
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE, pk, 0), 0))
);
}

Expand Down
Loading