-
Notifications
You must be signed in to change notification settings - Fork 104
Extending HDFS Lineage Dispatcher to support writing lineage files to a centralised location #893
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 6 commits
8d49e2f
a35a99c
7a824bf
96373b4
2b01b81
6f0ecdf
b064041
578c876
bebf4c9
04eebe3
5f63f61
8bb78a1
7a93705
32db7ac
30f2f40
87936b8
4e699dd
8a4b431
6bc068d
d155a98
4c7d723
cd535c9
0e5c69a
bebc869
7bd5b0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -30,6 +30,8 @@ import za.co.absa.spline.harvester.json.HarvesterJsonSerDe | |||||||||||||||||||||||||||||||||
| import za.co.absa.spline.producer.model.{ExecutionEvent, ExecutionPlan} | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| import java.net.URI | ||||||||||||||||||||||||||||||||||
| import java.time.format.DateTimeFormatter | ||||||||||||||||||||||||||||||||||
| import java.time.{Instant, ZoneId} | ||||||||||||||||||||||||||||||||||
| import scala.concurrent.blocking | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
|
|
@@ -40,16 +42,34 @@ import scala.concurrent.blocking | |||||||||||||||||||||||||||||||||
| * for every generic use case in a real production application. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * It is NOT thread-safe, strictly synchronous assuming a predefined order of method calls: `send(plan)` and then `send(event)` | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * TWO MODES OF OPERATION: | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * 1. DEFAULT MODE (customLineagePath = None, null, or empty string): | ||||||||||||||||||||||||||||||||||
| * Lineage files are written alongside the target data files. | ||||||||||||||||||||||||||||||||||
| * Example: Writing to /data/output/file.parquet creates /data/output/_LINEAGE | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * 2. CENTRALIZED MODE (customLineagePath set to a valid path): | ||||||||||||||||||||||||||||||||||
| * All lineage files are written to a single centralized location with unique filenames. | ||||||||||||||||||||||||||||||||||
| * Filename format: {timestamp}_{fileName}_{appId} | ||||||||||||||||||||||||||||||||||
| * - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for chronological sorting and filtering | ||||||||||||||||||||||||||||||||||
| * - fileName: The configured fileName value (e.g., "my_file.parq_LINEAGE") | ||||||||||||||||||||||||||||||||||
| * - appId: Spark application ID for traceability | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * The timestamp-first format ensures natural chronological sorting and easy date-based filtering. | ||||||||||||||||||||||||||||||||||
| * Parent directories are automatically created with proper permissions for multi-user access (HDFS/local). | ||||||||||||||||||||||||||||||||||
| * For object storage (S3, GCS, Azure), directory creation is skipped since they use key prefixes. | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| @Experimental | ||||||||||||||||||||||||||||||||||
| class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int) | ||||||||||||||||||||||||||||||||||
| class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSize: Int, customLineagePath: Option[String]) | ||||||||||||||||||||||||||||||||||
| extends LineageDispatcher | ||||||||||||||||||||||||||||||||||
| with Logging { | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| def this(conf: Configuration) = this( | ||||||||||||||||||||||||||||||||||
| filename = conf.getRequiredString(FileNameKey), | ||||||||||||||||||||||||||||||||||
| permission = new FsPermission(conf.getRequiredObject(FilePermissionsKey).toString), | ||||||||||||||||||||||||||||||||||
| bufferSize = conf.getRequiredInt(BufferSizeKey) | ||||||||||||||||||||||||||||||||||
| bufferSize = conf.getRequiredInt(BufferSizeKey), | ||||||||||||||||||||||||||||||||||
| customLineagePath = conf.getOptionalString(CustomLineagePathKey) | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| @volatile | ||||||||||||||||||||||||||||||||||
|
|
@@ -67,7 +87,7 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi | |||||||||||||||||||||||||||||||||
| throw new IllegalStateException("send(event) must be called strictly after send(plan) method with matching plan ID") | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||
| val path = s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename" | ||||||||||||||||||||||||||||||||||
| val path = resolveLineagePath(event.planId) | ||||||||||||||||||||||||||||||||||
| val planWithEvent = Map( | ||||||||||||||||||||||||||||||||||
| "executionPlan" -> this._lastSeenPlan, | ||||||||||||||||||||||||||||||||||
| "executionEvent" -> event | ||||||||||||||||||||||||||||||||||
|
|
@@ -80,10 +100,102 @@ class HDFSLineageDispatcher(filename: String, permission: FsPermission, bufferSi | |||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Resolves the lineage file path based on configuration. | ||||||||||||||||||||||||||||||||||
| * If customLineagePath is specified, lineage files are written to that centralized location. | ||||||||||||||||||||||||||||||||||
| * Otherwise, lineage files are written alongside the target data file (current behavior). | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @param planId The execution plan ID used for generating unique filenames in centralized mode | ||||||||||||||||||||||||||||||||||
| * @return The full path where the lineage file should be written | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| private def resolveLineagePath(planId: String): String = { | ||||||||||||||||||||||||||||||||||
| customLineagePath match { | ||||||||||||||||||||||||||||||||||
| case Some(customPath) => | ||||||||||||||||||||||||||||||||||
| // Centralized mode: write to custom path with unique filename | ||||||||||||||||||||||||||||||||||
| val cleanCustomPath = customPath.stripSuffix("/") | ||||||||||||||||||||||||||||||||||
| val uniqueFilename = generateUniqueFilename(planId) | ||||||||||||||||||||||||||||||||||
| s"$cleanCustomPath/$uniqueFilename" | ||||||||||||||||||||||||||||||||||
| case None => | ||||||||||||||||||||||||||||||||||
| // Default mode: write alongside target data file | ||||||||||||||||||||||||||||||||||
| s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename" | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Generates a unique filename for centralized lineage storage. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * Format: {timestamp}_{fileName}_{appId} | ||||||||||||||||||||||||||||||||||
| * Example: 2025-10-12_14-30-45-123_lineage_app-20251012143045-0001 | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * This format optimizes for operational debugging use cases: | ||||||||||||||||||||||||||||||||||
| * - Timestamp FIRST: Ensures natural chronological sorting (most recent files appear together) | ||||||||||||||||||||||||||||||||||
| * - Application ID: Full traceability to specific Spark application run | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @param planId The execution plan ID (unused, kept for interface compatibility) | ||||||||||||||||||||||||||||||||||
| * @return A unique filename optimized for filtering and sorting | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| private def generateUniqueFilename(planId: String): String = { | ||||||||||||||||||||||||||||||||||
| val sparkContext = SparkContext.getOrCreate() | ||||||||||||||||||||||||||||||||||
| val appId = sparkContext.applicationId | ||||||||||||||||||||||||||||||||||
| val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC")) | ||||||||||||||||||||||||||||||||||
| val timestamp = dateFormatter.format(Instant.now()) | ||||||||||||||||||||||||||||||||||
| s"${timestamp}_${filename}_${appId}" | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||
| * Ensures that parent directories exist with proper permissions for multi-user access. | ||||||||||||||||||||||||||||||||||
| * This is critical for centralized lineage storage where different service accounts need write access. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * Note: Object storage systems (S3, GCS, Azure Blob) don't have true directories - they use key prefixes. | ||||||||||||||||||||||||||||||||||
| * Directory creation is automatically skipped for these systems to avoid unnecessary operations. | ||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||
| * @param fs The Hadoop FileSystem | ||||||||||||||||||||||||||||||||||
| * @param path The target file path whose parent directories should be created | ||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||
| private def ensureParentDirectoriesExist(fs: FileSystem, path: Path): Unit = { | ||||||||||||||||||||||||||||||||||
| // Object storage systems (S3, GCS, Azure Blob) don't have real directories - they're just key prefixes | ||||||||||||||||||||||||||||||||||
| // Skip directory creation for these systems to avoid unnecessary operations | ||||||||||||||||||||||||||||||||||
| val fsScheme = fs.getUri.getScheme | ||||||||||||||||||||||||||||||||||
| val isObjectStorage = fsScheme != null && ( | ||||||||||||||||||||||||||||||||||
| fsScheme.startsWith("s3") || // S3: s3, s3a, s3n | ||||||||||||||||||||||||||||||||||
| fsScheme.startsWith("gs") || // Google Cloud Storage: gs | ||||||||||||||||||||||||||||||||||
| fsScheme.startsWith("wasb") || // Azure Blob Storage: wasb, wasbs | ||||||||||||||||||||||||||||||||||
| fsScheme.startsWith("abfs") || // Azure Data Lake Storage Gen2: abfs, abfss | ||||||||||||||||||||||||||||||||||
| fsScheme.startsWith("adl") // Azure Data Lake Storage Gen1: adl | ||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
| if (isObjectStorage) { | ||||||||||||||||||||||||||||||||||
| logDebug(s"Skipping directory creation for object storage filesystem ($fsScheme) - directories are implicit key prefixes") | ||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| val parentDir = path.getParent | ||||||||||||||||||||||||||||||||||
| if (parentDir != null && !fs.exists(parentDir)) { | ||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||
| // Create directories with multi-user friendly permissions to allow all service accounts to write | ||||||||||||||||||||||||||||||||||
| // This uses the same permission object that's already configured for files | ||||||||||||||||||||||||||||||||||
| val created = fs.mkdirs(parentDir, permission) | ||||||||||||||||||||||||||||||||||
| if (created) { | ||||||||||||||||||||||||||||||||||
| logInfo(s"Created parent directories: $parentDir with permissions $permission") | ||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
| val created = fs.mkdirs(parentDir, permission) | |
| if (created) { | |
| logInfo(s"Created parent directories: $parentDir with permissions $permission") | |
| } | |
| try { | |
| // Create directories with multi-user friendly permissions (execute bits for traversal) | |
| // Add execute bits to the file permissions to make directories traversable | |
| val dirPermission = new FsPermission( | |
| permission.getUserAction.or(FsAction.EXECUTE), | |
| permission.getGroupAction.or(FsAction.EXECUTE), | |
| permission.getOtherAction.or(FsAction.EXECUTE) | |
| ) | |
| val created = fs.mkdirs(parentDir, dirPermission) | |
| if (created) { | |
| logInfo(s"Created parent directories: $parentDir with permissions $dirPermission") | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Docs imply gs/abfs/hdfs scheme support that the code doesn’t yet provide
YAML examples list S3, GCS (gs://), Azure (abfs/wasb), and hdfs://. Current FS resolution only special‑cases S3 and otherwise uses the default FS, which won’t work for gs/abfs/wasb/adl or hdfs:// URIs with authority. Either (a) narrow the examples, or (b) implement generic scheme‑aware FS resolution (recommended). See HDFSLineageDispatcher.pathStringToFsWithPath. Also, ensure blanks/whitespace are treated as default mode per these comments.
🏁 Script executed:
Length of output: 2044
🏁 Script executed:
Length of output: 10232
Align docs and code for customLineagePath schemes and blank handling
customLineagePathcurrently matchesSome("")and enters centralized mode; strip/null/blank values must map to default mode per docs🤖 Prompt for AI Agents