diff --git a/core/src/main/resources/spline.default.yaml b/core/src/main/resources/spline.default.yaml index 66eef175..e458802d 100644 --- a/core/src/main/resources/spline.default.yaml +++ b/core/src/main/resources/spline.default.yaml @@ -146,7 +146,28 @@ spline: fileBufferSize: 4096 # file/directory permissions, provided as umask string, either in octal or symbolic format filePermissions: 777 - + customLineagePath: + # OPTIONAL: Custom path for centralized lineage storage. + # If left empty, null, or not specified → DEFAULT MODE: lineage written alongside target data files + # If set to a path → CENTRALIZED MODE: all lineage written to this location with unique filenames + # + # CENTRALIZED MODE filename format: {timestamp}_{planId}_{appId} + # - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for natural chronological sorting and easy filtering + # Example: 2025-10-12_14-30-45-123 + # - planId: Execution plan UUID for guaranteed uniqueness (prevents collisions from concurrent writes) + # Example: 550e8400-e29b-41d4-a716-446655440000 + # - appId: Spark application ID for traceability to specific runs (correlates with Spark UI, logs, monitoring) + # Example: app-20251012143045-0001 + # + # Examples: + # - Local: customLineagePath: /my/centralized/lineage + # Output: /my/centralized/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 + # - S3: customLineagePath: s3://my-bucket/lineage + # Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 + # - GCS: customLineagePath: gs://my-bucket/lineage + # Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 + # - HDFS: customLineagePath: hdfs://cluster/lineage + # Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 # ------------------------------------------- # Open Lineage HTTP dispatcher # ------------------------------------------- diff --git a/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala index a52611af..b975a3b1 100644 --- a/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala +++ b/core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala @@ -30,7 +30,10 @@ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan} import java.net.URI +import java.time.format.DateTimeFormatter +import java.time.{Instant, ZoneId} import scala.concurrent.blocking +import scala.util.{Try, Success, Failure} /** * A port of https://github.com/AbsaOSS/spline/tree/release/0.3.9/persistence/hdfs/src/main/scala/za/co/absa/spline/persistence/hdfs @@ -40,16 +43,34 @@ import scala.concurrent.blocking * for every generic use case in a real production application. * * It is NOT thread-safe, strictly synchronous assuming a predefined order of method calls: `send(plan)` and then `send(event)` + * + * TWO MODES OF OPERATION: + * + * 1. DEFAULT MODE (customLineagePath = None, null, or empty string): + * Lineage files are written alongside the target data files. + * Example: Writing to /data/output/file.parquet creates /data/output/_LINEAGE + * + * 2. 
CENTRALIZED MODE (customLineagePath set to a valid path): + * All lineage files are written to a single centralized location with unique filenames. + * Filename format: {timestamp}_{planId}_{appId} + * - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for chronological sorting and filtering + * - planId: Execution plan UUID for guaranteed uniqueness (prevents collisions from concurrent writes) + * - appId: Spark application ID for traceability to Spark UI, logs, and monitoring systems + * + * The timestamp-first format ensures natural chronological sorting and easy date-based filtering. + * Parent directories are automatically created with proper permissions for multi-user access (HDFS/local). + * For object storage (S3, GCS, Azure), directory creation is skipped since they use key prefixes. */ @Experimental -class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int) +class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int, customLineagePath: Option[String]) extends LineageDispatcher with Logging { def this(conf: Configuration) = this( filename = conf.getRequiredString(FileNameKey), permission = new FsPermission(conf.getRequiredObject(FilePermissionsKey).toString), - bufferSize = conf.getRequiredInt(BufferSizeKey) + bufferSize = conf.getRequiredInt(BufferSizeKey), + customLineagePath = conf.getOptionalString(CustomLineagePathKey).map(_.trim).filter(_.nonEmpty) ) @volatile @@ -67,7 +88,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID") try { - val path = s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename" + val path = resolveLineagePath() val planWithEvent = Map( "executionPlan" -> this._lastSeenPlan, "executionEvent" -> event @@ -80,10 +101,124 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi } } + /** + * Resolves the lineage file path based on configuration. + * If customLineagePath is specified, lineage files are written to that centralized location. + * Otherwise, lineage files are written alongside the target data file (current behavior). + * + * @return The full path where the lineage file should be written + */ + private def resolveLineagePath(): String = { + val outputSource = s"${this._lastSeenPlan.operations.write.outputSource}" + customLineagePath match { + case Some(customPath) => + // Centralized mode: write to custom path with unique filename + val cleanCustomPath = customPath.stripSuffix("/") + val uniqueFilename = generateUniqueFilename() + s"$cleanCustomPath/$uniqueFilename" + case None => + // Default mode: write alongside target data file + s"${outputSource.stripSuffix("/")}/$filename" + } + } + + /** + * Generates a unique filename for centralized lineage storage. 
+ * + * Format: {timestamp}_{planId}_{appId} + * Example: 2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 + * + * This format optimizes for operational debugging use cases: + * - Timestamp FIRST: Ensures natural chronological sorting (most recent files appear together) + * - Plan ID: Guarantees uniqueness even for concurrent writes from the same application + * - Application ID: Enables immediate filtering by Spark application (correlates with Spark UI, logs, monitoring) + * + * The planId ensures zero collision risk even when: + * - Multiple writes happen in the same millisecond + * - The same application writes multiple datasets concurrently + * - Different applications run simultaneously + * + * The appId enables operators to quickly filter all lineage files from a specific Spark application run, + * making it trivial to correlate with Spark UI, logs, and monitoring systems where the human-readable + * application name is already available. + * + * @return A unique filename optimized for filtering, sorting, and guaranteed uniqueness + */ + private def generateUniqueFilename(): String = { + val sparkContext = SparkContext.getOrCreate() + val planId = this._lastSeenPlan.id.getOrElse( + throw new IllegalStateException("Execution plan ID is missing") + ).toString + val appId = sparkContext.applicationId + val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC")) + val timestamp = dateFormatter.format(Instant.now()) + s"${timestamp}_${planId}_${appId}" + } + + /** + * Ensures that parent directories exist with proper permissions for centralized lineage storage. + * This is only called when customLineagePath is specified and is critical for multi-user access. + * + * Note: Object storage systems (S3, GCS, Azure Blob) don't have true directories - they use key prefixes. + * Directory creation is automatically skipped for these systems to avoid unnecessary operations. 
+ * + * @param fs The Hadoop FileSystem + * @param path The target file path whose parent directories should be created + */ + private def ensureParentDirectoriesExist(fs: FileSystem, path: Path): Unit = { + // Object storage systems (S3, GCS, Azure Blob) don't have real directories - they're just key prefixes + // Skip directory creation for these systems to avoid unnecessary operations + val fsScheme = fs.getUri.getScheme + val scheme = Option(fsScheme).map(_.toLowerCase(java.util.Locale.ROOT)).orNull + val isObjectStorage = scheme != null && ( + scheme.startsWith("s3") || // S3: s3, s3a, s3n + scheme.startsWith("gs") || // Google Cloud Storage: gs + scheme.startsWith("wasb") || // Azure Blob Storage: wasb, wasbs + scheme.startsWith("abfs") || // Azure Data Lake Storage Gen2: abfs, abfss + scheme.startsWith("adl") // Azure Data Lake Storage Gen1: adl + ) + + if (isObjectStorage) { + logDebug(s"Skipping directory creation for object storage filesystem ($scheme) - directories are implicit key prefixes") + } else { + logDebug(s"Ensuring parent directories exist for centralized lineage storage: $path") + val parentDir = path.getParent + if (parentDir != null && !fs.exists(parentDir)) { + Try { + // Create directories with multi-user friendly permissions to allow all service accounts to write + // This uses the same permission object that's already configured for files + val created = fs.mkdirs(parentDir, permission) + if (created) { + logInfo(s"Created parent directories: $parentDir with permissions $permission") + } + } match { + case Success(_) => + // Directory creation succeeded + case Failure(e: org.apache.hadoop.fs.FileAlreadyExistsException) => + // Race condition: another process created the directory - this is fine + logDebug(s"Directory $parentDir already exists (created by another process)") + case Failure(e: RuntimeException) => + logWarning(s"Failed to create parent directories: $parentDir", e) + throw e + case Failure(e) => + // Handle any other exceptions (IOException, etc.) + logWarning(s"Failed to create parent directories: $parentDir", e) + throw new RuntimeException(s"Failed to create parent directories: $parentDir", e) + } + } + } + } + private def persistToHadoopFs(content: String, fullLineagePath: String): Unit = blocking { val (fs, path) = pathStringToFsWithPath(fullLineagePath) logDebug(s"Opening HadoopFs output stream to $path") + // Only ensure parent directories exist when using centralized mode (customLineagePath is specified) + // In default mode, the parent directories should already exist as they're alongside the data files + if (customLineagePath.isDefined) { + ensureParentDirectoriesExist(fs, path) + } + val replication = fs.getDefaultReplication(path) val blockSize = fs.getDefaultBlockSize(path) val outputStream = fs.create(path, permission, true, bufferSize, replication, blockSize, null) @@ -104,6 +239,7 @@ object HDFSLineageDispatcher { private val FileNameKey = "fileName" private val FilePermissionsKey = "filePermissions" private val BufferSizeKey = "fileBufferSize" + private val CustomLineagePathKey = "customLineagePath" /** * Converts string full path to Hadoop FS and Path, e.g. 
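For reference: a minimal, spark-shell-style sketch (illustrative only, not part of the patch) of how an application opts into the new CENTRALIZED mode. The dispatcher config keys and the class name are taken from this diff (spline.default.yaml above and the integration test below); the application name and the s3 bucket path are placeholders, and the usual Spline agent initialization (listener registration) is unaffected by this change and therefore omitted:

    import org.apache.spark.sql.SparkSession

    // Placeholder app name and lineage path; the dispatcher keys mirror those used in the tests below.
    val spark = SparkSession.builder()
      .appName("centralized-lineage-demo")
      .config("spark.spline.lineageDispatcher", "hdfs")
      .config("spark.spline.lineageDispatcher.hdfs.className",
        "za.co.absa.spline.harvester.dispatcher.HDFSLineageDispatcher")
      .config("spark.spline.lineageDispatcher.hdfs.customLineagePath", "s3://my-bucket/lineage")
      .getOrCreate()

    // Every subsequent write then produces a lineage file named {timestamp}_{planId}_{appId}
    // under s3://my-bucket/lineage/ instead of a _LINEAGE file next to the written data.

Leaving customLineagePath unset, empty, or whitespace-only keeps the existing DEFAULT mode behaviour, as exercised by the tests below.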
diff --git a/integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala b/integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala index a302fe73..ef49ab3c 100644 --- a/integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala +++ b/integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala @@ -26,7 +26,7 @@ import za.co.absa.spline.commons.version.Version._ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe.impl._ import za.co.absa.spline.test.fixture.SparkFixture import za.co.absa.spline.test.fixture.spline.SplineFixture - +import org.apache.spark.sql.SparkSession import java.io.File class HDFSLineageDispatcherSpec @@ -37,15 +37,21 @@ class HDFSLineageDispatcherSpec behavior of "HDFSLineageDispatcher" - it should "save lineage file to a filesystem" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in { + val lineageDispatcherConfigKeyName = "spark.spline.lineageDispatcher" + val lineageDispatcherConfigValueName = "hdfs" + val lineageDispatcherConfigClassNameKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.className" + val lineageDispatcherConfigCustomLineagePathKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.customLineagePath" + val destFilePathExtension = ".parquet" + + it should "save lineage file to a filesystem in DEFAULT mode" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in { withIsolatedSparkSession(_ - .config("spark.spline.lineageDispatcher", "hdfs") - .config("spark.spline.lineageDispatcher.hdfs.className", classOf[HDFSLineageDispatcher].getName) + .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName) + .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName) ) { implicit spark => withLineageTracking { captor => import spark.implicits._ val dummyDF = Seq((1, 2)).toDF - val destPath = TempDirectory("spline_", ".parquet", pathOnly = true).deleteOnExit() + val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit() for { (_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString)) @@ -63,4 +69,106 @@ class HDFSLineageDispatcherSpec } } + Seq( + ("without customLineagePath config", None), + ("with empty string customLineagePath", Some("")), + ("with whitespace-only customLineagePath", Some(" ")) + ).foreach { case (scenarioDesc, customPathValue) => + it should s"use DEFAULT mode $scenarioDesc" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in { + val builder = (b: SparkSession.Builder) => { + val configured = b + .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName) + .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName) + customPathValue.fold(configured)(path => configured.config(lineageDispatcherConfigCustomLineagePathKeyName, path)) + } + + withIsolatedSparkSession(builder) { implicit spark => + withLineageTracking { captor => + import spark.implicits._ + val dummyDF = Seq((1, 2)).toDF + val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit() + + for { + (_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString)) + } yield { + val lineageFile = new File(destPath.asString, "_LINEAGE") + lineageFile.exists should be(true) + lineageFile.length should be > 0L + + val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, 
Map[String, _]]] + lineageJson should contain key "executionPlan" + lineageJson should contain key "executionEvent" + lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId")) + } + } + } + } + } + + it should "save lineage files in a custom lineage path" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in { + val centralizedPath = TempDirectory("spline_centralized").deleteOnExit() + centralizedPath.delete() + + withIsolatedSparkSession(_ + .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName) + .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName) + .config(lineageDispatcherConfigCustomLineagePathKeyName, centralizedPath.asString) + ) { implicit spark => + withLineageTracking { captor => + import spark.implicits._ + + // Test with multiple data writes to verify unique filenames + val dummyDF1 = Seq((1, 2)).toDF + val dummyDF2 = Seq((3, 4)).toDF + val destPath1 = TempDirectory("spline_1_", destFilePathExtension, pathOnly = true).deleteOnExit() + val destPath2 = TempDirectory("spline_2_", destFilePathExtension, pathOnly = true).deleteOnExit() + + for { + (_, _) <- captor.lineageOf(dummyDF1.write.save(destPath1.asString)) + (_, _) <- captor.lineageOf(dummyDF2.write.save(destPath2.asString)) + } yield { + val centralizedDir = new File(centralizedPath.asString) + centralizedDir.exists should be(true) + centralizedDir.isDirectory should be(true) + + val appId = spark.sparkContext.applicationId + + val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File]) + val lineageFilesOnly = lineageFiles.filter(f => f.isFile && !f.getName.endsWith(".crc")) + lineageFilesOnly.length should be(2) + + // Verify naming convention aligns with centralized lineage pattern (timestamp_planId_appId) + // Format: {timestamp}_{planId}_{appId} + // Example: 2025-10-12_14-30-45-123_550e8400-e29b-41d4-a716-446655440000_app-20251012143045-0001 + val filenamePattern = """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_.+""" + lineageFilesOnly.foreach { file => + val name = file.getName + withClue(s"Lineage filename '$name' should follow the timestamp_planId_appId pattern") { + name.matches(filenamePattern) shouldBe true + } + // Verify the appId appears in the filename (ends with appId) + withClue(s"Lineage filename '$name' should end with application ID '$appId'") { + name.endsWith(appId) shouldBe true + } + + } + + // Verify each file has the correct format and content + lineageFilesOnly.foreach { lineageFile => + val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]] + lineageJson should contain key "executionPlan" + lineageJson should contain key "executionEvent" + } + + // Verify no lineage files in destination directories + val dest1Files = Option(new File(destPath1.asString).listFiles()).getOrElse(Array.empty[File]) + val dest2Files = Option(new File(destPath2.asString).listFiles()).getOrElse(Array.empty[File]) + + dest1Files.exists(_.getName.contains("_LINEAGE")) should be(false) + dest2Files.exists(_.getName.contains("_LINEAGE")) should be(false) + } + } + } + } + }
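For reference: a short, illustrative sketch (not part of the patch) of the operational filtering use case described above, using only the plain Hadoop FileSystem API. The directory and application ID are the sample values from the comments in this change; the suffix filter works because the appId is the final component of the generated filename:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sample values reused from the examples above; adjust to the actual deployment.
    val appId = "app-20251012143045-0001"
    val lineageDir = new Path("hdfs://cluster/lineage")

    val fs = FileSystem.get(lineageDir.toUri, new Configuration())
    // Filenames end with the application ID, so a suffix filter isolates one run's lineage files.
    val filesForApp = fs.listStatus(lineageDir).filter(_.getPath.getName.endsWith(appId))

    // The fixed layout (23-char timestamp, '_', 36-char UUID, '_', appId) also makes the name easy
    // to decompose: name.substring(0, 23) is the UTC timestamp, name.substring(24, 60) the plan ID.
    filesForApp.map(_.getPath.getName).sorted.foreach(println)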