rfc: spark structured streaming sink and source platform #11
Conversation
Walkthrough

This change introduces a configuration mechanism to explicitly specify the data platform for Spark Structured Streaming sources and sinks within DataHub lineage capture. It addresses cases where the existing regular expression-based platform detection fails, such as with Iceberg tables lacking recognizable prefixes. A new Spark configuration property supplies the platform explicitly as a fallback when detection fails.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant SparkApp
    participant DataHubLineage
    SparkApp->>DataHubLineage: Trigger lineage capture with streaming description
    DataHubLineage->>DataHubLineage: Parse streaming description with regex
    alt Regex matches
        DataHubLineage->>DataHubLineage: Extract namespace, path, platform and generate URN
    else Regex fails
        DataHubLineage->>DataHubLineage: Check for fallback platform config (spark.datahub.streaming.platform.instance)
        alt Fallback config found
            DataHubLineage->>DataHubLineage: Use fallback platform to generate URN (source or sink)
        else No config found
            DataHubLineage->>DataHubLineage: Return empty optional
        end
    end
```
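The flow above can also be read as code. Below is a minimal Java sketch of the regex-first, config-fallback resolution the diagram describes; the anchored pattern and the method shape are illustrative assumptions, not the implementation under review:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal sketch of the flow in the diagram: try the regex first,
// then fall back to an explicitly configured platform.
public class StreamingPlatformResolution {
    private static final Pattern STREAM_DESC = Pattern.compile("^(.+?)\\[(.+)]$");

    static Optional<String> resolvePlatform(String description, String configuredFallback) {
        Matcher m = STREAM_DESC.matcher(description);
        if (m.matches()) {
            // Regex matched: the namespace prefix (e.g. "KafkaV2") identifies the platform.
            return Optional.of(m.group(1));
        }
        // Regex failed: use the fallback platform config, if present.
        return Optional.ofNullable(configuredFallback);
    }

    public static void main(String[] args) {
        System.out.println(resolvePlatform("KafkaV2[Subscribe[topic]]", "iceberg")); // Optional[KafkaV2]
        System.out.println(resolvePlatform("some iceberg table", "iceberg"));        // Optional[iceberg]
        System.out.println(resolvePlatform("some iceberg table", null));             // Optional.empty
    }
}
```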
Actionable comments posted: 1
🧹 Nitpick comments (4)
active/000-spark-streaming-sink-source-platform.md (4)
1-5: Refine metadata formatting and link styling.

The header currently uses bare URLs and placeholders, e.g.:

> RFC PR: https://github.com/datahub-project/rfcs/pull/11
> Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
> Implementation PR(s): (leave this empty)

Proposed changes:
- Convert bare URLs to markdown links.
- Remove or clearly mark unavailable fields (e.g., use “None” or “N/A”).

Example diff:

```diff
- RFC PR: https://github.com/datahub-project/rfcs/pull/11
+ RFC PR: [DataHub RFC #11](https://github.com/datahub-project/rfcs/pull/11)
- Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
+ Discussion Issue: None
- Implementation PR(s): (leave this empty)
+ Implementation PR(s): N/A
```

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 2-2: Bare URL used (MD034, no-bare-urls)
10-12: Enhance the Summary with explicit fallback configuration keys.

The summary currently states:

> Allows configuration of Spark structured streaming sink and source platform.

Consider adding the new config properties by name to make the intent crystal clear:

> Introduce `spark.datahub.streaming.source.platform` and `spark.datahub.streaming.sink.platform` as explicit fallback configurations when regex-based platform detection fails.
26-33: Optimize regex usage by precompiling the Pattern.

Currently the regex is compiled on each invocation:

```java
String pattern = "(.*?)\\[(.*)]";
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(description);
```

This can be expensive in high-throughput streaming contexts. Instead, move it to a `static final` field:

```diff
+ private static final Pattern STREAM_DESC_PATTERN =
+     Pattern.compile("(.*?)\\[(.*)]");
  ...
- String pattern = "(.*?)\\[(.*)]";
- Pattern r = Pattern.compile(pattern);
- Matcher m = r.matcher(description);
+ Matcher m = STREAM_DESC_PATTERN.matcher(description);
```
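As a standalone illustration of that shape (the class and method names here are made up for the example):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: compile the pattern once at class-load time and reuse it.
class StreamDescriptionParser {
    private static final Pattern STREAM_DESC_PATTERN = Pattern.compile("(.*?)\\[(.*)]");

    static boolean looksLikeStreamDescription(String description) {
        // No per-call Pattern.compile(): the matcher reuses the shared pattern.
        Matcher m = STREAM_DESC_PATTERN.matcher(description);
        return m.find();
    }
}
```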
41-46: Use multi-catch to simplify exception handling.

The two catch blocks under file/delta lake path parsing both return `Optional.empty()`. You can collapse them:

```diff
- } catch (InstantiationException e) {
-   return Optional.empty();
- } catch (URISyntaxException e) {
-   log.error("Failed to parse path {}", path, e);
-   return Optional.empty();
- }
+ } catch (InstantiationException | URISyntaxException e) {
+   log.error("Failed to parse path {}", path, e);
+   return Optional.empty();
+ }
```

This reduces duplication and makes the intent clearer.
Actionable comments posted: 0
♻️ Duplicate comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
61-75: ⚠️ Potential issue — Fix infinite recursion and incorrect overload in fallback logic.

The fallback block passes a `String` where the method expects a `boolean isSink`, causing a compile error and endless recursion. Please replace the recursive calls with direct URN construction or introduce a proper overload. For example:

```diff
-if (sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform() != null && isSink) {
-  return generateUrnFromStreamingDescription(
-      description,
-      sparkLineageConf,
-      sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform()
-  );
-} else if (sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform() != null && !isSink) {
-  return generateUrnFromStreamingDescription(
-      description,
-      sparkLineageConf,
-      sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform()
-  );
-} else {
-  return Optional.empty();
-}
+// Fallback: directly construct URN with provided platform
+if (sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform() != null && isSink) {
+  String platform = sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform();
+  return Optional.of(new DatasetUrn(
+      new DataPlatformUrn(platform),
+      description,
+      sparkLineageConf.getOpenLineageConf().getFabricType()));
+} else if (sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform() != null && !isSink) {
+  String platform = sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform();
+  return Optional.of(new DatasetUrn(
+      new DataPlatformUrn(platform),
+      description,
+      sparkLineageConf.getOpenLineageConf().getFabricType()));
+} else {
+  return Optional.empty();
+}
```

This removes the erroneous overload call and prevents infinite recursion.
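If the "proper overload" route is preferred instead, a sketch could look like the following; it reuses the `DatasetUrn`/`DataPlatformUrn` construction from the diff above, while the method name, signature, and blank-value guard are assumptions for illustration:

```java
// Hedged sketch of an explicit String-platform overload (names illustrative).
// Mirrors the direct URN construction shown in the diff above.
private static Optional<DatasetUrn> generateUrnForPlatform(
    String description, SparkLineageConf sparkLineageConf, String platform) {
  if (platform == null || platform.trim().isEmpty()) {
    return Optional.empty(); // guard against blank config values
  }
  return Optional.of(new DatasetUrn(
      new DataPlatformUrn(platform),
      description,
      sparkLineageConf.getOpenLineageConf().getFabricType()));
}
```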
🧹 Nitpick comments (4)
active/000-spark-streaming-sink-source-platform.md (4)
1-5: Populate metadata fields for Discussion Issue and Implementation PR(s).

The `Discussion Issue` and `Implementation PR(s)` entries are currently empty placeholders. Please either fill in the relevant GitHub issue/PR links or remove these lines to avoid confusion.

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 2-2: Bare URL used (MD034, no-bare-urls)
2-2: Convert bare RFC PR URL to a Markdown link.

The RFC PR URL is a bare link. For consistency and readability, format it as a Markdown hyperlink, for example:

```markdown
- RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11)
```

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 2-2: Bare URL used (MD034, no-bare-urls)
14-15: Convert inline bare URL to a Markdown link.

The DataHub code reference is currently a bare URL in parentheses. Please convert it to a Markdown link, e.g.:
in the DataHub [code](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145)
23-29: Add language identifier to the code fence.

The Java snippet for the new overload is missing a language specifier. Adding `java` will enable proper syntax highlighting:

````diff
-```
+```java
 public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
     String description, SparkLineageConf sparkLineageConf) {
   return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
       description, sparkLineageConf, false);
 }
-```
+```
````
Actionable comments posted: 1
🧹 Nitpick comments (6)
active/000-spark-streaming-sink-source-platform.md (6)
24-28: Add Default Overload for Streaming Description

The new two-parameter overload cleanly delegates to the three-parameter version with `isSink=false`, which is sensible for source contexts. Consider adding a brief Javadoc explaining that this entrypoint treats descriptions as non-sinks by default.
37-44: Avoid Repeated Group Access and Improve Clarity

You extract `m.group(2)` twice when handling Kafka paths. Reuse the `path` variable to make the intent clearer:

```diff
  String path = m.group(2);
  log.debug("Streaming description Platform: {}, Path: {}", platform, path);
  if (platform.equals(KAFKA_PLATFORM)) {
-   path = getKafkaTopicFromPath(m.group(2));
+   path = getKafkaTopicFromPath(path);
  } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
```

This reduces duplication and scope for error.
44-54: Consolidate Exception Handling for File/Delta Paths

Both `InstantiationException` and `URISyntaxException` lead to returning `Optional.empty()`. You could merge them into a single multi-catch and log uniformly:

```diff
- } catch (InstantiationException e) {
-   return Optional.empty();
- } catch (URISyntaxException e) {
-   log.error("Failed to parse path {}", path, e);
-   return Optional.empty();
- }
+ } catch (InstantiationException | URISyntaxException e) {
+   log.error("Failed to instantiate or parse HDFS path {}", path, e);
+   return Optional.empty();
+ }
```

This simplifies the control flow and ensures both errors are logged consistently.
55-60: Clarify Default Platform URN Construction

After regex matching, all other platforms map directly to a `DatasetUrn`. Consider adding a comment or Javadoc here to note that this branch handles non-Kafka, non-file, non-delta cases detected via regex.
68-73: Symmetric Fallback Logic for Source & Input Validation

Similarly, the source fallback uses the `(String)` overload. It may be worthwhile to check that the configured `streamingSourcePlatform` is non-empty (not just non-null) before recursing; an empty string could produce an invalid URN.
80-89: Validate Provided Streaming Platform Value

In the `(String)` overload, consider guarding against blank or malformed `streamingPlatform` values. For example:

```java
if (streamingPlatform == null || streamingPlatform.trim().isEmpty()) {
  log.warn("Empty streamingPlatform config; cannot generate URN for description {}", description);
  return Optional.empty();
}
```

This avoids producing URNs with empty platform identifiers.
🔇 Additional comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
61-67: Verify Overload Resolution for Sink Fallback

The fallback invokes `generateUrnFromStreamingDescription(description, sparkLineageConf, streamingSinkPlatform)`, which relies on the `(String)` overload. Please confirm the compiler picks the intended method and that there’s no ambiguity with the boolean overload.
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf, boolean isSink) {
  String pattern = "(.*?)\\[(.*)]";
  Pattern r = Pattern.compile(pattern);
  Matcher m = r.matcher(description);
  if (m.find()) {
```
🛠️ Refactor suggestion
Refine the Regex for Robust Matching

The pattern `(.*?)\[(.*)]` may over-match when descriptions contain multiple or nested brackets. It would be safer to anchor the regex and ensure it only matches the entire string. For example:

```java
String pattern = "^(.+?)\\[(.+)]$";
```

This prevents unintended partial matches.
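To see the difference concretely, here is a small standalone demo (the sample description string is invented for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexAnchorDemo {
    public static void main(String[] args) {
        String desc = "KafkaV2[Subscribe[my_topic]] some trailing text";

        // Unanchored pattern with find(): succeeds on a partial match,
        // silently ignoring the trailing text after the last ']'.
        Matcher loose = Pattern.compile("(.*?)\\[(.*)]").matcher(desc);
        System.out.println(loose.find()); // true

        // Anchored pattern with matches(): the whole string must conform,
        // so the trailing text causes a rejection.
        Matcher strict = Pattern.compile("^(.+?)\\[(.+)]$").matcher(desc);
        System.out.println(strict.matches()); // false
    }
}
```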
Actionable comments posted: 1
♻️ Duplicate comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
33-34: Anchor and refine the regex for robust matching.

The current pattern `"(.*?)\\[(.*)]"` may over-match or accept unintended inputs (e.g., nested brackets). It’s safer to anchor it and escape properly:

```java
String pattern = "^(.*?)\\[(.*)\\]$";
```

This ensures the entire `description` matches exactly one namespace and one path.
🧹 Nitpick comments (8)
active/000-spark-streaming-sink-source-platform.md (8)
1-5: Remove or complete placeholder fields in the RFC header.

The `Discussion Issue` and `Implementation PR(s)` are currently empty placeholders. To avoid confusion, either fill these with the actual issue/PR links or remove these lines entirely.
10-11: Clarify summary with new configuration property.

The summary should explicitly mention the `spark.datahub.streaming_platform` fallback property so readers immediately see the core change. For example:

> Adds `spark.datahub.streaming_platform` as a fallback platform identifier when regex parsing of streaming descriptions fails.
44-52: Consolidate exception handling in HDFS path parsing.

The two `catch` blocks for `InstantiationException` and `URISyntaxException` both result in `Optional.empty()`, but only one logs the error. Consider collapsing into a single multi-catch to reduce duplication and ensure all failures get logged for diagnostics:

```diff
-} catch (InstantiationException e) {
-  return Optional.empty();
-} catch (URISyntaxException e) {
-  log.error("Failed to parse path {}", path, e);
-  return Optional.empty();
-}
+} catch (InstantiationException | URISyntaxException e) {
+  log.error("Failed to parse HDFS path '{}' in description '{}'", path, description, e);
+  return Optional.empty();
+}
```
50-51: Enhance error logging context.

When logging a bad URI, include both the `description` and whether it was a sink or source to help pinpoint issues in production:

```java
log.error(
    "Failed to parse path '{}' for {} with description '{}'",
    path,
    isSink ? "sink" : "source",
    description,
    e);
```
59-61: Handle nullable `CatalogTableDataset` more idiomatically.

Rather than checking for `null`, leverage `Optional` directly to keep the API consistent:

```diff
- CatalogTableDataset catalogTableDataset = CatalogTableDataset.create(...);
- if (catalogTableDataset == null) {
-   return Optional.empty();
- } else {
-   return Optional.of(catalogTableDataset.urn());
- }
+ return Optional.ofNullable(CatalogTableDataset.create(...))
+     .map(CatalogTableDataset::urn);
```
20-23: Hyphenate “Iceberg-based” for correct spelling.

LanguageTool flags that “Iceberg based” should be “Iceberg-based dataset URNs”.

🧰 Tools: 🪛 LanguageTool — ~22: This expression is usually spelled with a hyphen (BASED_HYPHEN)
77-77: Hyphenate heading: “Configuring Iceberg-based dataset URNs”.

For consistency and accuracy, update the heading to:

```markdown
### Configuring Iceberg-based dataset URNs
```

🧰 Tools: 🪛 LanguageTool — ~77: This expression is usually spelled with a hyphen (BASED_HYPHEN)
112-112: Specify code block language.

The fenced block showing configuration examples would benefit from a language tag (e.g., `bash` or `ini`) for syntax highlighting.

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 112-112: Fenced code blocks should have a language specified (MD040, fenced-code-language)
🔇 Additional comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
57-61: Verify fallback configuration property mapping.

Make sure `sparkLineageConf.getOpenLineageConf().getStreamingPlatform()` actually reads from the newly introduced `spark.datahub.streaming_platform` Spark config key. It’s easy for naming mismatches to sneak in.
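One way to sanity-check that mapping is a small reader like the sketch below; the config key is the one proposed in this RFC, while the class and method names are hypothetical:

```java
import java.util.Optional;
import org.apache.spark.SparkConf;

// Hypothetical helper: reads the RFC's proposed fallback key from SparkConf.
public class StreamingPlatformConfig {
    static final String STREAMING_PLATFORM_KEY = "spark.datahub.streaming_platform";

    static Optional<String> readStreamingPlatform(SparkConf conf) {
        String value = conf.get(STREAMING_PLATFORM_KEY, null);
        return (value == null || value.trim().isEmpty())
            ? Optional.empty()
            : Optional.of(value.trim());
    }
}
```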
Actionable comments posted: 1
🧹 Nitpick comments (11)
active/000-spark-streaming-sink-source-platform.md (11)
1-5: Use a proper YAML front-matter block and remove placeholders

Rather than plain list items, consider using a fenced YAML front matter at the top of the RFC and either fill in or remove the “Discussion Issue” and “Implementation PR(s)” placeholders. This makes the metadata machine-readable and follows existing RFC conventions.

Proposed diff:

```diff
- - Start Date: 2025-04-21
- - RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11)
- - Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
- - Implementation PR(s): (leave this empty)
+ ---
+ Start Date: 2025-04-21
+ RFC PR: 11
+ Discussion Issue: <fill-or-remove>
+ Implementation PR(s): <fill-or-remove>
+ ---
```
10-10: Refine the summary grammar

Current:

> Allows configuration of Spark structured streaming sink and source platform.

Suggested:

> Allows configuring the data platform for Spark Structured Streaming sources and sinks.
14-15: Clarify hyperlink context in Motivation

Rather than linking with generic text `[code]`, add context to the link, e.g.:

> ...in the DataHub converter implementation (see SparkStreamingEventToDatahub.java#L145)...
22-22: Hyphenate “Iceberg-based”

The phrase “Configuring Iceberg based dataset URNs” should read “Configuring Iceberg-based dataset URNs”.

🧰 Tools: 🪛 LanguageTool — ~22: This expression is usually spelled with a hyphen (BASED_HYPHEN)
23-29: Add language identifier to code fence

The Java snippet is missing a language marker. Prefix the fence with `java` for proper syntax highlighting. Example:

````diff
-```
+```java
 public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
     String description, SparkLineageConf sparkLineageConf) {
   return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
       description, sparkLineageConf, false);
 }
-```
+```
````
31-75: Add `java` to the second code block fence

The longer snippet showing the overload needs a `java` fence as well to improve readability and tooling support.
32-36: Anchor the regex to prevent over-matching

The pattern `"(.*?)\\[(.*)]"` may match unwanted substrings if there are multiple brackets. Consider:

```diff
- String pattern = "(.*?)\\[(.*)]";
+ String pattern = "^(.+?)\\[(.+)]$";
```

This ensures the entire description is of the form `namespace[path]` rather than a partial match.
77-77: Hyphenate “Iceberg-based” in section header

Change `### Configuring Iceberg based dataset URNs` to `### Configuring Iceberg-based dataset URNs`.

🧰 Tools: 🪛 LanguageTool — ~77: This expression is usually spelled with a hyphen (BASED_HYPHEN)
112-112: Specify language for the properties code fence

The example block should be marked as `properties` (or `ini`) to aid in syntax highlighting:

````diff
-```
+```properties
````

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 112-112: Fenced code blocks should have a language specified (MD040, fenced-code-language)
100-104: Clarify `streaming_alias` usage

The prose refers to `spark.datahub.streaming.platform.<$platform>.<streaming_alias>.env` and `....platformInstance` but omits `streaming_io_platform_type`. It would be helpful to list all four configurable properties (`env`, `platformInstance`, `streaming_io_platform_type`, plus the fallback `spark.datahub.streaming_platform`) in one place.
3-4: Remove or complete placeholder for Discussion Issue

Line reads:

> Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)

Either remove the parenthetical or fill in the actual issue link to avoid confusion.
Actionable comments posted: 0
♻️ Duplicate comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
33-33: Duplicate: Anchor the regex to avoid partial matches

Your pattern `"(.*?)\\[(.*)]"` may over-match when descriptions contain multiple or nested brackets. It’s safer to anchor it, for example:

```java
String pattern = "^(.+?)\\[(.+?)\\]$";
```

This ensures you only match the entire string with one pair of brackets.
🧹 Nitpick comments (6)
active/000-spark-streaming-sink-source-platform.md (6)
1-4: Populate metadata placeholders
The “Discussion Issue” and “Implementation PR(s)” fields are currently empty. Please fill these in (or remove them if not applicable) to ensure full traceability of this RFC.
10-10: Clarify summary wording
Consider rephrasing to explicitly mention “data platforms” and mirror the RFC title. For example:

> Allows configuration of the data platform for Spark Structured Streaming sources and sinks.
22-23: Clarify “streaming spec” reference

The phrase “and streaming spec which is defined in the Configuring Iceberg based dataset URNs section below” is a bit vague. Consider renaming “streaming spec” to something more descriptive (e.g., “alias-based streaming configuration”) or add a brief definition here:

```diff
- …two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and streaming spec which is defined in the *Configuring Iceberg based dataset URNs* section below.
+ …two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and alias-based streaming configuration (see *Configuring Iceberg-based dataset URNs* below).
```

🧰 Tools: 🪛 LanguageTool — ~22: This expression is usually spelled with a hyphen (BASED_HYPHEN)
77-77: Hyphenate for clarity

LanguageTool suggests hyphenating “Iceberg based” in the section title. Apply this diff:

```diff
- ## Configuring Iceberg based dataset URNs
+ ## Configuring Iceberg-based dataset URNs
```

🧰 Tools: 🪛 LanguageTool — ~77: This expression is usually spelled with a hyphen (BASED_HYPHEN)
101-102: Explain the config-key naming convention

The RFC introduces both `spark.datahub.streaming_platform` (underscore) and `spark.datahub.streaming.platform.<…>` (dots). To avoid confusion, please either unify the naming scheme or explicitly call out why the patterns differ.
112-112: Specify fenced code block language

The example block at line 112 is missing a language hint. For better syntax highlighting, add the `properties` tag:

````diff
-```
+```properties
````

🧰 Tools: 🪛 markdownlint-cli2 (0.17.2) — 112-112: Fenced code blocks should have a language specified (MD040, fenced-code-language)
🔇 Additional comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
57-63: Verify fallback configuration mapping

The code checks `getOpenLineageConf().getStreamingPlatform()` for `null` before using it as a fallback. Please confirm that `getStreamingPlatform()` correctly reads the `spark.datahub.streaming_platform` Spark config as documented, and update the RFC to document this mapping explicitly.
Actionable comments posted: 2
🧹 Nitpick comments (4)
active/000-spark-streaming-sink-source-platform.md (4)
1-4: Ensure RFC metadata completeness
The header fields for "Discussion Issue" and "Implementation PR(s)" are placeholders. Please either populate them with valid links/values or remove the entries to avoid confusion.
24-29: Add JavaDoc to explain the overload behavior
Consider documenting the new two-argument overload in Javadoc (or in code comments) to clarify that it delegates to the three-argument version with `isSink=false`. This will help future readers understand the intended fallback semantics.
33-36: Refine the regex to anchor bracket parsing and use full-string matching

The current `"(.*?)\\[(.*)]"` combined with `m.find()` may over-match on nested or partial bracketed strings. To ensure the entire description matches the expected format, consider:

```diff
- String pattern = "(.*?)\\[(.*)]";
+ private static final String STREAM_DESC_REGEX = "^(.+?)\\[(.+)]$";
  ...
- Matcher m = r.matcher(description);
+ Matcher m = COMPILED_STREAM_DESC_PATTERN.matcher(description);
```

And replace `m.find()` with `m.matches()`.
33-35: Move regex compilation to a static constant

Currently, the pattern is recompiled on every invocation. For better performance and readability, define it once as a `private static final Pattern` at class scope.
🔇 Additional comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
57-65: Verify mapping of Spark config key to `getStreamingPlatform()`

The fallback looks up `sparkLineageConf.getOpenLineageConf().getStreamingPlatform()`. Please confirm that this method indeed reads from `spark.datahub.streaming_platform` (underscore) and not a different key (e.g., with dots). If there’s a mismatch, update either the code or the documentation accordingly.
Actionable comments posted: 1
♻️ Duplicate comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
40-40: Anchor the regex for more robust matching

The pattern `"(.*?)\\[(.*)]"` is unanchored and may over-match when descriptions include multiple or nested brackets. Consider anchoring it to match the full string:

```diff
- String pattern = "(.*?)\\[(.*)]";
+ String pattern = "^(.+?)\\[(.+)]$";
```
🧹 Nitpick comments (2)
active/000-spark-streaming-sink-source-platform.md (2)
14-14: Reduce wordiness in motivation section

Consider simplifying “in order to extract the data platform” to “to extract the data platform” for more concise phrasing.

🧰 Tools: 🪛 LanguageTool — ~14: Consider a shorter alternative to avoid wordiness (IN_ORDER_TO_PREMIUM)
117-125: Use standard properties syntax in examples

The example uses colons (`key : value`), but Java properties conventionally use the equals sign (`key=value`). For example:

```diff
- spark.datahub.streaming.platform.iceberg.stream1.env : PROD
+ spark.datahub.streaming.platform.iceberg.stream1.env=PROD
```
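Put together, the configuration surface discussed in this thread might look like the block below; the keys are the ones named in the review, while the values are placeholders:

```properties
# Sketch only: keys per this review thread, values are placeholders.
spark.datahub.streaming_platform=iceberg
spark.datahub.streaming.platform.iceberg.stream1.env=PROD
spark.datahub.streaming.platform.iceberg.stream1.platformInstance=analytics_instance
```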
🔇 Additional comments (1)
active/000-spark-streaming-sink-source-platform.md (1)
1-5: Add Implementation PR(s) when available

The header’s “Implementation PR(s)” field is empty. Please fill in this link once the corresponding code changes are merged to help readers track the implementation.
Actionable comments posted: 1
🧹 Nitpick comments (4)
active/000-spark-streaming-sink-source-platform.md (4)
3-4: Clean up front-matter placeholders.

The fields “Discussion Issue: (…)” and “Implementation PR(s): (…)” are left as generic placeholders. If there is no prior GitHub issue or linked PR, consider removing these lines or explicitly marking them as “None” to avoid confusion.
14-14: Streamline wording.

Consider replacing “in order to extract the data platform” with “to extract the data platform” for improved conciseness.

🧰 Tools: 🪛 LanguageTool — ~14: Consider a shorter alternative to avoid wordiness (IN_ORDER_TO_PREMIUM)
64-77: Improve error visibility on fallback URN creation.

In the fallback branch you catch only `InstantiationException` and swallow it:

```java
} catch (InstantiationException e) {
  return Optional.empty();
}
```

It’s helpful to log these failures so users can diagnose misconfigurations. For example:

```diff
- } catch (InstantiationException e) {
-   return Optional.empty();
- }
+ } catch (InstantiationException e) {
+   log.error("Failed to instantiate CatalogTableDataset for description {} (isSink={}): {}", description, isSink, e.getMessage(), e);
+   return Optional.empty();
+ }
```

This will aid debugging if the configured platform instance or alias is invalid.
109-113: Clarify how `streaming_alias` is selected.

The doc refers to a “streaming_alias” for grouping multiple streams, but it’s not clear how aliases are discovered or applied. Consider adding a brief section on how to configure or enumerate aliases (e.g., via a comma-separated `spark.datahub.streaming.platform.<platform>.aliases` property), and how the code picks the correct `<alias>` block when constructing URNs. A hedged sketch follows.
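A minimal sketch of that enumeration scheme; the `aliases` key is the reviewer's hypothetical suggestion, not an existing configuration:

```properties
# Hypothetical: enumerate alias blocks so the code knows which <alias> entries exist.
spark.datahub.streaming.platform.iceberg.aliases=stream1,stream2
spark.datahub.streaming.platform.iceberg.stream1.env=PROD
spark.datahub.streaming.platform.iceberg.stream2.env=DEV
```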
```java
String pattern = "(.*?)\\[(.*)]";
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(description);
if (m.find()) {
```
🛠️ Refactor suggestion
Anchor the regex to prevent over-matching.

The pattern

```java
String pattern = "(.*?)\\[(.*)]";
```

is unanchored and could over-match when there are multiple or nested brackets in the description. To ensure you only match the full string and capture minimal groups, anchor the expression, for example:

```diff
- String pattern = "(.*?)\\[(.*)]";
+ String pattern = "^(.+?)\\[(.+)]$";
```

This change guards against unintended partial matches.
Summary by CodeRabbit

New Features
- Explicit configuration of the data platform for Spark Structured Streaming sources and sinks in DataHub lineage capture, as a fallback when regex-based detection fails.

Documentation
- New RFC document: active/000-spark-streaming-sink-source-platform.md.