
Commit c22e495: Various updates
1 parent: 389c14e

rfcs/20201121-keras-model-fit-ps.md

Lines changed: 41 additions & 36 deletions
@@ -151,7 +151,8 @@ For future-compatibility of `model.fit` API where a `dataset_fn` may have a sign


 ```
-def dataset_fn(input_context): return tf.data.Dataset.from_tensor_slices(...)
+def dataset_fn(input_context):
+  return tf.data.Dataset.from_tensor_slices(...)
 history = model.fit(DatasetFactory(dataset_fn), epochs=..., steps_per_epoch=..., callbacks=[...])
 ```

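As a rough illustrative sketch, a `dataset_fn` that actually makes use of the `input_context` it receives might shard and batch per worker as below; the feature shapes, batch size, and preprocessing here are arbitrary placeholders, not part of the proposal:

```
import tensorflow as tf

def dataset_fn(input_context):
  # Derive the per-worker batch size from the `tf.distribute.InputContext`.
  batch_size = input_context.get_per_replica_batch_size(global_batch_size=64)
  features = tf.random.uniform((1000, 10))
  labels = tf.random.uniform((1000, 1))
  dataset = tf.data.Dataset.from_tensor_slices((features, labels))
  # Shard across input pipelines so each worker reads a distinct slice.
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  return dataset.shuffle(1000).repeat().batch(batch_size).prefetch(2)
```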
@@ -209,11 +210,10 @@ Since `ClusterCoordinator` instance spins off worker and failure handling thread

 ```
 class ClusterCoordinator(object):
-  instance = None
-  def __new__(cls):
-    if not ClusterCoordinator.instance:
-      ClusterCoordinator.instance = super(ClusterCoordinator, cls).__new__(cls)
-    return ClusterCoordinator.instance
+  def __new__(cls, strategy):
+    if not strategy.cluster_coordinator:  # TODO: Needs a lock for thread-safety
+      strategy.cluster_coordinator = super(ClusterCoordinator, cls).__new__(cls)
+    return strategy.cluster_coordinator
 ```

 Being a singleton is important considering there are power users who would like to `schedule` functions themselves in addition to `model.fit` usage. That is, they can instantiate one before `model.fit` does, or use one after `model.fit` has instantiated one. In either case, they should access the same `ClusterCoordinator` instance.
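As a sketch of the sharing behavior this `__new__` override is meant to provide, a power user's coordinator and the one `model.fit` later constructs would be the very same object (cluster setup and the `schedule`d function are elided):

```
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

# A power user creates a coordinator up front to `schedule` custom functions.
coordinator = ClusterCoordinator(strategy)
coordinator.schedule(per_worker_fn, args=(...,))

# When `model.fit` later constructs `ClusterCoordinator(strategy)` itself,
# `__new__` finds the instance already attached to the strategy and returns it,
# so both code paths operate on the same coordinator.
assert ClusterCoordinator(strategy) is coordinator
```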
@@ -236,9 +236,8 @@ class Model(...):

   def fit(self, ...):
     if (self.distribute_strategy.should_use_with_coordinator() and
-        not self.distribute_strategy._cluster_coordinator):
-      self.distribute_strategy._cluster_coordinator = \
-          cluster_coordinator.ClusterCoordinator(self.distribute_strategy)
+        not self.distribute_strategy.cluster_coordinator):
+      cluster_coordinator.ClusterCoordinator(self.distribute_strategy)
     ... # the rest of fit

 ```
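End to end, the user-facing flow this enables could look roughly as follows; cluster setup follows the usual `ParameterServerStrategy` pattern, and `DatasetFactory` and `dataset_fn` are as defined earlier in this document (the model, optimizer, and step counts are placeholders):

```
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

with strategy.scope():
  model = tf.keras.Sequential(
      [tf.keras.layers.Dense(10, activation="relu"), tf.keras.layers.Dense(1)])
  model.compile(optimizer="adam", loss="mse")

# `fit` lazily creates the `ClusterCoordinator` on the strategy (as sketched
# above) and schedules the train function onto the workers.
history = model.fit(DatasetFactory(dataset_fn), epochs=5, steps_per_epoch=100)
```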
@@ -263,9 +262,9 @@ class Model(...):

     self.train_function = ...

-    if self._cluster_coordinator:
+    if self.distribute_strategy.cluster_coordinator:
       # Note that `train_function` has to be a `tf.function`.
-      self.train_function = lambda distributed_iterator: self._cluster_coordinator.schedule(
+      self.train_function = lambda distributed_iterator: self.distribute_strategy.cluster_coordinator.schedule(
           train_function, args=(distributed_iterator,))

     return self.train_function
@@ -292,22 +291,23 @@ class ClusterCoordinatorDataHandler(DataHandler):
     def per_worker_dataset_fn():
       return strategy.distribute_datasets_from_function(x)

-    self._dataset = self._model._cluster_coordinator.create_per_worker_dataset(
-        per_worker_dataset_fn)
+    coordinator = self._model.distribute_strategy.cluster_coordinator
+    self._dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
+
     if steps_per_epoch is None:
       raise RuntimeError(
           "Steps per epoch must be specified with `ParameterServerStrategy`.")
     self._inferred_steps = steps_per_epoch

   def sync(self):
-    self._model._cluster_coordinator.join()
+    self._model.distribute_strategy.cluster_coordinator.join()

   def resolve_logs(self, logs):
     return logs.fetch()
 ```


-And, in the existing `DataHandler`,
+And, in the existing `DataHandler` (note that `_configure_dataset_and_inferred_steps` and `resolve_logs` are newly created methods):


 ```
@@ -337,7 +337,7 @@ The `DataHandler` `model.fit` uses depends on whether or not it is using a `Clus

 ```
 def get_data_handler(*args, **kwargs):
-  if model._cluster_coordinator:
+  if model.distribute_strategy.cluster_coordinator:
     return ClusterCoordinatorDataHandler(*args, **kwargs)
   return DataHandler(*args, **kwargs)
 ```
@@ -486,13 +486,12 @@ Similarly, the hyper and slot variables an `optimizer` object uses, would be cre

 Initially, we aim to have `model.evaluate` and `model.predict` to only be carried out on the coordinator. That is, it does not involve distribution via a `ClusterCoordinator`, and thus the evaluate function is executed on the coordinator.

-In the longer term, we seek distributed support for `model.evaluate`, where the evaluate function is scheduled onto the workers to execute. Visitation guarantee cannot be supported currently with the parameter server training API, so we can implement distributed evaluation without it, or wait until that is supported, and integrate it. Things possibly involved with distributed `model.evaluate` include:
+In the longer term, we seek distributed support for `model.evaluate`, where the evaluate function is scheduled onto the workers to execute. The current `ClusterCoordinator` API has a limitation: distributed evaluation does not have a visitation guarantee when workers can become unavailable. Thus, we have a couple of options:

-* support for local variables
-* support for local resources
-* efficient skipping of dataset batches or `dataset.shard` can be tf.function'ed
+1. Implement distributed `model.evaluate` without visitation guarantee, but require the user's opt-in because of the behavior change (by `model.evaluate(..., distributed_eval=True)`)
+2. Support distributed `model.evaluate` only after `ClusterCoordinator` provides a visitation guarantee mechanism

-With those, we do not expect an API change at `model.fit` level, but if we do encounter something that results in a change, it is reasonable to add an argument `model.fit(distribute_eval=...)`.
+Note that similar to the dataset factory change for `model.fit`, the validation dataset will also need to be a function. That is, `model.fit` will take a `validation_data_fn` instead of `validation_data`, and `model.evaluate` will take a `dataset_fn` as opposed to a `dataset` instance.

 See below “Evaluation” section for other proposed evaluation solutions accompanying `model.fit` usage.

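To make the note above concrete, the function-based evaluation inputs could look roughly like the following; argument names such as `validation_data_fn` follow the proposal text above and are not final:

```
def eval_dataset_fn(input_context):
  return tf.data.Dataset.from_tensor_slices(...)

# `model.fit` would accept a function (wrapped in a factory) for validation data...
model.fit(DatasetFactory(train_dataset_fn),
          epochs=..., steps_per_epoch=...,
          validation_data_fn=eval_dataset_fn)

# ...and `model.evaluate` would take a `dataset_fn` rather than a dataset instance.
model.evaluate(eval_dataset_fn, steps=...)
```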
@@ -535,15 +534,15 @@ In addition to the existing train-evaluate solution provided by `model.fit`, we

 #### Built-in, alternating evaluation in `model.fit`

-If `validation_data` argument is provided, and certain conditions are satisfied, `model.fit` also runs evaluation via `model.evaluate` API every epoch, in an train-evaluate alternating manner. As described above, at this time, only the coordinator is used for `model.evaluate` evaluation, and we plan to extend this to worker-distributed evaluation when visitation guarantee is supported.
+If the `validation_data` argument is provided, and certain conditions are satisfied, `model.fit` also runs evaluation via the `model.evaluate` API every epoch, in a train-evaluate alternating manner. As described above, at this time, only the coordinator is used for `model.evaluate` evaluation, and we plan to extend this to worker-distributed evaluation when visitation guarantee is supported. See the "model.evaluate" section above for more information.

 #### Sidecar evaluation

-In addition to the built-in evaluation `model.fit` provides, sidecar evaluation is also supported with a [recommended user flow](https://www.tensorflow.org/tutorials/distribute/parameter_server_training#side-car_evaluation).
+In addition to the built-in evaluation `model.fit` provides, sidecar evaluation is also supported. Currently, we have a [recommended user flow](https://www.tensorflow.org/tutorials/distribute/parameter_server_training#side-car_evaluation) using a sidecar evaluator task for CTL users. This section discusses the proposed changes to sidecar evaluation accompanying `model.fit` usage with parameter server training.

-##### SidecarEvaluator API
+##### A sidecar evaluator task

-We plan to propose a `SidecarEvaluator` API in a separate RFC for user’s convenience: with this, user is expected to kick start an additional task `evaluator`, in which the python program runs a `SidecarEvaluator` as follows:
+In the short term, a task that is allocated for evaluation (aka a sidecar evaluator) continues to be the recommended evaluation solution for PS training. We plan to propose a `SidecarEvaluator` API in a separate RFC for the user’s convenience: with this, the user is expected to kick-start an additional `evaluator` task, in which the Python program runs a `SidecarEvaluator` as follows:


 ```
@@ -571,7 +570,9 @@ SidecarEvaluator(

 ##### A sidecar evaluation thread on coordinator

-A potentially more seamless and encapsulated sidecar evaluation, where the user is not required to allocate an evaluator task or run separate code, can be done with an evaluation thread on the coordinator. This thread would remotely execute an evaluation function on a worker, and wait for its result synchronously. Once the result is returned, it can write a summary, adjust learning rate, or signal to end the training. Then, it re-`schedule`s an evaluation function, and so on:
+A potentially more seamless and encapsulated sidecar evaluation, where the user is not required to allocate an evaluator task or run separate code (for evaluation), can be done with an evaluation thread on the coordinator. With this approach, the user does not allocate a task with type 'evaluator', because one 'worker' task (which runs a `tf.distribute.Server`) from the cluster can be used for evaluation. It can be any of the workers, but for convenience, let’s say the Nth worker is used for evaluation.
+
+The thread would be started by `model.fit` if the user opts in via an argument such as `fit(..., run_sidecar_eval_thread=True)`. The thread would remotely execute an evaluation function on this worker #N, and wait for its result synchronously. Once the result is returned, it can write a summary, adjust the learning rate, or signal to end the training. After that, it re-`schedule`s an evaluation function, and so on:

 ```
 class Model(...):
@@ -595,22 +596,26 @@ class Model(...):
       tmp_logs = self.test_function(iterator)
       ... # Callbacks, etc.

-  def fit(self, ...):
-    # At some point, we start a thread for sidecar eval
-    t = threading.Thread(target=self._continuously_evaluate)
-    t.start()
-    ...
-    self.should_eval = False
-    t.join()
+  def fit(self, ..., run_sidecar_eval_thread=False):
+    if run_sidecar_eval_thread:
+      # At some point, we start a thread for sidecar eval
+      t = threading.Thread(target=self._continuously_evaluate)
+      t.start()
+    ...
+    if run_sidecar_eval_thread:
+      self.should_eval = False
+      t.join()
 ```

+Note that with this approach, the training cluster will be limited to the first N-1 workers it has remaining (those not used for evaluation), so that training and evaluation do not block each other.
+
 If we compare the sidecar evaluator thread solution vs sidecar evaluator task (process):

-Pros:
-* This does not require a task to be set aside as evaluator
+Pros (advantages of the evaluator thread approach):
+* This does not require a task to be set aside as an evaluator, so 1) there is less work for the user, and 2) there is one fewer version of the Python binary
 * There is easier communication between the sidecar evaluator (thread) and the coordinator main thread, which is important for many callbacks

-Cons:
+Cons (disadvantages of the evaluator thread approach):
 * This solution presents a challenge when workers can easily become unavailable, in which case it is not straightforward to immediately find another available worker to take over*
 * This solution is blocked on `tf.keras.models.load_model` being available on PS, if `variable_partitioner` is used. Here, model saving and loading are for cloning the model, so if there is an alternative to clone, this solution is not blocked.
 * Users who can afford to allocate a high priority on an evaluator task cannot do so with workers; workers would simply have the same, usually lower, priority (and thus more frequent function-takeovers)
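For reference, the `_continuously_evaluate` loop partially shown in the hunk above could be fleshed out roughly as below; apart from `test_function` and `should_eval`, which appear in the proposal, the attribute names are assumptions made only for illustration:

```
class Model(...):

  def _continuously_evaluate(self):
    # Runs on the coordinator thread started by `fit`; `fit` flips
    # `should_eval` to False once training finishes.
    while self.should_eval:
      iterator = iter(self._eval_dataset)      # assumed per-worker eval dataset
      tmp_logs = self.test_function(iterator)  # remotely executed on worker #N
      ...  # Callbacks: write summaries, adjust learning rate, or stop training.
```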
