
[SPARK-55144][SS] Introduce new state format version for performant stream-stream join #53930

Open

HeartSaVioR wants to merge 4 commits into apache:master from HeartSaVioR:SPARK-55144-on-top-of-SPARK-55129

Conversation

@HeartSaVioR (Contributor) commented Jan 23, 2026

What changes were proposed in this pull request?

This PR proposes to implement the new state format for stream-stream join, built on the new event-time-aware state key encoding.

The new state format eliminates the need for a full scan during eviction and when emitting unmatched rows. The eviction overhead should be bounded by the actual number of state rows to be evicted (indirectly driven by how far the watermark advances), but with the existing state format we perform a full scan, which can take more than 2 seconds over 1,000,000 rows even when there are zero rows to evict. With the new state format, the eviction overhead is bounded by the number of rows actually evicted, taking around 30 ms or less over 1,000,000 rows when there are zero rows to evict.

To achieve the above, we make a drastic change to the data structure: we move away from the logical array and introduce a secondary index in addition to the main data.

Each side of the join will use two (virtual) column families (four in total), as follows:

  • KeyWithTsToValuesStore
    • Primary data store
    • (key, event time) -> values
    • each element in values consists of (value, matched)
  • TsWithKeyTypeStore
    • Secondary index for efficient eviction
    • (event time, key) -> empty value (configured as multi-values)
      • numValues is tracked as the number of elements on the value side; a new element is added whenever a new value is appended to the values in the primary data store
      • This is to track the number of deleted rows accurately. It is optional, but the metric has proven useful, so we keep it as it is.

As the key formats imply, KeyWithTsToValuesStore will use TimestampAsPostfixKeyStateEncoderSpec, and TsWithKeyTypeStore will use TimestampAsPrefixKeyStateEncoderSpec.

The granularity of the event-time timestamp is 1 millisecond, in line with the granularity of watermark advancement. This granularity acts as a knob controlling the number of keys versus the number of values per key, trading off the granularity of watermark-based eviction against the size of the key space (which may impact performance).
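To make the two layouts concrete, here is a minimal sketch of how the serialized join key and the event-time timestamp could be combined, assuming a fixed 8-byte big-endian encoding of non-negative millisecond timestamps so that lexicographic byte order matches numeric timestamp order. The object and method names are hypothetical; this is not the actual encoder implementation:

```scala
import java.nio.ByteBuffer

// Illustration only: hypothetical helpers for the two key layouts.
// Assumes non-negative timestamps encoded big-endian, so unsigned
// lexicographic byte order equals numeric timestamp order.
object JoinKeyLayoutSketch {
  private val TsBytes = java.lang.Long.BYTES

  // KeyWithTsToValuesStore: (key, event time) -> values
  def keyWithTsPostfix(keyBytes: Array[Byte], eventTimeMs: Long): Array[Byte] =
    ByteBuffer.allocate(keyBytes.length + TsBytes)
      .put(keyBytes)
      .putLong(eventTimeMs) // timestamp as postfix: rows for one key sort by time
      .array()

  // TsWithKeyTypeStore: (event time, key) -> empty value
  def tsWithKeyPrefix(keyBytes: Array[Byte], eventTimeMs: Long): Array[Byte] =
    ByteBuffer.allocate(TsBytes + keyBytes.length)
      .putLong(eventTimeMs) // timestamp as prefix: eviction becomes a bounded
      .put(keyBytes)        // range scan up to the watermark
      .array()
}
```

With the prefix layout, eviction reduces to a range scan from the smallest timestamp up to the watermark, which is what keeps the eviction cost proportional to the number of evicted rows rather than the total state size.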

There are several follow-ups to this state format implementation, which can be addressed on top of it:

  • further optimizations with RocksDB offerings: WriteBatch (for batched writes), MGET, etc.
  • retrieving matched rows with the "scope" of timestamps (in time-interval join)
    • while the format is ready to support timestamp-ordered scans, this needs another state store API to define the range of keys to scan, which requires some effort

Why are the changes needed?

The cost of full-scan-based eviction is a major obstacle to lowering the latency of stream-stream join. In addition, the logic for maintaining the logical array is complicated to maintain, and its performance characteristics are less predictable given how deletion at a random index works (the value at the last index is moved into the deleted slot).
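To illustrate the difference in eviction cost, here is a rough sketch of the two strategies over a hypothetical timestamp-ordered index (the interface and names are assumptions for illustration; the real implementation goes through the state store API):

```scala
// Hypothetical view of the secondary index: (eventTimeMs, keyBytes) entries,
// iterable in ascending event-time order. Illustration only.
trait OrderedIndexSketch {
  def orderedIterator(): Iterator[(Long, Array[Byte])]
}

// Existing format, conceptually: every state row is visited to find the
// evictable ones, so the cost is O(total rows) even when nothing is evicted.
def evictByFullScan(
    allRows: Iterator[(Long, Array[Byte])],
    watermarkMs: Long): Seq[Array[Byte]] =
  allRows.filter { case (ts, _) => ts < watermarkMs }.map(_._2).toSeq

// New format, conceptually: the index is timestamp-ordered, so iteration can
// stop at the first entry at or beyond the watermark; cost is O(evicted rows).
def evictByRangeScan(index: OrderedIndexSketch, watermarkMs: Long): Seq[Array[Byte]] =
  index.orderedIterator().takeWhile { case (ts, _) => ts < watermarkMs }.map(_._2).toSeq
```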

Does this PR introduce any user-facing change?

No. At this point, this state format is not integrated with the actual stream-stream join operator; follow-up work is needed to integrate it and make the change user-facing.

How was this patch tested?

New UT suites, plus a refactoring of the existing suite to test both the time-window and time-interval cases.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions

JIRA Issue Information

=== Improvement SPARK-55144 ===
Summary: Introduce new state format version for performant stream-stream join
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@HeartSaVioR (Contributor Author)

NOTE: This is on top of PR #53911.

@HeartSaVioR (Contributor Author)

TODO tickets:

yet to cover the case of a regular join where there is no event time in either the join condition or the value

We have an idea to try (replacing the GET-and-PUT pattern for the count in the secondary index with a blind MERGE), though it may require a broader change due to the issue below.

https://issues.apache.org/jira/browse/SPARK-55131

After resolving the above, we can try the blind MERGE and check the performance improvement.

retrieving matched rows with the "scope" of timestamps (in time-interval join)

https://issues.apache.org/jira/browse/SPARK-55147

further optimizations with RocksDB offerings: WriteBatch (for batched writes), MGET, etc.

https://issues.apache.org/jira/browse/SPARK-55148

joinStateManager.get(key)
}

// FIXME: doc!
Contributor

Can we add comments for this?

Contributor Author

My bad, thanks for the reminder. Will do.

* @throws UnsupportedOperationException if called on an encoder that doesn't support event time
* as postfix.
*/
def encodeKeyForEventTimeAsPostfix(row: UnsafeRow, eventTime: Long): Array[Byte]
Contributor

Could we make this more generic? I don't think we should call out eventTime as such - we could just name it longType?

@HeartSaVioR (Contributor Author) commented Jan 25, 2026

I'd say we shouldn't generalize too much - this is coupled with the state store API change, and I'm not sure we want to introduce an API that just says it handles an additional long type. It should carry enough meaning to justify itself.

While I think "event time" has enough potential usages, "timestamp" is fine for me if "event time" sounds too narrow. I'd still want to keep the semantics of "time" here.

* @throws UnsupportedOperationException if called on an encoder that doesn't support event time
* as postfix.
*/
def decodeKeyForEventTimeAsPostfix(bytes: Array[Byte]): (UnsafeRow, Long)
Contributor

Same here as above

Contributor Author

Same here.
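For illustration, a hypothetical decode for the postfix layout sketched in the description: since the timestamp occupies a fixed 8 bytes at the end of the encoded key, it can be split off without knowing the key schema (the real decodeKeyForEventTimeAsPostfix returns an UnsafeRow rather than raw bytes):

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Illustration only: split an encoded (key ++ eventTime) byte array back into
// the serialized key and the big-endian 8-byte event time at the end.
def splitPostfixKey(encoded: Array[Byte]): (Array[Byte], Long) = {
  val boundary = encoded.length - java.lang.Long.BYTES
  val eventTime = ByteBuffer.wrap(encoded, boundary, java.lang.Long.BYTES).getLong
  (Arrays.copyOfRange(encoded, 0, boundary), eventTime)
}
```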

}
}
StructType(remainingSchema)
case _ =>
Contributor

When is this possible?

Contributor Author

New encoder specs for event time would be bound here. It's not handled at this point, and I filed a TODO ticket for it.

new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
}

class RocksDBEventTimeAwareStateOperations(cfName: String)
Contributor

Why not rename this to something more generic?

Contributor Author

Same here.

val eventTimeColsSet = eventTimeCols.map(_._1.exprId).toSet
if (eventTimeColsSet.size > 1) {
throw new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_3077",
Contributor

Can we add a new error class for this?

Contributor Author

That was copied over from existing code IIRC - maybe file a JIRA ticket and handle it altogether?

@HeartSaVioR (Contributor Author)

@anishshri-db
Btw, just a reminder: there is another PR with a narrower change - #53911.
This PR is on top of that one and is probably quite large to review all at once.

@HeartSaVioR force-pushed the SPARK-55144-on-top-of-SPARK-55129 branch 2 times, most recently from a2c1977 to c6618d8 on February 2, 2026 13:09
@HeartSaVioR (Contributor Author)

TODO Update:

https://issues.apache.org/jira/browse/SPARK-55131

This is now the first PR in the stack. We can now update the logic here to replace the GET-and-PUT pattern for the count in the secondary index with a blind MERGE.
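To sketch the idea with a hypothetical counter-store interface (illustration only; the real change depends on the merge support tracked in SPARK-55131): a GET-and-PUT read-modify-write of the count can be replaced by a blind MERGE that pushes the addition down to the storage engine, in the spirit of RocksDB's merge operator:

```scala
// Hypothetical counter store, used only to contrast the two write patterns.
trait CounterStoreSketch {
  def get(key: Array[Byte]): Option[Long]
  def put(key: Array[Byte], value: Long): Unit
  def merge(key: Array[Byte], delta: Long): Unit // blind write, no prior read
}

// GET-and-PUT: one read plus one write per increment.
def incrementReadModifyWrite(store: CounterStoreSketch, key: Array[Byte]): Unit = {
  val current = store.get(key).getOrElse(0L)
  store.put(key, current + 1L)
}

// Blind MERGE: a single write; the engine folds the deltas together later
// (on read or compaction), as a RocksDB merge operator would.
def incrementBlind(store: CounterStoreSketch, key: Array[Byte]): Unit =
  store.merge(key, 1L)
```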

joinStateManager.get(key)
}

// FIXME: doc!
Contributor Author

Self review: add code comment

def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
}

class StreamingSymmetricHashJoinValueRowConverterFormatV1(
Contributor Author

Self review: add code comment

override def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow = value
}

class StreamingSymmetricHashJoinValueRowConverterFormatV2(
Contributor Author

Self review: add code comment
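Based on the description's note that each element in values consists of (value, matched), a minimal sketch of what a format-v2 conversion could look like: append the matched flag to the value row as a trailing boolean column. The class below is an assumption for illustration, not the PR's actual implementation, though it uses real Spark catalyst APIs:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{BooleanType, StructField, StructType}

// Illustration only: store the user value plus a trailing "matched" boolean.
class ValueRowConverterV2Sketch(valueSchema: StructType) {
  private val storedSchema = valueSchema.add(StructField("matched", BooleanType))
  private val projection = UnsafeProjection.create(storedSchema)
  private val joinedRow = new JoinedRow()

  def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow = {
    val matchedRow: InternalRow = new GenericInternalRow(Array[Any](matched))
    // Serialize (value, matched) into a single UnsafeRow with the stored schema.
    projection(joinedRow(value, matchedRow))
  }
}
```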

}
}

object StreamingSymmetricHashJoinValueRowConverter {
Contributor Author

Self review: add code comment

import org.apache.spark.sql.types.{BooleanType, DataType, LongType, NullType, StructField, StructType}
import org.apache.spark.util.NextIterator

trait SymmetricHashJoinStateManager {
Contributor Author

Self review: add code comment

)
// */

/*
Contributor Author

Self review: revert it

// We want to collect instance metrics from both state stores
keyWithIndexToValueMetrics.instanceMetrics ++ keyToNumValuesMetrics.instanceMetrics
)
*/
Contributor Author

Self review: revert it

KeyWithIndexToValueType
} else {
throw new IllegalArgumentException(s"Unknown join store name: $storeName")
// TODO: Add support of KeyWithTsToValuesType and TsWithKeyType
Contributor Author

Self review: may need a TODO JIRA ticket?

// State key is the partition key
new NoopStatePartitionKeyExtractor(stateKeySchema)
} else {
// TODO: Add support of KeyWithTsToValuesType and TsWithKeyType
Contributor Author

Self review: may need a TODO JIRA ticket?

}
}

case class KeyAndTsToValuePair(
Contributor Author

Self review: add code comment

HeartSaVioR added a commit that referenced this pull request Feb 18, 2026
…StateStore API

### What changes were proposed in this pull request?

This PR proposes to introduce iterator/prefixScan with multi-values in the StateStore API.

### Why are the changes needed?

The functionality is missing from the StateStore API, so when the caller sets multi-values for a specific CF, that CF doesn't support scanning through the data. The new functionality will be used in the new state format version for stream-stream join, specifically SPARK-55144 (#53930).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: claude-4.5-sonnet

The above was used to create a new test suite. All other parts were not generated by an LLM.

Closes #54278 from HeartSaVioR/SPARK-55494.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
@HeartSaVioR force-pushed the SPARK-55144-on-top-of-SPARK-55129 branch from de4d53d to ec48b97 on February 19, 2026 07:22
@nyaapa (Contributor) left a comment

🎉

@eason-yuchen-liu (Contributor) left a comment

Made one pass, except for StreamingSymmetricHashJoinValueRowConverter.scala and the tests.

case _ =>
// Need a strategy about bucketing when event time is not available
// - first attempt: random bucketing
random.nextInt(bucketSizeForNoEventTime)
Contributor

Is it OK for extractEventTimeFn to return a non-deterministic result? Will it create a problem when we want to fetch the exact key in the future?

Contributor Author

We always scan through all buckets to find all the values associated with the key. Unlike time-interval join, where we could scope the timestamp range during scanning, this case needs to read all the values, so it's simply a trade-off between "a smaller number of buckets with more elements per bucket" and "a larger number of buckets with fewer elements per bucket".

Contributor Author

There is no operation where we have to look up a specific element.
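A sketch of the trade-off being described (hypothetical names; the diff's "first attempt" is random bucketing): with no event time available, a row is written to a random bucket, and reads enumerate every bucket for the key since no exact-element lookup is ever needed:

```scala
import scala.collection.mutable
import scala.util.Random

// Illustration only: random bucketing when no event time is available.
// The bucket id stands in for the timestamp portion of the state key.
class RandomBucketingSketch[V](bucketSizeForNoEventTime: Int) {
  private val random = new Random()
  private val data = mutable.Map.empty[(String, Int), Vector[V]]

  // Write: pick a random bucket. More buckets -> fewer values per bucket.
  def put(key: String, value: V): Unit = {
    val bucket = random.nextInt(bucketSizeForNoEventTime)
    data((key, bucket)) = data.getOrElse((key, bucket), Vector.empty) :+ value
  }

  // Read: scan all buckets for the key; the non-deterministic bucket choice
  // is harmless because every bucket is visited.
  def get(key: String): Iterator[V] =
    (0 until bucketSizeForNoEventTime).iterator
      .flatMap(b => data.getOrElse((key, b), Vector.empty))
}
```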

override def get(key: UnsafeRow): Iterator[UnsafeRow] = {
keyWithTsToValues.getValues(key).flatMap { result =>
result.values.map(_.value)
}.iterator
Contributor

redundant?

Contributor Author

Not sure - let me check with IDE...

Seq(StructField("dummy", NullType, nullable = true))
)

private val stateStoreCkptId: Option[String] = None
Contributor

Can you explain why this is hardcoded?

Contributor Author
Ah, nice catch. I think I left this for the integration work but forgot to leave a TODO comment. Let me do this...

)

private val stateStoreCkptId: Option[String] = None
private val handlerSnapshotOptions: Option[HandlerSnapshotOptions] = None
Contributor

same

Contributor Author

Same here.
