Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,10 @@ protected static Topology getTopology(final Properties streamsConfig,
rejoin,
leftVersioned,
rightVersioned,
value -> value.split("\\|")[1]
value -> {
final String[] tokens = value.split("\\|");
return tokens.length == 2 ? tokens[1] : null;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some side cleanup... While playing with the code adding other test (which I removed later, as they were just for myself), this method failed if we did only get one token...

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void init(final ProcessorContext<CombinedKey<KRight, KLeft>, Change<Value

@Override
public void process(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
if (record.key() == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
final KRight foreignKey = record.key();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor cleanup, to explain that record.key() is the FK -- it's not necessarily obvious form the existing code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a thought, what if we rename the record to something like fkRecord instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to rename, but don't think it does buy us too much, as it clear from the class name and parameter type that it's a subscription record. But renaming does not help much to explain how a subscription record built up internally.

if (foreignKey == null && !SubscriptionWrapper.Instruction.PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE.equals(record.value().instruction())) {
dropRecord();
return;
}
Expand All @@ -93,7 +94,7 @@ public void process(final Record<KRight, SubscriptionWrapper<KLeft>> record) {
throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
context().forward(
record.withKey(new CombinedKey<>(record.key(), record.value().primaryKey()))
record.withKey(new CombinedKey<>(foreignKey, record.value().primaryKey()))
.withValue(inferChange(record))
.withTimestamp(record.timestamp())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> record) {
}

private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is unfortunately a little bit messy, but if you compare new and old code, also because I added comments, it should become clear the the new code is much cleaner, especially for left-join case.

if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;

if (oldValue == null && newValue == null) {
// no output for idempotent left hand side deletes
return;
}

final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final KRight newForeignKey = newValue == null ? null : foreignKeyExtractor.extract(record.key(), newValue);

final boolean maybeUnsubscribe = oldForeignKey != null;
if (maybeUnsubscribe) {
// delete old subscription only if FK changed
//
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can check for null here to avoid the unnecessary serialization and comparison

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think it would buy us much? Just a few method calls, as serialize() would do the null checks and returns null, and if either serialize call does return null, equals is also very cheap.

I would rather keep it the way it is to keep fewer if/else checks what makes it easier to read, and I don't think we get any perf benefits if we make the code more complicated.


if (foreignKeyChanged) {
// this may lead to unnecessary tombstones if the old FK did not join;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
}
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}

// for all cases (insert, update, and delete), we send a new subscription;
// we need to get a response back for all cases to always produce a left-join result
//
// note: for delete, `newForeignKey` is null, what is a "hack"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a thought, if we split the logic into if equals null, delete(or pass null expricitly), if not do the normal stuff, will it look as hacky?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO it's still hacky, as we keep sending a null to the right hand side. The hacky part is not about the variable being null (it contributes a little bit I guess, as it might seem unintuitive that it is a valid case), but about sending null at all (and sending null is also unintuitive by itself IMHO). Thoughts?

// no actual subscription will be added for null-FK on the right hand sice, but we still get the response back we need
//
// this may lead to unnecessary tombstones if the old FK did not join;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
}

private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
if (record.value().oldValue != null) {
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
final VLeft oldValue = record.value().oldValue;
final VLeft newValue = record.value().newValue;

final KRight oldForeignKey = oldValue == null ? null : foreignKeyExtractor.extract(record.key(), oldValue);
final boolean needToUnsubscribe = oldForeignKey != null;

if (oldForeignKey == null && newForeignKey == null) {
// if left row is inserted or updated, subscribe to new FK (if new FK is valid)
if (newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), newValue);

if (newForeignKey == null) { // invalid FK
logSkippedRecordDueToNullForeignKey();
} else if (oldForeignKey == null) {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
} else if (newForeignKey == null) {
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
} else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
//Different Foreign Key - delete the old key value and propagate the new one.
//Delete it from the oldKey's state store
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
//and LEFT join.
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else { // unchanged FK
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
if (needToUnsubscribe) {
// delete old subscription
//
// this may lead to unnecessary tombstones if the old FK did not join;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
} else { // valid FK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could be else if

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I can follow?

if (newForeignKey = null) {
...
} else { // what would we add here? -> `else if (newForeignKey != null)` would be redundant 
...
}

// regular insert/update

if (needToUnsubscribe) {
// update case

final boolean foreignKeyChanged = !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));

if (foreignKeyChanged) {
// if FK did change, we need to explicitly delete the old subscription,
// because the new subscription goes to a different partition
//
// we don't need any response, as we only want a response from the new subscription
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);

// subscribe for new FK (note, could be on a different task/node than the old FK)
// additionally, propagate null if no FK is found so we can delete the previous result (if any)
//
// this may lead to unnecessary tombstones if the old FK did not join and the new FK key does not join either;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
} else {
// if FK did not change, we only need a response from the new FK subscription, if there is a join
// if there is no join, we know that the old row did not join either (as it used the same FK)
// and thus we don't need to propagate an idempotent null result
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
} else {
// insert case

// subscribe to new key
// don't propagate null if no FK is found:
// for inserts, we know that there is no need to delete any previous result
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
}
}
} else if (record.value().newValue != null) {
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);
if (newForeignKey == null) {
logSkippedRecordDueToNullForeignKey();
} else {
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
} else {
// left row is deleted
if (needToUnsubscribe) {
// this may lead to unnecessary tombstones if the old FK did not join;
// however, we cannot avoid it as we have no means to know if the old FK joined or not
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
}
}
}
Expand Down