- Start Date: 2025-04-21
- RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11)
- Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
- Implementation PR(s): (leave this empty)

# Spark Streaming Sink/Source Platform

## Summary

Introduce configurable support for specifying the data platform of Spark Structured Streaming sources and sinks.

## Motivation

This RFC addresses an issue encountered when capturing data lineage in DataHub from Spark Structured Streaming. In the DataHub [codebase](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular-expression matcher expects source descriptions to contain an identifiable prefix, such as `Kafka[…]`, from which it extracts the data platform. Platforms such as Iceberg, however, do not emit descriptions in this `prefix[…]` form (e.g., there is no `iceberg[…]`), so DataHub fails to detect the platform and the lineage goes missing.

## Requirements

- Support configuration of the data platform for a streaming source or sink within a Spark job.
- Use these configurations as fallbacks when regex-based extraction fails.

## Detailed design

We propose adding the following Spark configuration:

- `spark.datahub.streaming.platform.instance` – explicitly specifies the data platform to use when automatic detection fails.

This configuration will be checked in the `generateUrnFromStreamingDescription` method within `SparkStreamingEventToDatahub.java`. If the regex pattern fails to identify the platform, and this configuration is set, its value will be used to construct the dataset URN.

Example implementation:
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf) {
  return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
      description, sparkLineageConf, false);
}
```
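The three-argument overload then performs the regex extraction and, when the pattern does not match, falls back to the configured platform: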
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf, boolean isSink) {
  String pattern = "(.*?)\\[(.*)]";
  Pattern r = Pattern.compile(pattern);
  Matcher m = r.matcher(description);
  if (m.find()) {
    String namespace = m.group(1);
    String platform = getDatahubPlatform(namespace);
    String path = m.group(2);
    log.debug("Streaming description Platform: {}, Path: {}", platform, path);
    if (platform.equals(KAFKA_PLATFORM)) {
      path = getKafkaTopicFromPath(m.group(2));
    } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
      try {
        DatasetUrn urn =
            HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn();
        return Optional.of(urn);
      } catch (InstantiationException e) {
        return Optional.empty();
      } catch (URISyntaxException e) {
        log.error("Failed to parse path {}", path, e);
        return Optional.empty();
      }
    }
    return Optional.of(
        new DatasetUrn(
            new DataPlatformUrn(platform),
            path,
            sparkLineageConf.getOpenLineageConf().getFabricType()));
  } else {
    // Regex extraction failed: fall back to the explicitly configured
    // streaming platform, if one was provided.
    if (sparkLineageConf.getOpenLineageConf().getStreamingPlatformInstance() != null) {
      try {
        CatalogTableDataset catalogTableDataset =
            CatalogTableDataset.create(
                sparkLineageConf.getOpenLineageConf(), description, isSink ? "sink" : "source");
        if (catalogTableDataset == null) {
          return Optional.empty();
        } else {
          return Optional.of(catalogTableDataset.urn());
        }
      } catch (InstantiationException e) {
        return Optional.empty();
      }
    } else {
      return Optional.empty();
    }
  }
}
```
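
`CatalogTableDataset` does not exist yet; the sketch below is one possible shape for it, assuming a `create` factory that reads the configured fallback platform and maps the raw streaming description to a dataset URN. All names other than the proposed Spark key are illustrative, and alias resolution is elided:
```java
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;

// Hypothetical helper; not part of the current codebase.
public class CatalogTableDataset {
  private final DatasetUrn urn;

  private CatalogTableDataset(DatasetUrn urn) {
    this.urn = urn;
  }

  public static CatalogTableDataset create(
      DatahubOpenlineageConfig conf, String description, String ioType)
      throws InstantiationException {
    // Platform configured via spark.datahub.streaming.platform.instance.
    String platform = conf.getStreamingPlatformInstance();
    if (platform == null || description == null || description.isEmpty()) {
      return null; // the caller maps null to Optional.empty()
    }
    // ioType is "source" or "sink"; per-alias overrides (env, platformInstance,
    // usePlatformInstance) would be resolved here. This sketch uses the
    // description as the dataset name directly.
    return new CatalogTableDataset(
        new DatasetUrn(new DataPlatformUrn(platform), description, conf.getFabricType()));
  }

  public DatasetUrn urn() {
    return this.urn;
  }
}
```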
### Configuring Iceberg-based dataset URNs

This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns).

Spark emits lineage between datasets and generates URNs with its own logic, while Python-based ingestion sources emit dataset metadata. For the two to be linked, the URNs produced on both sides must match; this section describes how to align the URN generation.

By default, URNs are created using the template:
`urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`
Each of these fields can be configured to generate the desired URN.
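
For instance, an Iceberg table ingested with platform instance `catalog` in the default environment would be rendered as (illustrative values):
```
urn:li:dataset:(urn:li:dataPlatform:iceberg,catalog.namespace.table,PROD)
```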

**Platform**:
The platform can be set explicitly through the new Spark configuration key:
- `spark.datahub.streaming.platform.instance`

If this configuration is not set, the platform defaults to `null`.
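
A minimal sketch of setting the fallback platform for an Iceberg job (the value is whatever DataHub platform name your job targets):
```properties
spark.datahub.streaming.platform.instance : iceberg
```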

**Name**:
Defaults to the full table path in the streaming description.
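For example, a streaming description of `stream1.namespace.table` yields the name `stream1.namespace.table`, unless the `usePlatformInstance` rewriting described below applies.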

**Platform Instance and Env:**
The default value for `env` is `'PROD'`, and `platformInstance` is `null`. These values can be overridden using:
```properties
spark.datahub.streaming.platform.<platform>.<alias>.platformInstance
spark.datahub.streaming.platform.<platform>.<alias>.env
```
If a Spark job processes data from multiple environments or platform instances, the `<alias>` segment allows alias-based overrides of the platform-specific settings.

**Configuration Keys**
Below is a summary of configuration options for per-alias streaming platform customization:
- `spark.datahub.streaming.platform.<platform>.<alias>.platformInstance`
Sets the platform instance name to be used in the URN (e.g., `stream1`, `catalog`).
- `spark.datahub.streaming.platform.<platform>.<alias>.env`
Sets the environment name to be used in the URN (e.g., `PROD`, `DEV`).
- `spark.datahub.streaming.platform.<platform>.<alias>.usePlatformInstance`
<br>(Optional, default: `false`)
<br>If set to `true`, the platform instance will be injected into the table name. This is useful for ensuring that the lineage reflects the correct platform instance when using a shared catalog or namespace. For example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` in the URN if `platformInstance=catalog`.
- `spark.datahub.streaming.platform.<platform>.<alias>.streaming.io.platform.type`
Indicates whether the alias represents a streaming `"source"` or `"sink"`.

**Example:**
```properties
spark.datahub.streaming.platform.iceberg.stream1.env : PROD
spark.datahub.streaming.platform.iceberg.stream1.streaming.io.platform.type : source
spark.datahub.streaming.platform.iceberg.stream1.platformInstance : stream1

spark.datahub.streaming.platform.iceberg.stream2.env : DEV
spark.datahub.streaming.platform.iceberg.stream2.streaming.io.platform.type : sink
spark.datahub.streaming.platform.iceberg.stream2.platformInstance : catalog
spark.datahub.streaming.platform.iceberg.stream2.usePlatformInstance : true
```
In this example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` when `usePlatformInstance = true`, allowing lineage to reflect the correct platform instance. The default behavior is `false`, meaning the platform instance is not injected into the table name.
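
Assuming the leading alias is replaced by the platform instance rather than additionally prefixed, the two aliases above would produce URNs along the lines of:
```
urn:li:dataset:(urn:li:dataPlatform:iceberg,stream1.namespace.table,PROD)
urn:li:dataset:(urn:li:dataPlatform:iceberg,catalog.namespace.table,DEV)
```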