
Commit 68275d1

Make type of logged values consistent for each logkey (delta-io#3883)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Ensures that the type of the value logged for each log key is consistent across call sites.

## How was this patch tested?

Code compiles and existing tests pass.

## Does this PR introduce _any_ user-facing changes?

No

Signed-off-by: Michael Zhang <[email protected]>
1 parent ee87b77 · commit 68275d1
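For context: Delta's structured logging wraps each interpolated value in `MDC(DeltaLogKeys.SOME_KEY, value)`, and the point of this commit is that a given key should always carry the same runtime type. The sketch below uses simplified stand-ins (`LogKey` and `MDC` here are assumptions for illustration, not Spark's actual logging API) to show why `Int`-valued expressions such as `.size` get an explicit `.toLong`:

```scala
// Minimal stand-in for an MDC that accepts Any, as structured-logging
// facades commonly do. NOT the real Spark/Delta API.
object MdcTypeSketch {
  sealed trait LogKey
  case object NUM_FILES extends LogKey

  final case class MDC(key: LogKey, value: Any)

  def main(args: Array[String]): Unit = {
    val newFiles = Seq("a.parquet", "b.parquet")
    val countFromMetric: Long = 2L

    // Without the fix, NUM_FILES is boxed as java.lang.Integer here
    // (Seq#size returns Int)...
    val before = MDC(NUM_FILES, newFiles.size)
    // ...but as java.lang.Long at call sites that already hold a Long.
    val elsewhere = MDC(NUM_FILES, countFromMetric)
    println(before.value.getClass)    // class java.lang.Integer
    println(elsewhere.value.getClass) // class java.lang.Long

    // With the fix, widening at the call site keeps NUM_FILES a Long everywhere.
    val after = MDC(NUM_FILES, newFiles.size.toLong)
    println(after.value.getClass)     // class java.lang.Long
  }
}
```

Consumers that index or aggregate logs by key then see one stable type per key.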

File tree

9 files changed (+19 −18 lines)


spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala

Lines changed: 2 additions & 2 deletions
@@ -128,8 +128,8 @@ trait MetadataCleanup extends DeltaLogging {
         sidecarDeletionMetrics)
       logInfo(log"Sidecar deletion metrics: ${MDC(DeltaLogKeys.METRICS, sidecarDeletionMetrics)}")
     }
-    logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, numDeleted)} log files and " +
-      log"${MDC(DeltaLogKeys.NUM_FILES2, numDeletedUnbackfilled)} unbackfilled commit " +
+    logInfo(log"Deleted ${MDC(DeltaLogKeys.NUM_FILES, numDeleted.toLong)} log files and " +
+      log"${MDC(DeltaLogKeys.NUM_FILES2, numDeletedUnbackfilled.toLong)} unbackfilled commit " +
       log"files older than ${MDC(DeltaLogKeys.DATE, formattedDate)}")
   }
 }

spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

Lines changed: 4 additions & 4 deletions
@@ -1881,7 +1881,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite

     logInfo(log"Committed delta #${MDC(DeltaLogKeys.VERSION, attemptVersion)} to " +
       log"${MDC(DeltaLogKeys.PATH, deltaLog.logPath)}. Wrote " +
-      log"${MDC(DeltaLogKeys.NUM_ACTIONS, commitSize)} actions.")
+      log"${MDC(DeltaLogKeys.NUM_ACTIONS, commitSize.toLong)} actions.")

     deltaLog.checkpoint(currentSnapshot)
     currentSnapshot
@@ -2044,7 +2044,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
            |Detected mismatch in partition values between AddFile and table metadata but
            |commit validation was turned off.
            |To turn it back on set
-           |${MDC(DeltaLogKeys.CONFIG_KEY, DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED)}
+           |${MDC(DeltaLogKeys.CONFIG_KEY, DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED.key)}
            |to "true"
          """.stripMargin)
       a
@@ -2284,7 +2284,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     val actions = currentTransactionInfo.finalActionsToCommit
     logInfo(
       log"Attempting to commit version ${MDC(DeltaLogKeys.VERSION, attemptVersion)} with " +
-      log"${MDC(DeltaLogKeys.NUM_ACTIONS, actions.size)} actions with " +
+      log"${MDC(DeltaLogKeys.NUM_ACTIONS, actions.size.toLong)} actions with " +
       log"${MDC(DeltaLogKeys.ISOLATION_LEVEL, isolationLevel)} isolation level")

     if (readVersion > -1 && metadata.id != snapshot.metadata.id) {
@@ -2588,7 +2588,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       log"${MDC(DeltaLogKeys.NUM_ACTIONS, adds)} adds, " +
       log"${MDC(DeltaLogKeys.NUM_ACTIONS2, removes)} removes, " +
       log"${MDC(DeltaLogKeys.NUM_PREDICATES, readPredicates.size)} read predicates, " +
-      log"${MDC(DeltaLogKeys.NUM_FILES, readFiles.size)} read files"
+      log"${MDC(DeltaLogKeys.NUM_FILES, readFiles.size.toLong)} read files"
     }

     logInfo(logPrefix +
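One hunk above differs from the `.toLong` pattern: it switches `CONFIG_KEY` from the `DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED` entry object to its `.key` string, so the key consistently carries a `String`. A rough stand-in (this `ConfigEntry` shape and the key string are illustrative assumptions, not the real `DeltaSQLConf` definitions):

```scala
// Stand-in sketch, not Spark's real ConfigEntry class.
final case class ConfigEntry[T](key: String, default: T)

object ConfigKeySketch {
  // Hypothetical entry standing in for DeltaSQLConf.DELTA_COMMIT_VALIDATION_ENABLED;
  // the key string below is illustrative only.
  val DELTA_COMMIT_VALIDATION_ENABLED: ConfigEntry[Boolean] =
    ConfigEntry("spark.databricks.delta.commitValidation.enabled", default = true)

  def main(args: Array[String]): Unit = {
    // Before: the whole entry object is interpolated, so the log message
    // carries its toString rather than just the config name.
    println(s"To turn it back on set ${DELTA_COMMIT_VALIDATION_ENABLED}")
    // After: only the key string, a stable String payload for CONFIG_KEY.
    println(s"To turn it back on set ${DELTA_COMMIT_VALIDATION_ENABLED.key}")
  }
}
```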

spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

Lines changed: 2 additions & 1 deletion
@@ -311,7 +311,8 @@ abstract class ConvertToDeltaCommandBase(
         _, txn.deltaLog.dataPath, fs, conf, Some(partitionSchema), deltaPath.isDefined))
       if (shouldCollectStats) {
         logInfo(
-          log"Collecting stats for a batch of ${MDC(DeltaLogKeys.NUM_FILES, batch.size)} files; " +
+          log"Collecting stats for a batch of " +
+          log"${MDC(DeltaLogKeys.NUM_FILES, batch.size.toLong)} files; " +
           log"finished ${MDC(DeltaLogKeys.NUM_FILES2, numFiles)} so far"
         )
         numFiles += statsBatchSize

spark/src/main/scala/org/apache/spark/sql/delta/commands/ReorgTableForUpgradeUniformHelper.scala

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ trait ReorgTableForUpgradeUniformHelper extends DeltaLogging {
       log"at version ${MDC(DeltaLogKeys.VERSION, snapshot.version)}, there are " +
       log"${MDC(DeltaLogKeys.NUM_FILES, numOfAddFiles)} addFiles, and " +
       log"${MDC(DeltaLogKeys.NUM_FILES2, numOfAddFilesWithTag)} addFiles with " +
-      log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.VERSION2, icebergCompatVersion)} tag.")
+      log"ICEBERG_COMPAT_VERSION=${MDC(DeltaLogKeys.VERSION2, icebergCompatVersion.toLong)} tag.")
     (numOfAddFiles, numOfAddFilesWithTag)
   }

spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala

Lines changed: 2 additions & 2 deletions
@@ -423,10 +423,10 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
       )

       recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats)
-      logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles)} files " +
+      logInfo(log"Found ${MDC(DeltaLogKeys.NUM_FILES, numFiles.toLong)} files " +
         log"(${MDC(DeltaLogKeys.NUM_BYTES, sizeOfDataToDelete)} bytes) and directories in " +
         log"a total of ${MDC(DeltaLogKeys.NUM_DIRS, dirCounts)} directories " +
-        log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.STATS, stats)}")
+        log"that are safe to delete. Vacuum stats: ${MDC(DeltaLogKeys.VACUUM_STATS, stats)}")

       return diffFiles.map(f => urlEncodedStringToPath(f).toString).toDF("path")
     }
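The `STATS` → `VACUUM_STATS` rename in this hunk serves the same goal by a different route: a key shared by differently shaped payloads cannot keep one consistent type, so vacuum stats get a dedicated key. As a thought experiment (stand-in types, not Delta's actual `DeltaLogKeys`, which pair keys with values dynamically), parameterizing keys by payload type would even let the compiler enforce the invariant:

```scala
// Stand-in sketch: a key parameterized by its payload type makes
// "one key, one type" compiler-checked. Not Delta's actual API.
final case class LogKey[T](name: String)
final case class VacuumStats(numFilesDeleted: Long, sizeOfDataDeleted: Long)

object TypedKeySketch {
  val NUM_FILES: LogKey[Long] = LogKey("NUM_FILES")
  val VACUUM_STATS: LogKey[VacuumStats] = LogKey("VACUUM_STATS")

  // The value's type must match the key's type parameter.
  def mdc[T](key: LogKey[T], value: T): String = s"${key.name}=$value"

  def main(args: Array[String]): Unit = {
    println(mdc(NUM_FILES, 42L))
    println(mdc(VACUUM_STATS, VacuumStats(42L, 1024L)))
    // mdc(VACUUM_STATS, 42L) // does not compile: Long is not VacuumStats
  }
}
```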

spark/src/main/scala/org/apache/spark/sql/delta/commands/backfill/BackfillBatch.scala

Lines changed: 2 additions & 2 deletions
@@ -68,8 +68,8 @@ trait BackfillBatch extends DeltaLogging {
       )
     }

-    logInfo(log"Batch ${MDC(DeltaLogKeys.BATCH_ID, batchId)} starting, committing " +
-      log"${MDC(DeltaLogKeys.NUM_FILES, filesInBatch.size)} candidate files")
+    logInfo(log"Batch ${MDC(DeltaLogKeys.BATCH_ID, batchId.toLong)} starting, committing " +
+      log"${MDC(DeltaLogKeys.NUM_FILES, filesInBatch.size.toLong)} candidate files")
     // This step is necessary to mark all files in this batch as "read" in the
     // child transaction object `txn` and to set the read transactions ids to be the same as the
     // parent transaction object `origTxn`, for proper conflict checking.

spark/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala

Lines changed: 1 addition & 1 deletion
@@ -337,7 +337,7 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with
     }

     logInfo(log"Deleted manifest partitions [" +
-      log"${MDC(DeltaLogKeys.NUM_FILES, partitionRelativePathsToDelete.size)}]:\n\t" +
+      log"${MDC(DeltaLogKeys.NUM_FILES, partitionRelativePathsToDelete.size.toLong)}]:\n\t" +
       log"${MDC(DeltaLogKeys.PATHS, partitionRelativePathsToDelete.mkString("\n\t"))}")
   }

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala

Lines changed: 4 additions & 4 deletions
@@ -93,8 +93,8 @@ case class DeltaSink(
       logInfo(
         log"Committed transaction, batchId=${MDC(DeltaLogKeys.BATCH_ID, batchId)}, " +
         log"duration=${MDC(DeltaLogKeys.DURATION, durationMs)} ms, " +
-        log"added ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size)} files, " +
-        log"removed ${MDC(DeltaLogKeys.NUM_FILES2, deletedFiles.size)} files.")
+        log"added ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size.toLong)} files, " +
+        log"removed ${MDC(DeltaLogKeys.NUM_FILES2, deletedFiles.size.toLong)} files.")
       val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
       SQLMetrics.postDriverMetricUpdates(sc, executionId, metrics.values.toSeq)
     }
@@ -153,8 +153,8 @@ case class DeltaSink(
     val totalSize = newFiles.map(_.getFileSize).sum
     val totalLogicalRecords = newFiles.map(_.numLogicalRecords.getOrElse(0L)).sum
     logInfo(
-      log"Wrote ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size)} files, with total size " +
-      log"${MDC(DeltaLogKeys.NUM_BYTES, totalSize)}, " +
+      log"Wrote ${MDC(DeltaLogKeys.NUM_FILES, newFiles.size.toLong)} files, " +
+      log"with total size ${MDC(DeltaLogKeys.NUM_BYTES, totalSize)}, " +
       log"${MDC(DeltaLogKeys.NUM_RECORDS, totalLogicalRecords)} logical records, " +
       log"duration=${MDC(DeltaLogKeys.DURATION, writeFilesTimeMs)} ms.")

spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala

Lines changed: 1 addition & 1 deletion
@@ -128,7 +128,7 @@ object DeltaFileOperations extends DeltaLogging {
       base: Int = 100,
       jitter: Int = 1000): Unit = {
     val sleepTime = Random.nextInt(jitter) + base
-    logWarning(log"Sleeping for ${MDC(DeltaLogKeys.TIME_MS, sleepTime)} ms to rate limit " +
+    logWarning(log"Sleeping for ${MDC(DeltaLogKeys.TIME_MS, sleepTime.toLong)} ms to rate limit " +
       log"${MDC(DeltaLogKeys.OP_NAME, opName)}", t)
     Thread.sleep(sleepTime)
   }
