Commit 5217774

Add an example for DeepFM estimator in deepctr (#2526)
* Add an example for DeepFM estimator in deepctr
* Add a hook to report batch done
* Add a tutorial for DeepCTR estimator models
* Fix by comments
* Fix by comments
1 parent 87934a7 commit 5217774

File tree: 4 files changed, +372 -0 lines changed

Lines changed: 173 additions & 0 deletions
# Distributed Training of DeepCTR Estimator using ElasticDL on Kubernetes

This document shows how to run a distributed training job of a DeepCTR
estimator model (DeepFM) using [ElasticDL](https://github.com/sql-machine-learning/elasticdl)
on Kubernetes.
## Prerequisites

1. Install Minikube, preferably >= v1.11.0, following the installation
   [guide](https://kubernetes.io/docs/tasks/tools/install-minikube). Minikube
   runs a single-node Kubernetes cluster in a virtual machine on your personal
   computer.

1. Install Docker CE, preferably >= 18.x, following the
   [guide](https://docs.docker.com/docker-for-mac/install/), for building Docker
   images containing user-defined models and the ElasticDL framework.

1. Install Python, preferably >= 3.6, because the ElasticDL command-line tool is
   written in Python.
## Models

In this tutorial, we use the [DeepFM estimator](https://github.com/shenweichen/DeepCTR/blob/master/deepctr/estimator/models/deepfm.py)
model from DeepCTR. The complete training program, including the
dataset definition, is in the [ElasticDL model zoo](https://github.com/sql-machine-learning/elasticdl/tree/develop/model_zoo/deepctr).
## Dataset

In this tutorial, we use the [Criteo sample dataset](https://github.com/shenweichen/DeepCTR/blob/master/examples/criteo_sample.txt)
from the DeepCTR examples.

```bash
mkdir ./data
wget https://raw.githubusercontent.com/shenweichen/DeepCTR/master/examples/criteo_sample.txt -O ./data/criteo_sample.txt
```
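The file is a CSV with a header row, a `label` column, 13 dense features
`I1`..`I13`, and 26 sparse features `C1`..`C26`; the training program below
relies on exactly this layout. A quick way to check it:

```bash
# Print the header and the first sample row.
head -n 2 ./data/criteo_sample.txt
```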
## The Kubernetes Cluster

The following command starts a Kubernetes cluster locally using Minikube. It
uses [VirtualBox](https://www.virtualbox.org/), a cross-platform hypervisor,
to create the virtual machine that hosts the cluster.
```bash
minikube start --vm-driver=virtualbox \
  --cpus 2 --memory 6144 --disk-size=50gb
eval $(minikube docker-env)
```
The command `minikube docker-env` returns a set of Bash environment variables
that configure your local environment to re-use the Docker daemon inside
the Minikube instance.
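As an optional sanity check, the local `docker` CLI should now report the
Minikube node as the daemon it talks to:

```bash
docker info --format '{{.Name}}'   # expected to print: minikube
```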
The following command applies the
[RBAC](https://kubernetes.io/docs/reference/access-authn-authz/rbac/)
configuration that ElasticDL needs on Kubernetes.

```bash
kubectl apply -f \
  https://raw.githubusercontent.com/sql-machine-learning/elasticdl/develop/elasticdl/manifests/elasticdl-rbac.yaml
```
If you happen to live in a region where `raw.githubusercontent.com` is banned,
you might want to Git clone the above repository and apply the YAML file from
the local checkout, as sketched below.
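A sketch of that workaround, using the repository layout implied by the URL
above:

```bash
# Clone the repository and apply the manifest from the local checkout.
git clone https://github.com/sql-machine-learning/elasticdl
kubectl apply -f elasticdl/elasticdl/manifests/elasticdl-rbac.yaml
```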
## Install ElasticDL Client Tool

The following command installs the command-line tool `elasticdl`, which talks to
the Kubernetes cluster and operates ElasticDL jobs.

```bash
pip install elasticdl_client
```
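To verify the installation, ask the tool for its usage text; the `train`
subcommand used later in this tutorial should be listed (the exact output
depends on the installed version):

```bash
elasticdl --help
```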
## Build the Docker Image with Model Definition

Kubernetes runs Docker containers, so we need to put the user-defined model,
the ElasticDL API package, and all dependencies into a Docker image.

In this tutorial, we use a complete program in the ElasticDL repository that
trains a DeepFM estimator model from DeepCTR. To retrieve the source code,
please run the following command.

```bash
git clone https://github.com/sql-machine-learning/elasticdl
```

The complete code is in the directory [elasticdl/model_zoo/deepctr](https://github.com/sql-machine-learning/elasticdl/tree/develop/model_zoo/deepctr).
We build the image on top of `tensorflow/tensorflow:1.13.2-py3`, and the
Dockerfile is:

```text
FROM tensorflow/tensorflow:1.13.2-py3 as base

RUN pip install elasticdl_api
RUN pip install deepctr

COPY ./model_zoo model_zoo
```
Then, we use Docker to build the image, where `${deepctr_dockerfile}` is the
path to the Dockerfile above:

```bash
docker build -t elasticdl:deepctr_estimator -f ${deepctr_dockerfile} .
```
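For a concrete invocation, assuming you saved the Dockerfile above as
`Dockerfile.deepctr` in the root of the cloned `elasticdl` repository (the
file name is our choice, not fixed by the project):

```bash
cd elasticdl
# COPY ./model_zoo requires building from the repository root.
docker build -t elasticdl:deepctr_estimator -f Dockerfile.deepctr .
```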
## Submit the Training Job

The following command submits a training job:

```bash
elasticdl train \
  --image_name=elasticdl/elasticdl:1.0.0 \
  --worker_image=elasticdl:deepctr_estimator \
  --ps_image=elasticdl:deepctr_estimator \
  --job_command="python -m model_zoo.deepctr.deepfm_estimator --training_data=/data/criteo_sample.txt --validation_data=/data/criteo_sample.txt" \
  --num_workers=1 \
  --num_ps=1 \
  --num_evaluator=1 \
  --master_resource_request="cpu=0.2,memory=1024Mi" \
  --master_resource_limit="cpu=1,memory=2048Mi" \
  --ps_resource_request="cpu=0.2,memory=1024Mi" \
  --ps_resource_limit="cpu=1,memory=2048Mi" \
  --worker_resource_request="cpu=0.3,memory=1024Mi" \
  --worker_resource_limit="cpu=1,memory=2048Mi" \
  --chief_resource_request="cpu=0.3,memory=1024Mi" \
  --chief_resource_limit="cpu=1,memory=2048Mi" \
  --evaluator_resource_request="cpu=0.3,memory=1024Mi" \
  --evaluator_resource_limit="cpu=1,memory=2048Mi" \
  --job_name=test-deepfm-estimator \
  --distribution_strategy=ParameterServerStrategy \
  --need_tf_config=true \
  --volume="host_path={criteo_data_path},mount_path=/data"
```
`--image_name` is the image used to launch the ElasticDL master, which
has nothing to do with the estimator model. The ElasticDL master is
responsible for launching pods and assigning data shards to workers with
elasticity and fault-tolerance.

`{criteo_data_path}` is the absolute path of the `./data` directory containing
`criteo_sample.txt`. The option `--volume="host_path={criteo_data_path},mount_path=/data"`
bind-mounts it into the containers/pods.
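One caveat: with the VirtualBox driver, `host_path` is resolved inside the
Minikube VM rather than on your own machine, so you may first need to mount
the local directory into the VM, for example:

```bash
# Expose the local ./data directory inside the Minikube VM at /data;
# keep this command running in another terminal, and then pass
# host_path=/data in the --volume option above.
minikube mount ./data:/data
```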
The option `--num_workers=1` tells the master to start one worker pod.
The option `--num_ps=1` tells the master to start one parameter server (ps) pod.
The option `--num_evaluator=1` tells the master to start one evaluator pod.

The master also starts a chief worker for a TensorFlow estimator model by default.
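The option `--need_tf_config=true` asks ElasticDL to set the standard
`TF_CONFIG` environment variable in every pod, which is how `tf.estimator`
discovers the cluster. For the chief pod of this job, the value might look
like the sketch below; the exact host names and port are generated by
ElasticDL at runtime, and each pod gets its own `task` entry (`worker`, `ps`,
or `evaluator`).

```json
{
  "cluster": {
    "chief": ["test-deepfm-estimator-edljob-chief-0:2222"],
    "worker": ["test-deepfm-estimator-edljob-worker-0:2222"],
    "ps": ["test-deepfm-estimator-edljob-ps-0:2222"]
  },
  "task": {"type": "chief", "index": 0}
}
```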
### Check Job Status

After the job submission, we can run the command `kubectl get pods` to list
the related pods.
```bash
NAME                                       READY   STATUS    RESTARTS   AGE
elasticdl-test-deepfm-estimator-master     1/1     Running   0          9s
test-deepfm-estimator-edljob-chief-0       1/1     Running   0          6s
test-deepfm-estimator-edljob-evaluator-0   0/1     Pending   0          6s
test-deepfm-estimator-edljob-ps-0          1/1     Running   0          7s
test-deepfm-estimator-edljob-worker-0      1/1     Running   0          6s
```
We can view the logs of the chief worker with `kubectl logs test-deepfm-estimator-edljob-chief-0`.

```text
INFO:tensorflow:global_step/sec: 4.84156
INFO:tensorflow:global_step/sec: 4.84156
INFO:tensorflow:Saving checkpoints for 203 into /data/ckpts/model.ckpt.
INFO:tensorflow:Saving checkpoints for 203 into /data/ckpts/model.ckpt.
INFO:tensorflow:global_step/sec: 7.05433
INFO:tensorflow:global_step/sec: 7.05433
```
File renamed without changes.

elasticai_api/tensorflow/hooks.py

Lines changed: 27 additions & 0 deletions
# Copyright 2021 The ElasticDL Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import tensorflow as tf

from elasticai_api.util.log_utils import default_logger as logger


class ElasticDataShardReportHook(tf.train.SessionRunHook):
    """A SessionRunHook that reports a finished batch to the data shard
    service after each session run, so the master can track shard progress
    for elasticity and fault tolerance."""

    def __init__(self, data_shard_service) -> None:
        self._data_shard_service = data_shard_service

    def after_run(self, run_context, run_values):
        try:
            self._data_shard_service.report_batch_done()
        except Exception as ex:
            logger.error("elastic_ai: report batch done failed: %s", ex)

model_zoo/deepctr/deepfm_estimator.py

Lines changed: 172 additions & 0 deletions
# Copyright 2021 The ElasticDL Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import csv
import os

import tensorflow as tf
from deepctr.estimator.models import DeepFMEstimator

from elasticai_api.common.data_shard_service import DataShardService
from elasticai_api.tensorflow.hooks import ElasticDataShardReportHook

tf.logging.set_verbosity(tf.logging.INFO)


def read_csv(file_path):
    """Read the Criteo sample CSV into rows of
    [label (int), I1..I13 (float), C1..C26 (str)]."""
    rows = []
    with open(file_path) as csvfile:
        reader = csv.reader(csvfile)
        for i, row in enumerate(reader):
            # Skip the header row.
            if i > 0:
                row_values = [int(row[0])]
                # Dense features I1..I13; empty values default to 0.
                for j in range(1, 14):
                    value = row[j] if row[j] else 0
                    row_values.append(float(value))
                # Sparse features C1..C26 stay as strings.
                row_values.extend(row[14:])
                rows.append(row_values)
    return rows


def train_generator(data_path, shard_service):
    rows = read_csv(data_path)
    while True:
        # Read samples by the range of the shard from
        # the data shard service.
        shard = shard_service.fetch_shard()
        if not shard:
            break
        for i in range(shard.start, shard.end):
            yield tuple(rows[i])


def eval_generator(data_path):
    rows = read_csv(data_path)
    for row in rows:
        yield tuple(row)


def input_fn(sample_generator, batch_size, dense_features, sparse_features):
    # One output type per column: int label, float dense, string sparse.
    output_types = tuple(
        [tf.int32]
        + [tf.float32 for _ in dense_features]
        + [tf.string for _ in sparse_features]
    )
    dataset = tf.data.Dataset.from_generator(
        sample_generator, output_types=output_types,
    )
    dataset = dataset.shuffle(100).batch(batch_size)
    values = dataset.make_one_shot_iterator().get_next()

    # Map the positional tensors back to named feature columns.
    label_value = values[0]
    feature_values = {}
    feature_index = 1
    for feature in dense_features:
        feature_values[feature] = values[feature_index]
        feature_index += 1

    for feature in sparse_features:
        feature_values[feature] = values[feature_index]
        feature_index += 1
    return feature_values, label_value


def arg_parser():
    parser = argparse.ArgumentParser(description="Process training parameters")
    parser.add_argument("--training_data", type=str, required=True)
    parser.add_argument(
        "--validation_data", type=str, default="", required=False
    )
    return parser


if __name__ == "__main__":
    parser = arg_parser()
    args = parser.parse_args()

    training_data = args.training_data
    validation_data = args.validation_data

    model_dir = "/data/ckpts/"
    os.makedirs(model_dir, exist_ok=True)

    sparse_features = ["C" + str(i) for i in range(1, 27)]
    dense_features = ["I" + str(i) for i in range(1, 14)]

    dnn_feature_columns = []
    linear_feature_columns = []

    # Sparse features: hashed categorical columns, embedded for the DNN part.
    for feat in sparse_features:
        dnn_feature_columns.append(
            tf.feature_column.embedding_column(
                tf.feature_column.categorical_column_with_hash_bucket(
                    feat, 1000
                ),
                4,
            )
        )
        linear_feature_columns.append(
            tf.feature_column.categorical_column_with_hash_bucket(feat, 1000)
        )
    # Dense features feed both the linear and the DNN part directly.
    for feat in dense_features:
        dnn_feature_columns.append(tf.feature_column.numeric_column(feat))
        linear_feature_columns.append(tf.feature_column.numeric_column(feat))

    batch_size = 64

    config = tf.estimator.RunConfig(
        model_dir=model_dir, save_checkpoints_steps=100, keep_checkpoint_max=3
    )
    model = DeepFMEstimator(
        linear_feature_columns,
        dnn_feature_columns,
        task="binary",
        config=config,
    )

    # Create a data shard service which can split the dataset
    # into shards.
    rows = read_csv(training_data)
    training_data_shard_svc = DataShardService(
        batch_size=batch_size,
        num_epochs=100,
        dataset_size=len(rows),
        num_minibatches_per_shard=1,
        dataset_name="criteo_training_data",
    )

    def train_input_fn():
        return input_fn(
            lambda: train_generator(training_data, training_data_shard_svc),
            batch_size,
            dense_features,
            sparse_features,
        )

    def eval_input_fn():
        return input_fn(
            lambda: eval_generator(validation_data),
            batch_size,
            dense_features,
            sparse_features,
        )

    # Report each finished batch back to the master via the hook.
    hooks = [
        ElasticDataShardReportHook(training_data_shard_svc),
    ]
    train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, hooks=hooks)
    eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)

    tf.estimator.train_and_evaluate(model, train_spec, eval_spec)
