
[SPARK-55494][SS] Introduce iterator/prefixScan with multi-values in StateStore API #54278

Closed
HeartSaVioR wants to merge 3 commits into apache:master from HeartSaVioR:SPARK-55494

@HeartSaVioR
Contributor

What changes were proposed in this pull request?

This PR adds multi-value variants of iterator and prefixScan to the StateStore API.

Why are the changes needed?

The StateStore API currently lacks this functionality: when a caller enables multiple values per key for a column family (CF), that CF cannot be scanned. The new functionality will be used by the new state format version of stream-stream join, specifically SPARK-55144 (#53930).
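
To make the gap concrete, here is a minimal toy model of a multi-value column family with both scan styles. It is only a sketch: the method names follow the PR title and the snippets quoted in this conversation, but `ToyMultiValueStore`, its `String`/`Int` types, and its signatures are assumptions for illustration, not Spark's actual StateStore API.

```scala
import scala.collection.mutable

// Toy stand-in for a column family that stores multiple values per key.
// Hypothetical: types and signatures are illustrative, not Spark's.
final class ToyMultiValueStore {
  // A sorted map keeps scans in deterministic (lexicographic) key order.
  private val data = mutable.TreeMap.empty[String, Vector[Int]]

  // Replace the full list of values stored under `key`.
  def putList(key: String, values: Seq[Int]): Unit =
    data.update(key, values.toVector)

  // Scan every key, yielding each key together with all of its values.
  def iteratorWithMultiValues(): Iterator[(String, Vector[Int])] =
    data.iterator

  // Scan only the keys that start with `prefix`. Keys sharing a prefix are
  // contiguous in lexicographic order, so we can stop at the first mismatch.
  def prefixScanWithMultiValues(prefix: String): Iterator[(String, Vector[Int])] =
    data.iteratorFrom(prefix).takeWhile { case (k, _) => k.startsWith(prefix) }
}
```

With such a store, a full scan visits every key with its value list, while a prefix scan restricts the walk to one contiguous key range.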

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: claude-4.5-sonnet

The tool above was used only to create the new test suite. No other part of the change was generated by an LLM.

@HeartSaVioR
Contributor Author

HeartSaVioR commented Feb 11, 2026

Only the last commit is relevant to this PR. Once #53911 is merged, I will rebase this PR to drop the other commits.

HeartSaVioR force-pushed the SPARK-55494 branch 5 times, most recently from 4e3a915 to 0c5b92d (February 13, 2026, 23:10)

@HeartSaVioR
Contributor Author

cc. @anishshri-db Please take a look, thanks!

*
* It is expected to throw exception if Spark calls this method without proper key encoding spec.
* It is also expected to throw exception if Spark calls this method without setting
* multipleValuesPerKey as true for the column family.
Contributor

Where do we verify the multipleValuesPerKey condition?

Contributor Author

Each implementation is expected to perform this check itself. RocksDBStateStoreProvider.prefixScanWithMultiValues, for instance, checks it:

      verify(
        kvEncoder._2.supportsMultipleValuesPerKey,
        "Multi-value iterator operation requires an encoder" +
          " which supports multiple values for a single key")
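
As a rough illustration of that fail-fast pattern, the guard can be sketched in isolation. This is a stand-in under stated assumptions: `ToyValueEncoder` and `MultiValueGuard` are hypothetical names, and Scala's built-in `require` replaces the `verify` helper from the snippet, so the exception type may differ from what Spark throws.

```scala
// Hypothetical encoder capability flag; the field name follows the snippet above.
final case class ToyValueEncoder(supportsMultipleValuesPerKey: Boolean)

object MultiValueGuard {
  // Fail fast when a multi-value operation is invoked on a column family
  // whose encoder only supports a single value per key.
  // `require` throws IllegalArgumentException when the condition is false.
  def requireMultiValue(encoder: ToyValueEncoder): Unit =
    require(
      encoder.supportsMultipleValuesPerKey,
      "Multi-value iterator operation requires an encoder" +
        " which supports multiple values for a single key")
}
```

Placing the check at the start of each multi-value method means a misconfigured column family is rejected before any data is read.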

}
}

test(s"Event time as prefix: iterator with multiple values (encoding = $encoding)") {
Contributor

Does Avro work here, or will it be added later? If later, can we add a TODO?

Contributor Author

The TODO comment for Avro is at L226; it sits on a broader foreach that covers multiple test cases.

// Put multiple values at different event times
// Insert in non-sorted order to verify ordering by event time
val values2 = Array(valueToRow(200), valueToRow(201))
store.putList(keyAndTimestampToRow("key1", 1, 1000L), values2)
Contributor

Let's add some different values here as well, similar to the encoder tests?

Contributor Author

The encoder test we added has been extended to cover the multi-value API as well. Would that be sufficient?
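
The idea behind the quoted test (insert in non-sorted order, then expect iteration ordered by event time) can be sketched with a sorted map. This is a simplified model, assuming a composite key with the event time as its first component; `EventTimeOrderingSketch` and its types are hypothetical and do not reflect how the state store encodes rows.

```scala
import scala.collection.mutable

object EventTimeOrderingSketch {
  // Hypothetical composite key: event time first, so the default tuple
  // ordering sorts entries by event time before the grouping key.
  type TimedKey = (Long, String)

  // Insert entries in whatever order they arrive and return them in
  // event-time order, each key paired with its full list of values.
  def scanInEventTimeOrder(
      entries: Seq[(TimedKey, Seq[Int])]): List[(TimedKey, Vector[Int])] = {
    val sorted = mutable.TreeMap.empty[TimedKey, Vector[Int]]
    entries.foreach { case (k, vs) => sorted.update(k, vs.toVector) }
    sorted.toList
  }
}
```

Feeding it the later timestamp first still yields the earlier timestamp first on scan, which is the property the test asserts.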

// For prefix encoder, we use iterator
case "prefix" =>
store.iterator()
if (useMultipleValuesPerKey) store.iteratorWithMultiValues()
Contributor

Could we use braces?

Contributor Author

Done

// For postfix encoder, we use prefix scan with ("key1", 1) as the prefix key
case "postfix" =>
store.prefixScan(keyToRow("key1", 1))
if (useMultipleValuesPerKey) store.prefixScanWithMultiValues(keyToRow("key1", 1))
Contributor

Same here?

Contributor Author

Done

Contributor

@anishshri-db anishshri-db left a comment

lgtm

@HeartSaVioR
Contributor Author

https://github.com/HeartSaVioR/spark/runs/63873409895

The only CI failure was in the Docker integration test, which is unrelated to this change.

@HeartSaVioR
Contributor Author

Thanks! Merging to master.
