This repository was archived by the owner on Jul 10, 2025. It is now read-only.

Commit cd7a634

Added transactional checkpointing example and possible implementation changes
1 parent 84519a1 commit cd7a634

1 file changed

+116
-9
lines changed

rfcs/20200505-transactional-fs.md

Lines changed: 116 additions & 9 deletions
@@ -697,7 +697,7 @@ Status MergeFiles(const string& fname, const string& dirname,
 
 ## Final design proposal
 
-During the review meeting it has been decided to merge wrapped filesystem approach and stateful token approach. Then the final proposal is as shown below. The `WrappedFileSystem` and `Filesystem` classes described above are to be extended with three new methods,
+During the review meeting it was decided to merge the wrapped filesystem approach and the stateful token approach. The final proposal is shown below. The `WrappedFileSystem` and `Filesystem` classes described above are to be extended with four new methods,
 
 ```cpp
 class WrappedFileSystem : public Filesystem {
@@ -707,6 +707,11 @@ class WrappedFileSystem : public Filesystem {
     return fs_->AddToTransaction(uri, (token ? token : token_));
   }
 
+  virtual Status GetTransactionForPath(const string& uri,
+                                       TransactionToken*& token) {
+    return fs_->GetTransactionForPath(uri, token);
+  }
+
   virtual Status GetTokenOrStartTransaction(const string& uri,
                                             TransactionToken*& token) {
     return fs_->GetTokenOrStartTransaction(uri, token);
@@ -726,18 +731,120 @@ Then the current C API's `TF_FilesystemOps` table is to be extended by 6 new fun
 struct TF_FilesystemOps{
   // Existing members are not modified
   // Transaction management
-  void (*const StartTransaction)(TF_Filesystem*, TransactionToken**);
-  void (*const EndTransaction)(TF_Filesystem*, TransactionToken*);
-  void (*const AddToTransaction)(TF_Filesystem* fs, const char* file_name, TransactionToken** token);
-  void (*const GetTransactionTokenForFile)(TF_Filesystem* fs, const char* file_name, TransactionToken** token);
-  void (*const GetOrStartTransactionTokenForFile)(TF_Filesystem* fs, const char* file_name, TransactionToken** token);
+  void (*start_transaction)(TF_Filesystem*, TF_TransactionToken**, TF_Status*);
+  void (*end_transaction)(TF_Filesystem*, TF_TransactionToken*);
+  void (*add_to_transaction)(TF_Filesystem* fs, const char* path, TF_TransactionToken* token);
+  void (*get_transaction_for_path)(TF_Filesystem* fs, const char* path, TF_TransactionToken** token);
+  void (*get_or_start_transaction_for_path)(TF_Filesystem* fs, const char* path, TF_TransactionToken** token);
   // Optional Transaction Debugging
-  void (*const DecodeTransactionToken)(const TF_Filesystem*, const TransactionToken*, char**);
+  char* (*decode_transaction_token)(const TF_Filesystem*, const TF_TransactionToken*);
 };
 ```
 
-The new functions will be null pointers until respective plugins implement them. `ModularFilesystem` implementation will check whether a plugin implements the transactions and will ignore the transaction if it is not implemented, possibly after producing a log message, thus falling back to current transactionless state. Since these function pointers will be added after existing pointers, already compiled plugins will keep functioning and they can be gradually start supporting transactions. Any filesystem plugin that start supporting transaction will be used by the framework.
+The new functions will be null pointers until the respective plugins implement them. The `ModularFilesystem` implementation will check whether a plugin implements transactions and will ignore the transaction if it does not, possibly after producing a log message, thus falling back to
+the current transactionless state. Since these function pointers will be added after the existing pointers, already compiled plugins will keep functioning and can gradually start supporting transactions. Any filesystem plugin that starts supporting transactions will be used by the framework.
+
+## Example use
+
+With these final modifications, there is no need to carry transaction tokens through different compilation units or ops in the graph. For example, the current checkpointing logic involves adding one or more `SaveV2` ops and a `MergeV2Checkpoints` op to the graph. In order to prevent
+corrupt checkpoints in case of errors and to optimize I/O, the `SaveV2` ops write their outputs to temporary directories, given as constant input arguments. The `MergeV2Checkpoints` op is given the names of the temporary save files generated by all `SaveV2` ops; it reads and merges
+them into a final file and then deletes the temporary files. Just by adding a few lines to the `SaveV2` and `MergeV2Checkpoints` ops, these operations can be completed transactionally. In this case, an overview of the operations would be:
+
+- The `SaveV2` op uses the `Env::GetTokenOrStartTransaction(base_dir)` call to start a new transaction or get an existing transaction on the base output directory.
+- The `SaveV2` op adds the files it generates to the transaction.
+- The `MergeV2Checkpoints` op uses `Env::GetTransactionForPath(base_dir)` to get the transaction started by the `SaveV2` ops and adds its output to the same transaction.
+- The `MergeV2Checkpoints` op removes the intermediate files and calls `EndTransaction()` to finalize the transaction.
+
+A transactional file system may then operate in a cache and move files to the final destination only at the end of the transaction, which would ensure that the files in the checkpoint directory are consistent. The
+implementation of how this is achieved may be different for each filesystem, but the end result should be consistent.
+
+An example implementation of this could be as shown in the diff set below.
+```diff
+--- a/tensorflow/core/kernels/save_restore_v2_ops.cc
++++ b/tensorflow/core/kernels/save_restore_v2_ops.cc
+@@ -237,6 +237,17 @@ class MergeV2Checkpoints : public OpKernel {
+         gtl::ArraySlice<tstring>(checkpoint_prefixes.flat<tstring>());
+     Env* env = Env::Default();
+     const string& merged_prefix = destination_prefix.scalar<tstring>()();
++    auto token_deleter = [env](TransactionToken* token) {
++      if (token) {
++        env->EndTransaction(token);
++      }
++    };
++    TransactionToken* token = nullptr;
++    env->GetTokenOrStartTransaction(string(io::Dirname(input_prefixes[0])), &token);
++    auto token_scope =
++        std::unique_ptr<TransactionToken, decltype(token_deleter)>(
++            token, token_deleter);
++    env->AddToTransaction(string(io::Dirname(merged_prefix)), token);
+     OP_REQUIRES_OK(
+         context, tensorflow::MergeBundles(env, input_prefixes, merged_prefix));
+
+--- a/tensorflow/core/util/tensor_bundle/tensor_bundle.cc
++++ b/tensorflow/core/util/tensor_bundle/tensor_bundle.cc
+@@ -421,9 +421,10 @@ BundleWriter::BundleWriter(Env* env, StringPiece prefix, const Options& options)
+   if (!status_.ok() && !errors::IsAlreadyExists(status_)) {
+     return;
+   }
+-
++  TransactionToken* token = nullptr;
++  env->GetTokenOrStartTransaction(string(io::Dirname(prefix_)), &token).IgnoreError();
+   std::unique_ptr<WritableFile> wrapper;
+-  status_ = env_->NewWritableFile(data_path_, &wrapper);
++  status_ = env_->NewWritableFile(data_path_, &wrapper, token);
+   if (!status_.ok()) return;
+   out_ = std::unique_ptr<FileOutputBuffer>(
+       new FileOutputBuffer(wrapper.release(), 8 << 20 /* 8MB write buffer */));
+@@ -527,7 +528,9 @@ Status BundleWriter::Finish() {
+   if (!status_.ok()) return status_;
+   // Build key -> BundleEntryProto table.
+   std::unique_ptr<WritableFile> file;
+-  status_ = env_->NewWritableFile(metadata_path_, &file);
++  TransactionToken* token = nullptr;
++  env_->GetTokenOrStartTransaction(string(io::Dirname(prefix_)), &token).IgnoreError();
++  status_ = env_->NewWritableFile(metadata_path_, &file, token);
+   if (!status_.ok()) return status_;
+   {
+     // N.B.: the default use of Snappy compression may not be supported on all
+@@ -554,7 +557,7 @@ Status BundleWriter::Finish() {
+   }
+   status_.Update(file->Close());
+   if (!status_.ok()) {
+-    Env::Default()->DeleteFile(metadata_path_).IgnoreError();
++    Env::Default()->DeleteFile(metadata_path_, token).IgnoreError();
+     return status_;
+   } else if (use_temp_file_) {
+     status_ = Env::Default()->RenameFile(metadata_path_, MetaFilename(prefix_));
+@@ -590,6 +593,8 @@ static Status MergeOneBundle(Env* env, StringPiece prefix,
+                              MergeState* merge_state) {
+   VLOG(1) << "Merging bundle:" << prefix;
+   const string filename = MetaFilename(prefix);
++  TransactionToken* token = nullptr;
++  env->GetTokenOrStartTransaction(string(filename), &token).IgnoreError();
+   uint64 file_size;
+   TF_RETURN_IF_ERROR(env->GetFileSize(filename, &file_size));
+   std::unique_ptr<RandomAccessFile> file;
+@@ -690,7 +695,9 @@ Status MergeBundles(Env* env, gtl::ArraySlice<tstring> prefixes,
+   // Merges all metadata tables.
+   // TODO(zhifengc): KeyValue sorter if it becomes too big.
+   MergeState merge;
+-  Status status = env->CreateDir(string(io::Dirname(merged_prefix)));
++  TransactionToken* token = nullptr;
++  env->GetTokenOrStartTransaction(string(io::Dirname(prefixes[0])), &token);
++  Status status = env->CreateDir(string(io::Dirname(merged_prefix)), token);
+   if (!status.ok() && !errors::IsAlreadyExists(status)) return status;
+   for (int i = 0; i < prefixes.size(); ++i) {
+     TF_RETURN_IF_ERROR(MergeOneBundle(env, prefixes[i], &merge));
+@@ -708,7 +715,7 @@ Status MergeBundles(Env* env, gtl::ArraySlice<tstring> prefixes,
+   // Writes the final metadata table under the merged prefix.
+   std::unique_ptr<WritableFile> merged_metadata;
+   TF_RETURN_IF_ERROR(
+-      env->NewWritableFile(MetaFilename(merged_prefix), &merged_metadata));
++      env->NewWritableFile(MetaFilename(merged_prefix), &merged_metadata, token));
+   {
+     table::TableBuilder builder(TableBuilderOptions(), merged_metadata.get());
+     // Header entry.
 
-With these final modifications, there is no need to carry transaction tokens through different compilation units or ops in the graph. For example checkpointing logic can be implemented in an atomic-like way by simply modifying the save and merge ops to use same transaction using accessed file names and directories.
+```
 
 [filesystem_plugin]: https://github.com/tensorflow/community/blob/master/rfcs/20190506-filesystem-plugin-modular-tensorflow.md
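
The RFC text in this commit says that `ModularFilesystem` will ignore transactions when a plugin leaves the new function pointers null. That fallback could look roughly like the sketch below. It is an illustration under stated assumptions, not the TensorFlow implementation: `ToyFilesystem`, `ToyToken`, `ToyOps`, `TransactionAwareFs`, `ToyStart` and `ToyEnd` are hypothetical names, and the ops table is reduced to two entries with no status reporting.

```cpp
#include <cassert>
#include <cstdio>

// Hypothetical stand-ins for the plugin-side types (assumptions, not the
// real TensorFlow C API definitions).
struct ToyFilesystem {};
struct ToyToken { int id; };

// Reduced ops table. Entries default to null, as they would for a plugin
// that was compiled before the transaction functions were appended.
struct ToyOps {
  void (*start_transaction)(ToyFilesystem*, ToyToken**) = nullptr;
  void (*end_transaction)(ToyFilesystem*, ToyToken*) = nullptr;
};

// Wrapper that checks each pointer before calling through it.
class TransactionAwareFs {
 public:
  TransactionAwareFs(ToyFilesystem* fs, const ToyOps* ops)
      : fs_(fs), ops_(ops) {
    assert(fs != nullptr && ops != nullptr);
  }

  // Returns true if the plugin started a transaction. Otherwise *token
  // stays null and the caller proceeds without a transaction.
  bool StartTransaction(ToyToken** token) {
    *token = nullptr;
    if (ops_->start_transaction == nullptr) {
      std::fprintf(stderr, "plugin has no transaction support, ignoring\n");
      return false;
    }
    ops_->start_transaction(fs_, token);
    return *token != nullptr;
  }

  // No-op for a null token or for a plugin without transaction support.
  void EndTransaction(ToyToken* token) {
    if (token == nullptr || ops_->end_transaction == nullptr) return;
    ops_->end_transaction(fs_, token);
  }

 private:
  ToyFilesystem* fs_;
  const ToyOps* ops_;
};

// Toy plugin functions that count open transactions.
static int g_open_transactions = 0;

static void ToyStart(ToyFilesystem*, ToyToken** token) {
  *token = new ToyToken{++g_open_transactions};
}

static void ToyEnd(ToyFilesystem*, ToyToken* token) {
  --g_open_transactions;
  delete token;
}
```

A default-constructed `ToyOps` models an already compiled plugin. Assigning `ToyStart` and `ToyEnd` to its two members models a plugin that has opted in, and callers of `TransactionAwareFs` need no changes in either case.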

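
The `SaveV2` / `MergeV2Checkpoints` flow listed under "Example use" can also be modeled with a small in-memory filesystem, to show why no token has to travel between the ops. This is a sketch under assumptions: the method names `GetTokenOrStartTransaction`, `GetTransactionForPath`, `AddToTransaction` and `EndTransaction` are borrowed from the RFC, but their signatures are simplified here, and `Token`, `ToyTransactionalFs`, `DeleteFile`, `IsVisible`, `SaveShard` and `MergeShards` are hypothetical.

```cpp
#include <cassert>
#include <iterator>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <vector>

// Hypothetical token: just the set of files staged in this transaction.
struct Token {
  std::set<std::string> staged;
};

// Toy filesystem that keeps staged files invisible until EndTransaction.
class ToyTransactionalFs {
 public:
  Token* GetTransactionForPath(const std::string& path) const {
    auto it = by_path_.find(path);
    return it == by_path_.end() ? nullptr : it->second;
  }

  Token* GetTokenOrStartTransaction(const std::string& path) {
    if (Token* existing = GetTransactionForPath(path)) return existing;
    std::unique_ptr<Token> token(new Token);
    Token* raw = token.get();
    live_[raw] = std::move(token);
    by_path_[path] = raw;
    return raw;
  }

  void AddToTransaction(const std::string& file, Token* token) {
    by_path_[file] = token;
    token->staged.insert(file);
  }

  void DeleteFile(const std::string& file, Token* token) {
    token->staged.erase(file);
    by_path_.erase(file);
  }

  // Publishes the staged files, forgets the paths, and destroys the token.
  void EndTransaction(Token* token) {
    committed_.insert(token->staged.begin(), token->staged.end());
    for (auto it = by_path_.begin(); it != by_path_.end();) {
      it = (it->second == token) ? by_path_.erase(it) : std::next(it);
    }
    live_.erase(token);
  }

  bool IsVisible(const std::string& file) const {
    return committed_.count(file) > 0;
  }

 private:
  std::map<std::string, Token*> by_path_;
  std::map<Token*, std::unique_ptr<Token>> live_;
  std::set<std::string> committed_;
};

// Stand-in for SaveV2: joins or starts the transaction on base_dir.
void SaveShard(ToyTransactionalFs& fs, const std::string& base_dir,
               const std::string& shard) {
  Token* token = fs.GetTokenOrStartTransaction(base_dir);
  fs.AddToTransaction(base_dir + "/" + shard, token);
}

// Stand-in for MergeV2Checkpoints: looks the transaction up by path,
// adds the merged file, drops the shards, and ends the transaction when
// the scope guard goes out of scope.
void MergeShards(ToyTransactionalFs& fs, const std::string& base_dir,
                 const std::vector<std::string>& shards,
                 const std::string& merged) {
  Token* token = fs.GetTransactionForPath(base_dir);
  if (token == nullptr) return;  // nothing was saved under base_dir
  auto end = [&fs](Token* t) { fs.EndTransaction(t); };
  std::unique_ptr<Token, decltype(end)> scope(token, end);
  fs.AddToTransaction(base_dir + "/" + merged, token);
  for (const std::string& shard : shards) {
    fs.DeleteFile(base_dir + "/" + shard, token);
  }
}
```

The scope guard mirrors the `token_deleter` pattern in the diff above: the transaction ends on every exit path of the merge step, and only the merged file becomes visible.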