Skip to content

Commit 7bfeb61

Browse files
leewyangtmielika
authored andcommitted
example for inferencing from saved_model; add util.single_node_env() (#377)
1 parent 64291fc commit 7bfeb61

File tree

6 files changed

+205
-84
lines changed

6 files changed

+205
-84
lines changed

examples/mnist/tf/README.md

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,39 +9,6 @@
99
# hdfs dfs -rm -r mnist_model
1010
# hdfs dfs -rm -r predictions
1111

12-
${SPARK_HOME}/bin/spark-submit \
13-
--master yarn \
14-
--deploy-mode cluster \
15-
--queue ${QUEUE} \
16-
--num-executors 4 \
17-
--executor-memory 27G \
18-
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/tf/mnist_dist_dataset.py \
19-
--conf spark.dynamicAllocation.enabled=false \
20-
--conf spark.yarn.maxAppAttempts=1 \
21-
--archives hdfs:///user/${USER}/Python.zip#Python \
22-
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
23-
--driver-library-path=$LIB_CUDA \
24-
TensorFlowOnSpark/examples/mnist/tf/mnist_spark_dataset.py \
25-
${TF_ROOT}/${TF_VERSION}/examples/mnist/tf/mnist_spark_dataset.py \
26-
--images_labels mnist/csv2/train \
27-
--format csv2 \
28-
--mode train \
29-
--model mnist_model
30-
31-
# to use inference mode, change `--mode train` to `--mode inference` and add `--output predictions`
32-
# one item in csv2 format is `image | label`, to use input data in TFRecord format, change `--format csv` to `--format tfr`
33-
# to use infiniband, add `--rdma`
34-
```
35-
36-
### _using QueueRunners_
37-
```bash
38-
# for CPU mode:
39-
# export QUEUE=default
40-
# remove references to $LIB_CUDA
41-
42-
# hdfs dfs -rm -r mnist_model
43-
# hdfs dfs -rm -r predictions
44-
4512
${SPARK_HOME}/bin/spark-submit \
4613
--master yarn \
4714
--deploy-mode cluster \
@@ -55,16 +22,14 @@ ${SPARK_HOME}/bin/spark-submit \
5522
--conf spark.executorEnv.LD_LIBRARY_PATH=$LIB_CUDA:$LIB_JVM:$LIB_HDFS \
5623
--driver-library-path=$LIB_CUDA \
5724
TensorFlowOnSpark/examples/mnist/tf/mnist_spark.py \
58-
--images mnist/tfr/train/images \
59-
--labels mnist/tfr/train/labels \
60-
--format csv \
25+
--images_labels mnist/csv2/train \
26+
--format csv2 \
6127
--mode train \
6228
--model mnist_model
6329

6430
# to use inference mode, change `--mode train` to `--mode inference` and add `--output predictions`
65-
# to use input data in TFRecord format, change `--format csv` to `--format tfr`
31+
# one item in csv2 format is `image | label`, to use input data in TFRecord format, change `--format csv` to `--format tfr`
6632
# to use infiniband, add `--rdma`
67-
```
6833

6934
### _using Spark ML Pipeline_
7035
```bash
@@ -83,7 +48,7 @@ ${SPARK_HOME}/bin/spark-submit \
8348
--queue ${QUEUE} \
8449
--num-executors 4 \
8550
--executor-memory 27G \
86-
--jars hdfs:///user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
51+
--jars hdfs:///user/${USER}/tensorflow-hadoop-1.0-SNAPSHOT.jar \
8752
--py-files TensorFlowOnSpark/tfspark.zip,TensorFlowOnSpark/examples/mnist/tf/mnist_dist_pipeline.py \
8853
--conf spark.dynamicAllocation.enabled=false \
8954
--conf spark.yarn.maxAppAttempts=1 \
@@ -102,6 +67,6 @@ TensorFlowOnSpark/examples/mnist/tf/mnist_spark_pipeline.py \
10267
--inference_output predictions
10368
10469
# to use input data in TFRecord format, change `--format csv` to `--format tfr`
105-
# tensorflow-hadoop-1.0-SNAPSHOT.jar is needed for transforming csv input to TFRecord
70+
# tensorflow-hadoop-1.0-SNAPSHOT.jar is needed for transforming csv input to TFRecord
10671
# `--tfrecord_dir` is needed for temporarily saving dataframe to TFRecord on hdfs
10772
```

examples/mnist/tf/mnist_dist.py

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def print_log(worker_num, arg):
1616

1717
def map_fun(args, ctx):
1818
from datetime import datetime
19+
from tensorflowonspark import TFNode
1920
import math
2021
import os
2122
import tensorflow as tf
@@ -54,6 +55,27 @@ def _parse_tfr(example_proto):
5455
label = tf.to_float(features['label'])
5556
return (image, label)
5657

58+
def build_model(graph, x):
59+
with graph.as_default():
60+
# Variables of the hidden layer
61+
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
62+
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
63+
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
64+
tf.summary.histogram("hidden_weights", hid_w)
65+
66+
# Variables of the softmax layer
67+
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10],
68+
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
69+
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
70+
tf.summary.histogram("softmax_weights", sm_w)
71+
72+
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
73+
hid = tf.nn.relu(hid_lin)
74+
75+
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
76+
prediction = tf.argmax(y, 1, name="prediction")
77+
return y, prediction
78+
5779
if job_name == "ps":
5880
server.join()
5981
elif job_name == "worker":
@@ -78,36 +100,21 @@ def _parse_tfr(example_proto):
78100
iterator = ds.make_one_shot_iterator()
79101
x, y_ = iterator.get_next()
80102

81-
# Variables of the hidden layer
82-
hid_w = tf.Variable(tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, hidden_units],
83-
stddev=1.0 / IMAGE_PIXELS), name="hid_w")
84-
hid_b = tf.Variable(tf.zeros([hidden_units]), name="hid_b")
85-
tf.summary.histogram("hidden_weights", hid_w)
86-
87-
# Variables of the softmax layer
88-
sm_w = tf.Variable(tf.truncated_normal([hidden_units, 10],
89-
stddev=1.0 / math.sqrt(hidden_units)), name="sm_w")
90-
sm_b = tf.Variable(tf.zeros([10]), name="sm_b")
91-
tf.summary.histogram("softmax_weights", sm_w)
103+
# Build core model
104+
y, prediction = build_model(tf.get_default_graph(), x)
92105

106+
# Add training bits
93107
x_img = tf.reshape(x, [-1, IMAGE_PIXELS, IMAGE_PIXELS, 1])
94108
tf.summary.image("x_img", x_img)
95109

96-
hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
97-
hid = tf.nn.relu(hid_lin)
98-
99-
y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
100-
101110
global_step = tf.train.get_or_create_global_step()
102111

103112
loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
104113
tf.summary.scalar("loss", loss)
105114
train_op = tf.train.AdagradOptimizer(0.01).minimize(
106115
loss, global_step=global_step)
107116

108-
# Test trained model
109117
label = tf.argmax(y_, 1, name="label")
110-
prediction = tf.argmax(y, 1, name="prediction")
111118
correct_prediction = tf.equal(prediction, label)
112119
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name="accuracy")
113120
tf.summary.scalar("acc", accuracy)
@@ -117,8 +124,10 @@ def _parse_tfr(example_proto):
117124
init_op = tf.global_variables_initializer()
118125

119126
# Create a "supervisor", which oversees the training process and stores model state into HDFS
120-
logdir = ctx.absolute_path(args.model)
121-
print("tensorflow model path: {0}".format(logdir))
127+
model_dir = ctx.absolute_path(args.model)
128+
export_dir = ctx.absolute_path(args.export)
129+
print("tensorflow model path: {0}".format(model_dir))
130+
print("tensorflow export path: {0}".format(export_dir))
122131
summary_writer = tf.summary.FileWriter("tensorboard_%d" % worker_num, graph=tf.get_default_graph())
123132

124133
if args.mode == 'inference':
@@ -130,7 +139,7 @@ def _parse_tfr(example_proto):
130139
with tf.train.MonitoredTrainingSession(master=server.target,
131140
is_chief=(task_index == 0),
132141
scaffold=tf.train.Scaffold(init_op=init_op, summary_op=summary_op, saver=saver),
133-
checkpoint_dir=logdir,
142+
checkpoint_dir=model_dir,
134143
hooks=[tf.train.StopAtStepHook(last_step=args.steps)]) as sess:
135144
print("{} session ready".format(datetime.now().isoformat()))
136145

@@ -163,6 +172,41 @@ def _parse_tfr(example_proto):
163172

164173
print("{} stopping MonitoredTrainingSession".format(datetime.now().isoformat()))
165174

175+
# export model (on chief worker only)
176+
if args.mode == "train" and task_index == 0:
177+
tf.reset_default_graph()
178+
179+
# add placeholders for input images (and optional labels)
180+
x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS], name='x')
181+
y_ = tf.placeholder(tf.float32, [None, 10], name='y_')
182+
label = tf.argmax(y_, 1, name="label")
183+
184+
# add core model
185+
y, prediction = build_model(tf.get_default_graph(), x)
186+
187+
# restore from last checkpoint
188+
saver = tf.train.Saver()
189+
with tf.Session() as sess:
190+
ckpt = tf.train.get_checkpoint_state(model_dir)
191+
print("ckpt: {}".format(ckpt))
192+
assert ckpt, "Invalid model checkpoint path: {}".format(model_dir)
193+
saver.restore(sess, ckpt.model_checkpoint_path)
194+
195+
print("Exporting saved_model to: {}".format(export_dir))
196+
# exported signatures defined in code
197+
signatures = {
198+
tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: {
199+
'inputs': { 'image': x },
200+
'outputs': { 'prediction': prediction },
201+
'method_name': tf.saved_model.signature_constants.PREDICT_METHOD_NAME
202+
}
203+
}
204+
TFNode.export_saved_model(sess,
205+
export_dir,
206+
tf.saved_model.tag_constants.SERVING,
207+
signatures)
208+
print("Exported saved_model")
209+
166210
# WORKAROUND for https://github.com/tensorflow/tensorflow/issues/21745
167211
# wait for all other nodes to complete (via done files)
168212
done_dir = "{}/{}/done".format(ctx.absolute_path(args.model), args.mode)
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Copyright 2018 Yahoo Inc.
2+
# Licensed under the terms of the Apache 2.0 license.
3+
# Please see LICENSE file in the project root for terms.
4+
5+
# This example demonstrates how to leverage Spark for parallel inferencing from a SavedModel.
6+
#
7+
# Normally, you can use TensorFlowOnSpark to just form a TensorFlow cluster for training and inferencing.
8+
# However, in some situations, you may have a SavedModel without the original code for defining the inferencing
9+
# graph. In these situations, we can use Spark to instantiate a single-node TensorFlow instance on each executor,
10+
# where each executor can independently load the model and inference on input data.
11+
#
12+
# Note: this particular example demonstrates use of `tf.data.Dataset` to read the input data for inferencing,
13+
# but it could also be adapted to just use an RDD of TFRecords from Spark.
14+
15+
from __future__ import absolute_import
16+
from __future__ import division
17+
from __future__ import print_function
18+
19+
import argparse
20+
import logging
21+
import sys
22+
import tensorflow as tf
23+
import time
24+
import traceback
25+
26+
IMAGE_PIXELS = 28
27+
28+
def inference(it, num_workers, args):
29+
from tensorflowonspark import util
30+
31+
# consume worker number from RDD partition iterator
32+
for i in it:
33+
worker_num = i
34+
print("worker_num: {}".format(i))
35+
36+
# setup env for single-node TF
37+
util.single_node_env()
38+
39+
# load saved_model using default tag and signature
40+
sess = tf.Session()
41+
tf.saved_model.loader.load(sess, ['serve'], args.export)
42+
43+
# parse function for TFRecords
44+
def parse_tfr(example_proto):
45+
feature_def = {"label": tf.FixedLenFeature(10, tf.int64),
46+
"image": tf.FixedLenFeature(IMAGE_PIXELS * IMAGE_PIXELS, tf.int64)}
47+
features = tf.parse_single_example(example_proto, feature_def)
48+
norm = tf.constant(255, dtype=tf.float32, shape=(784,))
49+
image = tf.div(tf.to_float(features['image']), norm)
50+
label = tf.to_float(features['label'])
51+
return (image, label)
52+
53+
# define a new tf.data.Dataset (for inferencing)
54+
ds = tf.data.Dataset.list_files("{}/part-*".format(args.images_labels))
55+
ds = ds.shard(num_workers, worker_num)
56+
ds = ds.interleave(tf.data.TFRecordDataset, cycle_length=1)
57+
ds = ds.map(parse_tfr).batch(10)
58+
iterator = ds.make_one_shot_iterator()
59+
image_label = iterator.get_next(name='inf_image')
60+
61+
# create an output file per spark worker for the predictions
62+
tf.gfile.MakeDirs(args.output)
63+
output_file = tf.gfile.GFile("{}/part-{:05d}".format(args.output, worker_num), mode='w')
64+
65+
while True:
66+
try:
67+
# get images and labels from tf.data.Dataset
68+
img, lbl = sess.run(['inf_image:0', 'inf_image:1'])
69+
70+
# inference by feeding these images and labels into the input tensors
71+
# you can view the exported model signatures via:
72+
# saved_model_cli show --dir mnist_export --all
73+
74+
# note that we feed directly into the graph tensors (bypassing the exported signatures)
75+
# also note that we can feed/fetch tensors that were not explicitly exported, e.g. `y_` and `label:0`
76+
77+
labels, preds = sess.run(['label:0', 'prediction:0'], feed_dict={'x:0': img, 'y_:0': lbl})
78+
for i in range(len(labels)):
79+
output_file.write("{} {}\n".format(labels[i], preds[i]))
80+
except tf.errors.OutOfRangeError:
81+
break
82+
83+
output_file.close()
84+
85+
if __name__ == '__main__':
86+
import os
87+
from pyspark.context import SparkContext
88+
from pyspark.conf import SparkConf
89+
90+
sc = SparkContext(conf=SparkConf().setAppName("mnist_inference"))
91+
executors = sc._conf.get("spark.executor.instances")
92+
num_executors = int(executors) if executors is not None else 1
93+
94+
parser = argparse.ArgumentParser()
95+
parser.add_argument("--cluster_size", help="number of nodes in the cluster (for S with labelspark Standalone)", type=int, default=num_executors)
96+
parser.add_argument('--images_labels', type=str, help='Directory for input images with labels')
97+
parser.add_argument("--export", help="HDFS path to export model", type=str, default="mnist_export")
98+
parser.add_argument("--output", help="HDFS path to save predictions", type=str, default="predictions")
99+
args, _ = parser.parse_known_args()
100+
print("args: {}".format(args))
101+
102+
# Not using TFCluster... just running single-node TF instances on each executor
103+
nodes = list(range(args.cluster_size))
104+
nodeRDD = sc.parallelize(list(range(args.cluster_size)), args.cluster_size)
105+
nodeRDD.foreachPartition(lambda worker_num: inference(worker_num, args.cluster_size, args))
106+

examples/mnist/tf/mnist_spark.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
parser.add_argument("--driver_ps_nodes", help="""run tensorflow PS node on driver locally.
2727
You will need to set cluster_size = num_executors + num_ps""", default=False)
2828
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
29+
parser.add_argument("--export", help="HDFS path to export model", type=str, default="mnist_export")
2930
parser.add_argument("--format", help="example format: (csv2|tfr)", choices=["csv2", "tfr"], default="tfr")
3031
parser.add_argument("--images_labels", help="HDFS path to MNIST image_label files in parallelized format")
3132
parser.add_argument("--mode", help="train|inference", default="train")

tensorflowonspark/pipeline.py

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import tensorflow as tf
2626
from tensorflow.contrib.saved_model.python.saved_model import reader
2727
from tensorflow.python.saved_model import loader
28-
from . import TFCluster, gpu_info, dfutil
28+
from . import TFCluster, gpu_info, dfutil, util
2929

3030
import argparse
3131
import copy
@@ -570,32 +570,15 @@ def single_node_env(args):
570570
Args:
571571
:args: command line arguments as either argparse args or argv list
572572
"""
573+
# setup ARGV for the TF process
573574
if isinstance(args, list):
574575
sys.argv = args
575576
elif args.argv:
576577
sys.argv = args.argv
577578

578-
# ensure expanded CLASSPATH w/o glob characters (required for Spark 2.1 + JNI)
579-
if 'HADOOP_PREFIX' in os.environ and 'TFOS_CLASSPATH_UPDATED' not in os.environ:
580-
classpath = os.environ['CLASSPATH']
581-
hadoop_path = os.path.join(os.environ['HADOOP_PREFIX'], 'bin', 'hadoop')
582-
hadoop_classpath = subprocess.check_output([hadoop_path, 'classpath', '--glob']).decode()
583-
logging.debug("CLASSPATH: {0}".format(hadoop_classpath))
584-
os.environ['CLASSPATH'] = classpath + os.pathsep + hadoop_classpath
585-
os.environ['TFOS_CLASSPATH_UPDATED'] = '1'
586-
587-
# reserve GPU, if requested
588-
if tf.test.is_built_with_cuda():
589-
# GPU
590-
num_gpus = args.num_gpus if 'num_gpus' in args else 1
591-
gpus_to_use = gpu_info.get_gpus(num_gpus)
592-
logging.info("Using gpu(s): {0}".format(gpus_to_use))
593-
os.environ['CUDA_VISIBLE_DEVICES'] = gpus_to_use
594-
# Note: if there is a GPU conflict (CUDA_ERROR_INVALID_DEVICE), the entire task will fail and retry.
595-
else:
596-
# CPU
597-
logging.info("Using CPU")
598-
os.environ['CUDA_VISIBLE_DEVICES'] = ''
579+
# setup ENV for Hadoop-compatibility and/or GPU allocation
580+
num_gpus = args.num_gpus if 'num_gpus' in args else 1
581+
util.single_node_env(num_gpus)
599582

600583

601584
def get_meta_graph_def(saved_model_dir, tag_set):

0 commit comments

Comments
 (0)