Commits
8d49e2f
Added initial implementation for writing lineage messages using HDFS …
rkrumins Oct 12, 2025
a35a99c
Added creation of the base customLineagePath directory ensuring linea…
rkrumins Oct 12, 2025
7a824bf
Updated documentation for Hdfs Dispatcher in spline.default.yaml file
rkrumins Oct 12, 2025
96373b4
Added initial integration tests for HDFS Lineage Dispatcher with cent…
rkrumins Oct 12, 2025
2b01b81
Aligning tests to run the same evaluation as when using default mode
rkrumins Oct 12, 2025
6f0ecdf
Minor cleanup in HDFSLineageDispatcher
rkrumins Oct 12, 2025
b064041
Fixed issue with unmatched types for resolveLineagePath in HDFSLineag…
rkrumins Oct 12, 2025
578c876
Fixing issues as per SonarQube for HDFSLineageDispatcher
rkrumins Oct 12, 2025
bebf4c9
Fix for the issue in fsScheme!
rkrumins Oct 12, 2025
04eebe3
Constant for file extension in HDFSLineageDispatcherSpec
rkrumins Oct 12, 2025
5f63f61
Removed outputSource filename when writing file to new location and u…
rkrumins Oct 12, 2025
8bb78a1
Fixing issues as per static code analysis
rkrumins Oct 13, 2025
7a93705
Fixing issues in logic for HDFSLineageDispatcher
rkrumins Oct 13, 2025
32db7ac
Fixing issue with exception handling for mkdirs in HDFSLineageDispatcher
rkrumins Oct 13, 2025
30f2f40
Updated integration test for custom lineage path in HDFSLineageDispat…
rkrumins Oct 13, 2025
87936b8
Ensuring the edge case with Spark AppName containing non-standard char…
rkrumins Oct 13, 2025
4e699dd
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
8a4b431
HDFSLineageDispatcherSpec debug for failing integration test
rkrumins Oct 13, 2025
6bc068d
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
d155a98
Fixing issues in HDFSLineageDispatcherSpec
rkrumins Oct 13, 2025
4c7d723
Fixed integration test and changed the filename to avoid clashes from…
rkrumins Oct 13, 2025
cd535c9
Added more robust check to ensure no _LINEAGE file is created
rkrumins Oct 13, 2025
0e5c69a
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
bebc869
Added getOrElse when obtaining planId in HDFSLineageDispatcher
rkrumins Oct 13, 2025
7bd5b0a
Updated spline.default.yaml as per up-to-date details for HDFSLineage…
rkrumins Oct 13, 2025
23 changes: 22 additions & 1 deletion core/src/main/resources/spline.default.yaml
@@ -146,7 +146,28 @@ spline:
fileBufferSize: 4096
# file/directory permissions, provided as umask string, either in octal or symbolic format
filePermissions: 777

customLineagePath:
# OPTIONAL: Custom path for centralized lineage storage.
# If left empty, null, or not specified → DEFAULT MODE: lineage written alongside target data files
# If set to a path → CENTRALIZED MODE: all lineage written to this location with unique filenames
#
# CENTRALIZED MODE filename format: {timestamp}_{appName}_{appId}
# - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for natural chronological sorting and easy filtering
# Example: 2025-10-12_14-30-45-123
# - appName: Spark application name for easy identification of which job generated the lineage
# Example: MySparkJob
# - appId: Spark application ID for traceability to specific runs
# Example: app-20251012143045-0001
#
# Examples (assuming app name is "MySparkJob"):
# - Local: customLineagePath: /my/centralized/lineage
# Output: /my/centralized/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - S3: customLineagePath: s3://my-bucket/lineage
# Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - GCS: customLineagePath: gs://my-bucket/lineage
# Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# - HDFS: customLineagePath: hdfs://cluster/lineage
# Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
# -------------------------------------------
# Open Lineage HTTP dispatcher
# -------------------------------------------
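For reference, a minimal sketch of wiring this up from application code. The spark.spline.* key names mirror the ones exercised in HDFSLineageDispatcherSpec below; the app name, bucket, and output path are illustrative placeholders, and Spline itself must still be enabled the usual way.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch (not part of this diff): enable the HDFS dispatcher in CENTRALIZED mode.
    // "MySparkJob" and "s3://my-bucket/lineage" are placeholder values.
    val spark = SparkSession.builder()
      .appName("MySparkJob")
      .config("spark.spline.lineageDispatcher", "hdfs")
      .config("spark.spline.lineageDispatcher.hdfs.className",
        "za.co.absa.spline.harvester.dispatcher.HDFSLineageDispatcher")
      .config("spark.spline.lineageDispatcher.hdfs.customLineagePath", "s3://my-bucket/lineage")
      .getOrCreate()

    // Any tracked write, e.g. spark.range(10).write.save("s3://my-bucket/data/out"),
    // then emits its lineage to s3://my-bucket/lineage/{timestamp}_MySparkJob_{appId}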
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala
@@ -30,7 +30,10 @@ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe
import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan}

import java.net.URI
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneId}
import scala.concurrent.blocking
import scala.util.{Try, Success, Failure}

/**
* A port of https://github.com/AbsaOSS/spline/tree/release/0.3.9/persistence/hdfs/src/main/scala/za/co/absa/spline/persistence/hdfs
@@ -40,16 +43,34 @@ import scala.concurrent.blocking
* for every generic use case in a real production application.
*
* It is NOT thread-safe and is strictly synchronous, assuming a predefined order of method calls: `send(plan)` and then `send(event)`
*
* TWO MODES OF OPERATION:
*
* 1. DEFAULT MODE (customLineagePath = None, null, or empty string):
* Lineage files are written alongside the target data files.
* Example: Writing to /data/output/file.parquet creates /data/output/_LINEAGE
*
* 2. CENTRALIZED MODE (customLineagePath set to a valid path):
* All lineage files are written to a single centralized location with unique filenames.
* Filename format: {timestamp}_{appName}_{appId}
* - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for chronological sorting and filtering
* - appName: Spark application name for easy identification
* - appId: Spark application ID for traceability
*
* The timestamp-first format ensures natural chronological sorting and easy date-based filtering.
* Parent directories are automatically created with proper permissions for multi-user access (HDFS/local).
* For object storage (S3, GCS, Azure), directory creation is skipped since they use key prefixes.
*/
@Experimental
class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int, customLineagePath: Option[String])
extends LineageDispatcher
with Logging {

def this(conf: Configuration) = this(
filename = conf.getRequiredString(FileNameKey),
permission = new FsPermission(conf.getRequiredObject(FilePermissionsKey).toString),
bufferSize = conf.getRequiredInt(BufferSizeKey),
customLineagePath = conf.getOptionalString(CustomLineagePathKey).map(_.trim).filter(_.nonEmpty)
)

@volatile
Expand All @@ -67,7 +88,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi
throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID")

try {
val path = resolveLineagePath()
val planWithEvent = Map(
"executionPlan" -> this._lastSeenPlan,
"executionEvent" -> event
@@ -80,10 +101,113 @@
}
}

/**
* Resolves the lineage file path based on configuration.
* If customLineagePath is specified, lineage files are written to that centralized location.
* Otherwise, lineage files are written alongside the target data file (current behavior).
*
* @return The full path where the lineage file should be written
*/
private def resolveLineagePath(): String = {
val outputSource = this._lastSeenPlan.operations.write.outputSource
customLineagePath match {
case Some(customPath) =>
// Centralized mode: write to custom path with unique filename
val cleanCustomPath = customPath.stripSuffix("/")
val uniqueFilename = generateUniqueFilename()
s"$cleanCustomPath/$uniqueFilename"
case None =>
// Default mode: write alongside target data file
s"${outputSource.stripSuffix("/")}/$filename"
}
}

/**
* Generates a unique filename for centralized lineage storage.
*
* Format: {timestamp}_{appName}_{appId}
* Example: 2025-10-12_14-30-45-123_MySparkJob_app-20251012143045-0001
*
* This format optimizes for operational debugging use cases:
* - Timestamp FIRST: Ensures natural chronological sorting (most recent files appear together)
* - Application Name: Easy identification of which job generated the lineage
* - Application ID: Full traceability to specific Spark application run
*
* @return A unique filename optimized for filtering and sorting
*/
private def generateUniqueFilename(): String = {
val sparkContext = SparkContext.getOrCreate()
val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
val appId = sparkContext.applicationId
val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
val timestamp = dateFormatter.format(Instant.now())
s"${timestamp}_${appName}_${appId}"
}
Comment on lines 147 to 156
⚠️ Potential issue | 🟠 Major

Sanitize appName and appId to prevent filesystem path issues.

The Spark application name can contain spaces, slashes, or special characters that may cause filesystem path issues. While the filename format is well-designed for sorting, unsanitized names can break path construction.

Apply this diff to sanitize the application metadata:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
-    val appName = sparkContext.appName
-    val appId = sparkContext.applicationId
+    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${appName}_${appId}"
   }
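For clarity, a tiny standalone illustration of what the proposed sanitizer does; the input strings are made-up examples:

    // Illustration only: maps every character outside [a-zA-Z0-9_-] to '_'
    val sanitize: String => String = _.replaceAll("[^a-zA-Z0-9_-]", "_")

    sanitize("My Spark Job/v2")         // "My_Spark_Job_v2"
    sanitize("app-20251012143045-0001") // already safe: unchanged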

/**
* Ensures that parent directories exist with proper permissions for centralized lineage storage.
* This is only called when customLineagePath is specified and is critical for multi-user access.
*
* Note: Object storage systems (S3, GCS, Azure Blob) don't have true directories - they use key prefixes.
* Directory creation is automatically skipped for these systems to avoid unnecessary operations.
*
* @param fs The Hadoop FileSystem
* @param path The target file path whose parent directories should be created
*/
private def ensureParentDirectoriesExist(fs: FileSystem, path: Path): Unit = {
// Object storage systems (S3, GCS, Azure Blob) don't have real directories - they're just key prefixes
// Skip directory creation for these systems to avoid unnecessary operations
val fsScheme = fs.getUri.getScheme
val scheme = Option(fsScheme).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
val isObjectStorage = scheme != null && (
scheme.startsWith("s3") || // S3: s3, s3a, s3n
scheme.startsWith("gs") || // Google Cloud Storage: gs
scheme.startsWith("wasb") || // Azure Blob Storage: wasb, wasbs
scheme.startsWith("abfs") || // Azure Data Lake Storage Gen2: abfs, abfss
scheme.startsWith("adl") // Azure Data Lake Storage Gen1: adl
)

if (isObjectStorage) {
logDebug(s"Skipping directory creation for object storage filesystem ($scheme) - directories are implicit key prefixes")
} else {
logDebug(s"Ensuring parent directories exist for centralized lineage storage: $path")
val parentDir = path.getParent
if (parentDir != null && !fs.exists(parentDir)) {
Try {
// Create directories with multi-user friendly permissions to allow all service accounts to write
// This uses the same permission object that's already configured for files
val created = fs.mkdirs(parentDir, permission)
if (created) {
logInfo(s"Created parent directories: $parentDir with permissions $permission")
}
} match {
case Success(_) =>
// Directory creation succeeded
case Failure(e: org.apache.hadoop.fs.FileAlreadyExistsException) =>
// Race condition: another process created the directory - this is fine
logDebug(s"Directory $parentDir already exists (created by another process)")
case Failure(e: RuntimeException) =>
logWarning(s"Failed to create parent directories: $parentDir", e)
throw e
case Failure(e) =>
// Handle any other exceptions (IOException, etc.)
logWarning(s"Failed to create parent directories: $parentDir", e)
throw new RuntimeException(s"Failed to create parent directories: $parentDir", e)
}
}
}
}

private def persistToHadoopFs(content: String, fullLineagePath: String): Unit = blocking {
val (fs, path) = pathStringToFsWithPath(fullLineagePath)
logDebug(s"Opening HadoopFs output stream to $path")

// Only ensure parent directories exist when using centralized mode (customLineagePath is specified)
// In default mode, the parent directories should already exist as they're alongside the data files
if (customLineagePath.isDefined) {
ensureParentDirectoriesExist(fs, path)
}

val replication = fs.getDefaultReplication(path)
val blockSize = fs.getDefaultBlockSize(path)
val outputStream = fs.create(path, permission, true, bufferSize, replication, blockSize, null)
@@ -104,6 +228,7 @@ object HDFSLineageDispatcher {
private val FileNameKey = "fileName"
private val FilePermissionsKey = "filePermissions"
private val BufferSizeKey = "fileBufferSize"
private val CustomLineagePathKey = "customLineagePath"

/**
* Converts string full path to Hadoop FS and Path, e.g.
HDFSLineageDispatcherSpec.scala
@@ -26,7 +26,7 @@ import za.co.absa.spline.commons.version.Version._
import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._
import za.co.absa.spline.test.fixture.SparkFixture
import za.co.absa.spline.test.fixture.spline.SplineFixture

import org.apache.spark.sql.SparkSession
import java.io.File

class HDFSLineageDispatcherSpec
@@ -37,15 +37,21 @@

behavior of "HDFSLineageDispatcher"

it should "save lineage file to a filesystem" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val lineageDispatcherConfigKeyName = "spark.spline.lineageDispatcher"
val lineageDispatcherConfigValueName = "hdfs"
val lineageDispatcherConfigClassNameKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.className"
val lineageDispatcherConfigCustomLineagePathKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.customLineagePath"
val destFilePathExtension = ".parquet"

it should "save lineage file to a filesystem in DEFAULT mode" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
withIsolatedSparkSession(_
.config("spark.spline.lineageDispatcher", "hdfs")
.config("spark.spline.lineageDispatcher.hdfs.className", classOf[HDFSLineageDispatcher].getName)
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._
val dummyDF = Seq((1, 2)).toDF
val destPath = TempDirectory("spline_", ".parquet", pathOnly = true).deleteOnExit()
val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
@@ -63,4 +69,100 @@
}
}

Seq(
("without customLineagePath config", None),
("with empty string customLineagePath", Some("")),
("with whitespace-only customLineagePath", Some(" "))
).foreach { case (scenarioDesc, customPathValue) =>
it should s"use DEFAULT mode $scenarioDesc" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val builder = (b: SparkSession.Builder) => {
val configured = b
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
customPathValue.fold(configured)(path => configured.config(lineageDispatcherConfigCustomLineagePathKeyName, path))
}

withIsolatedSparkSession(builder) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._
val dummyDF = Seq((1, 2)).toDF
val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
} yield {
val lineageFile = new File(destPath.asString, "_LINEAGE")
lineageFile.exists should be(true)
lineageFile.length should be > 0L

val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
lineageJson should contain key "executionPlan"
lineageJson should contain key "executionEvent"
lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
}
}
}
}
}

it should "save lineage files in a custom lineage path" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
val centralizedPath = TempDirectory("spline_centralized").deleteOnExit()

withIsolatedSparkSession(_
.config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
.config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
.config(lineageDispatcherConfigCustomLineagePathKeyName, centralizedPath.asString)
) { implicit spark =>
withLineageTracking { captor =>
import spark.implicits._

// Test with multiple data writes to verify unique filenames
val dummyDF1 = Seq((1, 2)).toDF
val dummyDF2 = Seq((3, 4)).toDF
val destPath1 = TempDirectory("spline_1_", destFilePathExtension, pathOnly = true).deleteOnExit()
val destPath2 = TempDirectory("spline_2_", destFilePathExtension, pathOnly = true).deleteOnExit()

for {
(_, _) <- captor.lineageOf(dummyDF1.write.save(destPath1.asString))
(_, _) <- captor.lineageOf(dummyDF2.write.save(destPath2.asString))
} yield {
val centralizedDir = new File(centralizedPath.asString)
centralizedDir.exists should be(true)
centralizedDir.isDirectory should be(true)

val appId = spark.sparkContext.applicationId
val appName = spark.sparkContext.appName
val appNameCleaned = appName.replaceAll("[^a-zA-Z0-9_-]", "_")

val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File])
println("lineageFiles: " + lineageFiles.map(_.getName).mkString(", "))

// Verify naming convention aligns with centralized lineage pattern (timestamp_appName_appId)
val filenamePattern = """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_.+""".r
lineageFiles.foreach { file =>
val name = file.getName
withClue(s"Lineage filename '$name' should follow the timestamp_appName_appId pattern") {
filenamePattern.pattern.matcher(name).matches shouldBe true
}
name should (include(appId) and include(appNameCleaned))
}

// Verify each file has the correct format and content
lineageFiles.foreach { lineageFile =>
val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
lineageJson should contain key "executionPlan"
lineageJson should contain key "executionEvent"
}

// Verify no lineage files in destination directories
val lineageFileInDest1 = new File(destPath1.asString, "_LINEAGE")
val lineageFileInDest2 = new File(destPath2.asString, "_LINEAGE")
lineageFileInDest1.exists should be(false)
lineageFileInDest2.exists should be(false)
}
}
}
}

}