- Start Date: 2025-04-21
- RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11)
- Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
- Implementation PR(s): (leave this empty)

# Spark Streaming Sink/Source Platform

## Summary

Introduce configurable support for specifying the data platform of Spark Structured Streaming sources and sinks.

## Motivation

This RFC addresses an issue encountered when capturing data lineage in DataHub with Spark Structured Streaming. In the DataHub [codebase](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular expression matcher expects streaming source descriptions to contain an identifiable platform prefix, such as `Kafka[…]`, in order to extract the data platform. However, platforms like Iceberg do not use such a prefix (there is no `iceberg[…]` form), so DataHub fails to detect the platform and the lineage goes missing.

## Requirements

- Support configuring the data platform of a streaming source or sink within a Spark job.
- Use these configurations as a fallback when regex-based extraction fails.

## Detailed design

This RFC proposes adding the following Spark configuration:

- `spark.datahub.streaming.platform.instance` – explicitly specifies the data platform when automatic detection fails.

This configuration is checked in the `generateUrnFromStreamingDescription` method of `SparkStreamingEventToDatahub.java`: if the regex pattern fails to identify the platform and this configuration is set, its value is used to construct the dataset URN.
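For illustration, a job could supply the proposed key when building its Spark session. A minimal sketch, in which the application name and the `iceberg` value are placeholders:

```java
import org.apache.spark.sql.SparkSession;

// Sketch: set the fallback platform for streaming sources/sinks whose
// descriptions carry no recognizable platform prefix ("iceberg" is an
// example value, not a required one).
SparkSession spark = SparkSession.builder()
    .appName("streaming-lineage-example") // placeholder application name
    .config("spark.datahub.streaming.platform.instance", "iceberg")
    .getOrCreate();
```
The same key can equally be passed with `--conf` on `spark-submit`.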
Example implementation of the fallback in `generateUrnFromStreamingDescription`:
```java
// Convenience overload: delegates with isSink = false.
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf) {
  return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
      description, sparkLineageConf, false);
}
```
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(String description,
    SparkLineageConf sparkLineageConf, boolean isSink) {
  String pattern = "(.*?)\\[(.*)]";
  Pattern r = Pattern.compile(pattern);
  Matcher m = r.matcher(description);
  if (m.find()) {
    String namespace = m.group(1);
    String platform = getDatahubPlatform(namespace);
    String path = m.group(2);
    log.debug("Streaming description Platform: {}, Path: {}", platform, path);
    if (platform.equals(KAFKA_PLATFORM)) {
      path = getKafkaTopicFromPath(m.group(2));
    } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
      try {
        DatasetUrn urn =
            HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn();
        return Optional.of(urn);
      } catch (InstantiationException e) {
        return Optional.empty();
      } catch (URISyntaxException e) {
        log.error("Failed to parse path {}", path, e);
        return Optional.empty();
      }
    }
    return Optional.of(
        new DatasetUrn(new DataPlatformUrn(platform), path,
            sparkLineageConf.getOpenLineageConf().getFabricType()));
  } else {
    // Fallback path introduced by this RFC: the description did not match the
    // platform[path] pattern, so use the explicitly configured platform, if any.
    if (sparkLineageConf.getOpenLineageConf().getStreamingPlatformInstance() != null) {
      try {
        CatalogTableDataset catalogTableDataset =
            CatalogTableDataset.create(sparkLineageConf.getOpenLineageConf(), description,
                isSink ? "sink" : "source");
        if (catalogTableDataset == null) {
          return Optional.empty();
        } else {
          return Optional.of(catalogTableDataset.urn());
        }
      } catch (InstantiationException e) {
        return Optional.empty();
      }
    } else {
      return Optional.empty();
    }
  }
}
```

### Configuring Iceberg-based dataset URNs

This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns).

Spark emits lineage between datasets using its own logic for generating URNs, while Python sources emit dataset metadata. To link the two, the URNs generated on both sides must match; this section helps ensure compatibility by aligning the URN generation logic.

By default, URNs are created using the template:
`urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`
Each of these fields can be configured to generate the desired URN.

**Platform**:
The platform is explicitly supported through the new Spark configuration key:
- `spark.datahub.streaming.platform.instance`

If this configuration is not set, the platform defaults to `null`.

**Name**:
Defaults to the full table path in the streaming description.

**Platform Instance and Env:**
The default value for `env` is `'PROD'`, and `platformInstance` defaults to `null`. These values can be overridden using:
```properties
spark.datahub.streaming.platform.<platform>.<streaming_alias>.platformInstance
spark.datahub.streaming.platform.<platform>.<streaming_alias>.env
```
If Spark is processing data from different environments or platform instances, the `<streaming_alias>` segment allows alias-based overrides of platform-specific settings.
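As a concrete illustration of the template above, the URN for a hypothetical Iceberg table `namespace.table` with platform instance `catalog` in `PROD` could be built with DataHub's URN classes as follows (all values are placeholders):

```java
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;

// Produces: urn:li:dataset:(urn:li:dataPlatform:iceberg,catalog.namespace.table,PROD)
DatasetUrn urn = new DatasetUrn(
    new DataPlatformUrn("iceberg"), // <$platform>
    "catalog.namespace.table",      // <$platformInstance>.<$name>
    FabricType.PROD);               // <$env>
```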
**Configuration Keys**
Below is a summary of the configuration options for per-alias streaming platform customization:
- `spark.datahub.streaming.platform.<platform>.<streaming_alias>.platformInstance`
  Sets the platform instance name used in the URN (e.g., `stream1`, `catalog`).
- `spark.datahub.streaming.platform.<platform>.<streaming_alias>.env`
  Sets the environment name used in the URN (e.g., `PROD`, `DEV`).
- `spark.datahub.streaming.platform.<platform>.<streaming_alias>.usePlatformInstance`
  (Optional, default: `false`)
  If set to `true`, the platform instance is injected into the table name. This is useful for ensuring that the lineage reflects the correct platform instance when using a shared catalog or namespace. For example, `stream2.namespace.table` is rewritten as `catalog.namespace.table` in the URN if `platformInstance=catalog`.
- `spark.datahub.streaming.platform.<platform>.<streaming_alias>.streaming.io.platform.type`
  Indicates whether the alias represents a streaming `"source"` or `"sink"`.

**Example:**
```properties
spark.datahub.streaming.platform.iceberg.stream1.env : PROD
spark.datahub.streaming.platform.iceberg.stream1.streaming.io.platform.type : source
spark.datahub.streaming.platform.iceberg.stream1.platformInstance : stream1

spark.datahub.streaming.platform.iceberg.stream2.env : DEV
spark.datahub.streaming.platform.iceberg.stream2.streaming.io.platform.type : sink
spark.datahub.streaming.platform.iceberg.stream2.platformInstance : catalog
spark.datahub.streaming.platform.iceberg.stream2.usePlatformInstance : true
```
In this example, `stream2.namespace.table` is rewritten as `catalog.namespace.table` when `usePlatformInstance = true`, allowing the lineage to reflect the correct platform instance. The default is `false`, meaning the platform instance is not injected into the table name.
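To make the rewrite concrete, the following is a minimal sketch of the substitution that `usePlatformInstance` implies; the helper name and the exact prefix-matching rule are assumptions, not part of the proposal:

```java
// Sketch of the usePlatformInstance rewrite (hypothetical helper, assumed
// prefix-matching rule): replaces the alias prefix of the table name with
// the configured platform instance.
static String applyPlatformInstance(String tableName, String streamingAlias,
    String platformInstance, boolean usePlatformInstance) {
  if (usePlatformInstance && tableName.startsWith(streamingAlias + ".")) {
    // e.g. "stream2.namespace.table" -> "catalog.namespace.table"
    return platformInstance + tableName.substring(streamingAlias.length());
  }
  return tableName;
}
```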