
Commit a6e7227

Expand visitation guarantees.
Update the proposal to support exactly-once visitation even when the service is executing non-deterministically. Also, add discussion of the visitation guarantees provided when the dataset produces outputs non-deterministically.
1 parent a575f85 commit a6e7227


rfcs/20200113-tf-data-service.md

Lines changed: 59 additions & 52 deletions
@@ -1,9 +1,10 @@
 # Distributed tf.data service
 
-| Status | Proposed |
+| Status | Proposed |
 | :------------ | :------------------------------------------------------ |
 | **RFC #** | [195](https://github.com/tensorflow/community/pull/195) |
-| **Author(s)** | Andrew Audibert ([email protected]) Rohan Jain ([email protected]) |
+| **Author(s)** | Andrew Audibert ([email protected]) Rohan Jain |
+| | ([email protected]) |
 | **Sponsor** | Jiri Simsa ([email protected]) |
 | **Updated** | 2019-01-09 |
 
@@ -62,7 +63,9 @@ utilization for valuable accelerator resources, reducing total cost.
 Today, the tf.distribute API statically shards data across accelerators. This
 can lead to suboptimal utilization because some shards may contain more data
 than others. The tf.data service provides a mechanism for dynamically sharding,
-reducing the data imbalance across accelerators.
+reducing the data imbalance across accelerators. Note that dynamic load
+balancing and deterministic output are mutually exclusive; if users require
+deterministic output, they must trade off dynamic load balancing.
 
 ### Visitation guarantees
 
@@ -154,7 +157,7 @@ def tf.data.experimental.service.create_iteration(
 if consumer_index == 0:
   # The iteration object is a byte array which needs to be shared among all
   # consumers. Here we suppose there are broadcast_send and broadcast_recv
-  # method available.
+  # methods available.
   iteration_id = tf.data.experimental.service.create_iteration(ds, address, 3)
   broadcast_send(iteration_id)
 else:
@@ -373,14 +376,26 @@ list<Tensors> GetElement(iterator_id);
 void ProcessDataset(int dataset_id, int iteration_id, list<int> iterator_ids);
 ```
 
-#### Visitation Guarantees
+#### Visitation Guarantee
 
-When iterating over a dataset, the tf.data service will process all input data
-at least once, even in the presence of master or worker failures. If there are
-no failures, all input data will be processed exactly once.
+When iterating over a deterministic dataset, the tf.data service will process
+all input data exactly once, even in the presence of master or worker failures.
+We achieve exactly-once by having consumers keep track of their index within
+each task, and having restored tasks skip elements to reach the requested index.
+For the skipping to give exactly-once semantics, the dataset must produce
+outputs deterministically.
 
-With determinstic execution enabled, the tf.data service provides an
-exactly-once visitation guarantee even in the face of master or worker failures.
+If the dataset is not deterministic, the user can choose either at-least-once or
+a close-to-exactly-once visitation guarantee. We can achieve
+close-to-exactly-once by using the same skipping technique that we use to
+achieve exactly-once for deterministic datasets. If users prefer an
+at-least-once guarantee, we can instead start restored tasks from their latest
+checkpoint.
+
+In some cases, we can provide an exactly-once visitation guarantee to
+non-deterministic pipelines. If input workers are brought down gracefully, they
+can first write checkpoints of their tasks. This way, tasks can begin exactly
+where they left off.
 
 #### Determinism
 
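
A minimal sketch of the skipping technique described in the added lines above. The helper names (`restore_task`, `elements_consumed`) are illustrative assumptions, not part of the proposal; the point is that a restored task replays its elements and discards the prefix the consumer has already received, which is exactly-once only if the replay is deterministic.

```python
# Illustrative sketch (hypothetical helper names) of skip-on-restore.

def restore_task(make_task_iterator, elements_consumed):
    """Recreate a task's iterator and skip elements the consumer already saw.

    Exactly-once holds only if make_task_iterator() replays the task's output
    deterministically; otherwise the skipped prefix may not match what was
    originally delivered, giving close-to-exactly-once instead.
    """
    it = make_task_iterator()
    for _ in range(elements_consumed):
        next(it)  # Discard elements up to the consumer's recorded index.
    return it

# The consumer reports it has already received 3 elements from this task.
make_task = lambda: iter(range(10))  # Stand-in for a deterministic task.
restored = restore_task(make_task, elements_consumed=3)
print(next(restored))  # 3: the task resumes where the consumer left off.
```
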
@@ -404,10 +419,7 @@ element for consumer `i`.
 To provide determinism even when servers fail, consumers can keep track of which
 element index they have processed up to for each task. Input workers would
 attach per-task element indices when they produce elements, so consumers can
-ignore duplicate elements caused by worker restarts. We will use an analogous
-mechanism to avoid re-processing the same split in case of master falure. Input
-workers will track the split index of splits as they receive them, and ignore
-duplicate splits.
+ignore duplicate elements caused by worker restarts.
 
 #### Failure Recovery
 
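
A small sketch of the consumer-side bookkeeping kept by the lines above; the class and method names are assumptions for illustration. Input workers tag each element with its per-task index, and the consumer drops anything it has already seen.

```python
# Illustrative sketch (hypothetical names) of consumer-side duplicate filtering.

class PerTaskIndexTracker:
    """Tracks the next element index the consumer expects from each task."""

    def __init__(self):
        self._next_index = {}  # task_id -> next expected element index

    def accept(self, task_id, element_index, element):
        """Return the element if it is new; drop re-sends from worker restarts."""
        if element_index < self._next_index.get(task_id, 0):
            return None  # Already processed before the worker restarted.
        self._next_index[task_id] = element_index + 1
        return element

tracker = PerTaskIndexTracker()
print(tracker.accept("task-0", 0, "a"))  # 'a'
print(tracker.accept("task-0", 0, "a"))  # None: duplicate after a restart
print(tracker.accept("task-0", 1, "b"))  # 'b'
```
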
@@ -427,29 +439,21 @@ The unrecoverable state includes
 *   **dataset id** for the iterated dataset so that we can recover the
     iteration's split generator
 *   **iteration id**
-*   **participating worker ids**, so that we can send splits to the correct
-    workers.
+*   **assignments from splits to tasks**, so that we can restart failed
+    tasks on new workers.
 
 Recoverable state includes
 
 *   **Split generators**: Recoverable from our information about in-progress
     iterations.
 *   **Worker addresses**: Recoverable when workers reconnect.
 *   **Worker loads**: Recoverable when workers reconnect.
-*   **Assignment from splits to workers**: Recoverable when workers reconnect.
-*   **Outstanding splits**: Recoverable by re-running split generators from
-    their checkpoint state.
+*   **Assignment from tasks to workers**: Recoverable when workers reconnect.
 
 To improve recovery time, the master will periodically write checkpoints of its
 split generators and outstanding splits, so that split generators don't need to
 be run from the beginning during master recovery.
 
-A concern with the above recovery strategy is that a master could transmit a
-split before crashing, then restart and transmit the same split again. To avoid
-this duplication, the master attaches a split index to every split it sends to a
-worker. When workers reconnect, they inform the master of their latest split
-index.
-
 Workers have no unrecoverable state. If a worker crashes, a new worker can take
 its place. It is up to the master to reassign splits from the crashed worker to
 the new worker.
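
To make the split between journaled and reconstructed state concrete, a minimal sketch follows; the field names are assumptions drawn from the bullets in the hunk above, not a prescribed schema.

```python
# Illustrative sketch (hypothetical field names) of what the master journals
# durably versus what it rebuilds when workers reconnect.
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class JournaledIterationState:
    dataset_id: int          # Lets the master recover the split generator.
    iteration_id: int
    split_to_task: Dict[int, int] = field(default_factory=dict)
    # split index -> task id, so failed tasks can restart on new workers.

@dataclass
class ReconstructedState:
    worker_addresses: List[str] = field(default_factory=list)
    worker_loads: Dict[str, float] = field(default_factory=dict)
    task_to_worker: Dict[int, str] = field(default_factory=dict)
    # All of this is re-learned as workers reconnect; split generators are
    # additionally restored from their periodic checkpoints.
```
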
@@ -461,9 +465,9 @@ from.
 
 We will read and write this state through a MasterState interface which can be
 implemented using various storage backends. For use cases that require fault
-tolerance, the user must configure a fault-tolerant MasterState, e.g. Spanner
-internally, Cloud Spanner in GCP, or etcd externally. If fault tolerance isn't
-required, the user could configure state to be held in memory only.
+tolerance, the user must configure a fault-tolerant MasterState, e.g. Cloud
+Spanner or etcd. If fault tolerance isn't required, the user could configure
+state to be held in memory only.
 
 #### Leadership Transfer
 
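
The hunk above only names the MasterState interface and its possible backends. A sketch of what such a pluggable interface could look like follows; the method names are assumptions. A fault-tolerant deployment would supply a Cloud Spanner- or etcd-backed implementation of the same interface, while the in-memory variant trades durability for simplicity.

```python
# Illustrative sketch (hypothetical methods) of a pluggable MasterState backend.
import abc
from typing import Optional

class MasterState(abc.ABC):
    """Key/value storage for the master's unrecoverable state."""

    @abc.abstractmethod
    def put(self, key: str, value: bytes) -> None:
        ...

    @abc.abstractmethod
    def get(self, key: str) -> Optional[bytes]:
        ...

class InMemoryMasterState(MasterState):
    """Non-fault-tolerant backend: state is lost if the master process dies."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

state = InMemoryMasterState()
state.put("iteration/0/dataset_id", b"7")
print(state.get("iteration/0/dataset_id"))  # b'7'
```
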
@@ -478,24 +482,27 @@ ZooKeeper cluster, and it would also require adding a new dependency on a
 ZooKeeper client.
 
 What TensorFlow does have is a FileSystem API. We will leverage this API to
-perform leadership transfer as follows:
-
-1.  The first master will create a file named "master_seqno_0". If it
-    successfully creates the file, it will consider itself the leader.
-1.  The leader master will check every N milliseconds that the "master_seqno"
-    file it created still exists. If the file no longer exists, the master will
-    cease operation immediately.
-1.  When a master thinks it should be leader, it attempts to atomically rename
-    the master_seqno_n file to master_seqno_n+1. If this succeeds, the master
-    will wait (N + M) milliseconds, verify that its renamed file still exists,
-    and begin acting as leader. This gives the previous leader time to notice
-    the rename.
-
-The above scheme relies on rename being atomic so that two masters don't both
-succeed at renaming the same file. Users may opt to use a filesystem that
-doesn't support atomic rename, but they do so at the (unlikely) risk of two
-concurrently running masters thinking they are leader. Common filesystems such
-as Posix and HDFS support atomic rename.
+perform leadership transfer by creating empty files and inspecting file
+modification times.
+
+```
+files = list_directory(leadership_directory)
+if all_files_older_than(files, leadership_transfer_interval):
+  file = create_unique_file(leadership_directory);
+  if file_is_strictly_newest(file, leadership_directory):
+    become_leader()
+# Another master may be leader. Wait for some time before trying again.
+wait_random_interval()
+```
+
+The leader master will periodically write files to the leadership directory to
+indicate that it is still leading.
+
+The above scheme relies on the filesystem's create_file() and list() operations
+being strongly consistent. Users may opt to use a filesystem that doesn't
+support strong consistency, but they do so at the risk of two concurrently
+running masters thinking they are leader. Common filesystems such as Posix,
+HDFS, and GCS support such strong consistency.
 
 #### Caveats
 
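
The pseudocode added above leaves its helpers abstract. One possible fleshing-out, against a local filesystem and using file modification times as the pseudocode suggests, is sketched below; a real implementation would go through TensorFlow's FileSystem API, and the names here simply mirror the pseudocode rather than any existing API.

```python
# Illustrative sketch of the timestamp-based leadership protocol. Uses the
# local filesystem as a stand-in for TensorFlow's FileSystem API; the helper
# names mirror the RFC's pseudocode.
import os
import random
import time
import uuid

LEADERSHIP_TRANSFER_INTERVAL_SECS = 10.0

def all_files_older_than(directory, age_secs):
    """True if every file in the directory was last modified > age_secs ago."""
    now = time.time()
    return all(now - os.path.getmtime(os.path.join(directory, name)) > age_secs
               for name in os.listdir(directory))

def create_unique_file(directory):
    """Create an empty, uniquely named claim file and return its path."""
    path = os.path.join(directory, "leader-claim-" + uuid.uuid4().hex)
    open(path, "w").close()
    return path

def file_is_strictly_newest(path, directory):
    """True if every other file in the directory is older than `path`."""
    claim_mtime = os.path.getmtime(path)
    others = (os.path.join(directory, name) for name in os.listdir(directory))
    return all(os.path.getmtime(other) < claim_mtime
               for other in others if other != path)

def try_become_leader(leadership_directory):
    """Run one election round; return True if this master should lead."""
    if all_files_older_than(leadership_directory,
                            LEADERSHIP_TRANSFER_INTERVAL_SECS):
        claim = create_unique_file(leadership_directory)
        if file_is_strictly_newest(claim, leadership_directory):
            return True  # The leader then keeps writing fresh files periodically.
    # Another master may be leader; wait a random interval before retrying.
    time.sleep(random.uniform(0, LEADERSHIP_TRANSFER_INTERVAL_SECS))
    return False
```
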
@@ -507,12 +514,12 @@ the tf.data service.
   relies on the order of the input files, the user's assumptions will be
   violated when splitting causes each input worker to process only a subset of
   the input files.
-- If a particular dataset operation doesn't support splitting, it must be moved
-  after the part of the dataset which is distributed. Alternately, the user could
-  set num_tasks=1 to avoid the need for splitting, but this will have a heavy
-  performance cost since it only allows a single worker to generate dataset
-  elements. The most commonly used but unsupported datasets are
-  `from_generator` and `zip`.
+- If a particular dataset operation doesn't support splitting, it must be
+  moved after the part of the dataset which is distributed. Alternately, the
+  user could set num_tasks=1 to avoid the need for splitting, but this will
+  have a heavy performance cost since it only allows a single worker to
+  generate dataset elements. The most commonly used but unsupported datasets
+  are `from_generator` and `zip`.
 
 ### Alternatives Considered
 
