-
Notifications
You must be signed in to change notification settings - Fork 41
Add import log classes and utils #2591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 20 commits
Commits
Show all changes
30 commits
Select commit
Hold shift + click to select a range
be9ee23
Initial commit
inv-jishnu 89b9f05
Spotless applied again
inv-jishnu 49c83b6
Removed unused code
inv-jishnu b2871fb
Merge branch 'master' into feat/data-loader/import-log-2
ypeckstadt c5c9c0a
Removed unused classes and references
inv-jishnu 4964e8d
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu ff81f5f
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu 3934c2a
Improve Javadocs
ypeckstadt 9958f95
Changes
inv-jishnu 1afbc21
Renamed parameters
inv-jishnu 8c5114d
logging changes
inv-jishnu ffab395
removed repeated code
inv-jishnu 79df1ed
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu cf31672
Merge branch 'master' into feat/data-loader/import-log-2
brfrn169 6dd213e
Added excetpion throw
inv-jishnu 6542177
Synchronisation changes
inv-jishnu 603e46e
Added volatile back to fix spotbugs issue
inv-jishnu eaf9d88
Removed unused variable
inv-jishnu 502034e
Chanaged LOGGER to logger
inv-jishnu 6b22d1a
logger name change in test
inv-jishnu b393b62
Edge case fix
inv-jishnu cacdd86
Remove unused param
inv-jishnu c26cc6e
Revert "Remove unused param"
inv-jishnu 9a4bfa8
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu 415805b
Removed null assignment
inv-jishnu 0584f5e
Merge branch 'master' into feat/data-loader/import-log-2
inv-jishnu 7c72397
comment change
inv-jishnu 0f34395
renamed params to make it more clear
inv-jishnu a59f317
Merge branch 'master' into feat/data-loader/import-log-2
ypeckstadt d807387
Merge branch 'master' into feat/data-loader/import-log-2
feeblefakie File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
242 changes: 242 additions & 0 deletions
242
...core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/AbstractImportLogger.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,242 @@ | ||
| package com.scalar.db.dataloader.core.dataimport.log; | ||
|
|
||
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import com.scalar.db.dataloader.core.Constants; | ||
| import com.scalar.db.dataloader.core.DataLoaderObjectMapper; | ||
| import com.scalar.db.dataloader.core.dataimport.ImportEventListener; | ||
| import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus; | ||
| import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter; | ||
| import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory; | ||
| import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult; | ||
| import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus; | ||
| import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult; | ||
| import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult; | ||
| import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| /** | ||
| * An abstract base class for logging import events during data loading operations. This class | ||
| * implements the {@link ImportEventListener} interface and provides common functionality for | ||
| * logging transaction batch results and managing event listeners. Concrete implementations should | ||
| * define how to log transaction batches and handle errors. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| public abstract class AbstractImportLogger implements ImportEventListener { | ||
|
|
||
| /** Object mapper used for JSON serialization/deserialization. */ | ||
| protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper(); | ||
|
|
||
| /** Configuration for the import logger. */ | ||
| protected final ImportLoggerConfig config; | ||
|
|
||
| /** Factory for creating log writers. */ | ||
| protected final LogWriterFactory logWriterFactory; | ||
|
|
||
| /** List of event listeners to be notified of import events. */ | ||
| protected final List<ImportEventListener> listeners = new ArrayList<>(); | ||
|
|
||
| /** | ||
| * Called when a data chunk import is started. Currently, this implementation does not log the | ||
| * start of a data chunk. | ||
| * | ||
| * @param importDataChunkStatus the status of the data chunk being imported | ||
| */ | ||
| @Override | ||
| public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) { | ||
| // Currently we are not logging the start of a data chunk | ||
| } | ||
|
|
||
| /** | ||
| * Called when a transaction batch is started. Currently, this implementation does not log the | ||
| * start of a transaction batch, but it notifies all registered listeners. | ||
| * | ||
| * @param batchStatus the status of the transaction batch being started | ||
| */ | ||
| @Override | ||
| public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) { | ||
| // Currently we are not logging the start of a transaction batch | ||
| notifyTransactionBatchStarted(batchStatus); | ||
| } | ||
|
|
||
| /** | ||
| * Called when a transaction batch is completed. This method logs the transaction batch result if | ||
| * it should be logged based on the configuration, and notifies all registered listeners. | ||
| * | ||
| * @param batchResult the result of the completed transaction batch | ||
| */ | ||
| @Override | ||
| public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { | ||
| // skip logging success records if the configuration is set to skip | ||
| if (shouldSkipLoggingSuccess(batchResult)) { | ||
| return; | ||
| } | ||
|
|
||
| logTransactionBatch(batchResult); | ||
| notifyTransactionBatchCompleted(batchResult); | ||
| } | ||
|
|
||
| /** | ||
| * Logs a transaction batch result. This method should be implemented by concrete subclasses to | ||
| * define how to log transaction batch results. | ||
| * | ||
| * @param batchResult the transaction batch result to log | ||
| */ | ||
| protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult); | ||
|
|
||
| /** | ||
| * Determines whether logging of a successful transaction batch should be skipped. Logging is | ||
| * skipped if the batch was successful and the configuration specifies not to log success records. | ||
| * | ||
| * @param batchResult the transaction batch result to check | ||
| * @return true if logging should be skipped, false otherwise | ||
| */ | ||
| protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) { | ||
| return batchResult.isSuccess() && !config.isLogSuccessRecords(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a filtered JSON representation of a transaction batch result. This method filters out | ||
| * raw record data if the configuration specifies not to log raw source records. | ||
| * | ||
| * @param batchResult the transaction batch result to convert to JSON | ||
| * @return a JsonNode representing the filtered transaction batch result | ||
| */ | ||
| protected JsonNode createFilteredTransactionBatchLogJsonNode( | ||
| ImportTransactionBatchResult batchResult) { | ||
|
|
||
| // If the batch result does not contain any records, return the batch result as is | ||
| if (batchResult.getRecords() == null) { | ||
| return OBJECT_MAPPER.valueToTree(batchResult); | ||
| } | ||
|
|
||
| // Create a new list to store the modified import task results | ||
| List<ImportTaskResult> modifiedRecords = new ArrayList<>(); | ||
|
|
||
| // Loop over the records in the batchResult | ||
| for (ImportTaskResult taskResult : batchResult.getRecords()) { | ||
| // Create a new ImportTaskResult and not add the raw record yet | ||
| List<ImportTargetResult> targetResults = | ||
| batchResult.isSuccess() | ||
| ? taskResult.getTargets() | ||
| : updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets()); | ||
| ImportTaskResult.ImportTaskResultBuilder builder = | ||
| ImportTaskResult.builder() | ||
| .rowNumber(taskResult.getRowNumber()) | ||
| .targets(targetResults) | ||
| .dataChunkId(taskResult.getDataChunkId()); | ||
|
|
||
| // Only add the raw record if the configuration is set to log raw source data | ||
| if (config.isLogRawSourceRecords()) { | ||
| builder.rawRecord(taskResult.getRawRecord()); | ||
| } | ||
| ImportTaskResult modifiedTaskResult = builder.build(); | ||
|
|
||
| // Add the modified task result to the list | ||
| modifiedRecords.add(modifiedTaskResult); | ||
| } | ||
|
|
||
| // Create a new transaction batch result with the modified import task results | ||
| ImportTransactionBatchResult modifiedBatchResult = | ||
| ImportTransactionBatchResult.builder() | ||
| .dataChunkId(batchResult.getDataChunkId()) | ||
| .transactionBatchId(batchResult.getTransactionBatchId()) | ||
| .transactionId(batchResult.getTransactionId()) | ||
| .records(modifiedRecords) | ||
| .errors(batchResult.getErrors()) | ||
| .success(batchResult.isSuccess()) | ||
| .build(); | ||
|
|
||
| // Convert the modified batch result to a JsonNode | ||
| return OBJECT_MAPPER.valueToTree(modifiedBatchResult); | ||
| } | ||
|
|
||
| /** | ||
| * Safely closes a log writer. If an IOException occurs during closing, it logs the error using | ||
| * the {@link #logError} method. | ||
| * | ||
| * @param logWriter the log writer to close, may be null | ||
| */ | ||
| protected void closeLogWriter(LogWriter logWriter) { | ||
| if (logWriter != null) { | ||
| try { | ||
| logWriter.close(); | ||
| } catch (IOException e) { | ||
| logError("Failed to close a log writer", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Logs an error message and exception. This method should be implemented by concrete subclasses | ||
| * to define how to log errors. | ||
| * | ||
| * @param errorMessage the error message to log | ||
| * @param e the exception that caused the error | ||
| */ | ||
| protected abstract void logError(String errorMessage, Exception e); | ||
|
|
||
| /** | ||
| * Creates a log writer for the specified log file path. | ||
| * | ||
| * @param logFilePath the path to the log file | ||
| * @return a new log writer | ||
| * @throws IOException if an I/O error occurs while creating the log writer | ||
| */ | ||
| protected LogWriter createLogWriter(String logFilePath) throws IOException { | ||
| return logWriterFactory.createLogWriter(logFilePath); | ||
| } | ||
|
|
||
| /** | ||
| * Notifies all registered listeners that a transaction batch has started. | ||
| * | ||
| * @param status the status of the transaction batch that has started | ||
| */ | ||
| private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) { | ||
| for (ImportEventListener listener : listeners) { | ||
| listener.onTransactionBatchStarted(status); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Notifies all registered listeners that a transaction batch has completed. | ||
| * | ||
| * @param batchResult the result of the completed transaction batch | ||
| */ | ||
| private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) { | ||
| for (ImportEventListener listener : listeners) { | ||
| listener.onTransactionBatchCompleted(batchResult); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Updates the status of target results for an aborted transaction batch. For each target with a | ||
| * status of SAVED, changes the status to ABORTED and adds an error message. | ||
| * | ||
| * @param targetResults the list of target results to update | ||
| * @return the updated list of target results | ||
| */ | ||
| private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch( | ||
| List<ImportTargetResult> targetResults) { | ||
| for (int i = 0; i < targetResults.size(); i++) { | ||
| ImportTargetResult target = targetResults.get(i); | ||
| if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) { | ||
| ImportTargetResult newTarget = | ||
| ImportTargetResult.builder() | ||
| .importAction(target.getImportAction()) | ||
| .status(ImportTargetResultStatus.ABORTED) | ||
| .importedRecord(target.getImportedRecord()) | ||
| .namespace(target.getNamespace()) | ||
| .tableName(target.getTableName()) | ||
| .dataMapped(target.isDataMapped()) | ||
| .errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS)) | ||
| .build(); | ||
| targetResults.set(i, newTarget); | ||
| } | ||
| } | ||
| return targetResults; | ||
| } | ||
| } | ||
37 changes: 37 additions & 0 deletions
37
...r/core/src/main/java/com/scalar/db/dataloader/core/dataimport/log/ImportLoggerConfig.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| package com.scalar.db.dataloader.core.dataimport.log; | ||
|
|
||
| import lombok.Builder; | ||
| import lombok.Value; | ||
|
|
||
| /** | ||
| * Configuration class for import loggers. This class uses Lombok's {@code @Value} annotation to | ||
| * create an immutable class and {@code @Builder} annotation to provide a builder pattern for | ||
| * creating instances. | ||
| */ | ||
| @Value | ||
| @Builder | ||
| public class ImportLoggerConfig { | ||
| /** | ||
| * The directory path where log files will be stored. This path should end with a directory | ||
| * separator (e.g., "/"). | ||
| */ | ||
| String logDirectoryPath; | ||
|
|
||
| /** | ||
| * Whether to log records that were successfully imported. If true, successful import operations | ||
| * will be logged to success log files. | ||
| */ | ||
| boolean isLogSuccessRecords; | ||
|
|
||
| /** | ||
| * Whether to log raw source records that failed to be imported. If true, failed import operations | ||
| * will be logged to failure log files. | ||
| */ | ||
| boolean isLogRawSourceRecords; | ||
|
|
||
| /** | ||
| * Whether to format the logs with pretty printing. If true, the JSON logs will be formatted with | ||
| * indentation for better readability. | ||
| */ | ||
| boolean prettyPrint; | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
config.logRawSourceRecordsEnabled()orconfig.isLogRawSourceRecordsEnabled()orconfig.shouldLogRawSourceRecords()is a bit better?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@inv-jishnu How about this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@brfrn169 san,
I had renamed it again in0f34395 now to
isLogRawSourceRecordsEnabledfromisLogRawSourceRecords. Sorry for the confusion.Thank you.