
Commit 92a5eae

Merge pull request #258 from yahoo/leewyang_mnist_estimator
mnist estimator example
2 parents: 3962e5b + f197ecc

File tree

examples/mnist/estimator/
    README.md
    mnist_estimator.py

2 files changed: +228 -0 lines changed

examples/mnist/estimator/README.md

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
# MNIST using tf.estimator with tf.layers

Original Source: https://github.com/tensorflow/tensorflow/blob/r1.6/tensorflow/examples/tutorials/layers/cnn_mnist.py

This is the `tf.estimator` version of MNIST from TensorFlow's [tutorial on layers and estimators](https://www.tensorflow.org/versions/master/tutorials/layers), adapted for TensorFlowOnSpark.

Notes:
- This example assumes that Spark, TensorFlow, and TensorFlowOnSpark are already installed.
- To minimize code changes, this example uses `InputMode.TENSORFLOW` (see the sketch below).
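For orientation (not part of the diff), here is a minimal sketch of what `InputMode.TENSORFLOW` means in the `TFCluster` API: Spark only launches and supervises the TensorFlow processes, and each executor's map function reads its own data through TensorFlow APIs, rather than Spark feeding RDD partitions to the workers as in `InputMode.SPARK`. The sketch assumes an existing SparkContext `sc`, parsed `args`, and a `map_fun(args, ctx)` entry point; it mirrors the `__main__` block of `mnist_estimator.py` below.

```python
# Launcher sketch (assumes sc, args, and map_fun are already defined).
from tensorflowonspark import TFCluster

# map_fun(args, ctx) runs once per executor; in InputMode.TENSORFLOW it loads
# its own data (here, via tf.estimator input functions over in-memory arrays).
cluster = TFCluster.run(sc, map_fun, args, args.cluster_size, args.num_ps,
                        input_mode=TFCluster.InputMode.TENSORFLOW)
cluster.shutdown()  # wait for the TensorFlow processes to finish
```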
#### Launch the Spark Standalone cluster

```bash
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=3
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
export TFoS_HOME=<path to TensorFlowOnSpark>

${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
```
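If the launch succeeded, the Spark master's web UI (by default at http://localhost:8080 on the master host) should show all `${SPARK_WORKER_INSTANCES}` workers registered before you proceed.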
#### Run MNIST using InputMode.TENSORFLOW

In this mode, each worker will load the entire MNIST dataset into memory (automatically downloading the dataset if needed).

```bash
# remove any old artifacts
rm -rf ${TFoS_HOME}/mnist_model

# train and validate
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.task.maxFailures=1 \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/estimator/mnist_estimator.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--model ${TFoS_HOME}/mnist_model
```
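For reference, the in-memory load that each worker performs is just the tutorial's dataset helper, excerpted from `mnist_estimator.py` below:

```python
# Each executor loads the full dataset independently, downloading it if absent.
mnist = tf.contrib.learn.datasets.load_dataset("mnist")
train_data = mnist.train.images  # np.array of flattened 28x28 images
train_labels = np.asarray(mnist.train.labels, dtype=np.int32)
```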
#### Shutdown the Spark Standalone cluster

```bash
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh
```
examples/mnist/estimator/mnist_estimator.py

Lines changed: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
```python
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Convolutional Neural Network Estimator for MNIST, built with tf.layers."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf

tf.logging.set_verbosity(tf.logging.INFO)


def cnn_model_fn(features, labels, mode):
  """Model function for CNN."""
  # Input Layer
  # Reshape X to 4-D tensor: [batch_size, width, height, channels]
  # MNIST images are 28x28 pixels, and have one color channel
  input_layer = tf.reshape(features["x"], [-1, 28, 28, 1])

  # Convolutional Layer #1
  # Computes 32 features using a 5x5 filter with ReLU activation.
  # Padding is added to preserve width and height.
  # Input Tensor Shape: [batch_size, 28, 28, 1]
  # Output Tensor Shape: [batch_size, 28, 28, 32]
  conv1 = tf.layers.conv2d(
      inputs=input_layer,
      filters=32,
      kernel_size=[5, 5],
      padding="same",
      activation=tf.nn.relu)

  # Pooling Layer #1
  # First max pooling layer with a 2x2 filter and stride of 2
  # Input Tensor Shape: [batch_size, 28, 28, 32]
  # Output Tensor Shape: [batch_size, 14, 14, 32]
  pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)

  # Convolutional Layer #2
  # Computes 64 features using a 5x5 filter.
  # Padding is added to preserve width and height.
  # Input Tensor Shape: [batch_size, 14, 14, 32]
  # Output Tensor Shape: [batch_size, 14, 14, 64]
  conv2 = tf.layers.conv2d(
      inputs=pool1,
      filters=64,
      kernel_size=[5, 5],
      padding="same",
      activation=tf.nn.relu)

  # Pooling Layer #2
  # Second max pooling layer with a 2x2 filter and stride of 2
  # Input Tensor Shape: [batch_size, 14, 14, 64]
  # Output Tensor Shape: [batch_size, 7, 7, 64]
  pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)

  # Flatten tensor into a batch of vectors
  # Input Tensor Shape: [batch_size, 7, 7, 64]
  # Output Tensor Shape: [batch_size, 7 * 7 * 64]
  pool2_flat = tf.reshape(pool2, [-1, 7 * 7 * 64])

  # Dense Layer
  # Densely connected layer with 1024 neurons
  # Input Tensor Shape: [batch_size, 7 * 7 * 64]
  # Output Tensor Shape: [batch_size, 1024]
  dense = tf.layers.dense(inputs=pool2_flat, units=1024, activation=tf.nn.relu)

  # Add dropout operation; 0.6 probability that element will be kept
  dropout = tf.layers.dropout(
      inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN)

  # Logits layer
  # Input Tensor Shape: [batch_size, 1024]
  # Output Tensor Shape: [batch_size, 10]
  logits = tf.layers.dense(inputs=dropout, units=10)

  predictions = {
      # Generate predictions (for PREDICT and EVAL mode)
      "classes": tf.argmax(input=logits, axis=1),
      # Add `softmax_tensor` to the graph. It is used for PREDICT and by the
      # `logging_hook`.
      "probabilities": tf.nn.softmax(logits, name="softmax_tensor")
  }
  if mode == tf.estimator.ModeKeys.PREDICT:
    return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

  # Calculate Loss (for both TRAIN and EVAL modes)
  loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

  # Configure the Training Op (for TRAIN mode)
  if mode == tf.estimator.ModeKeys.TRAIN:
    optimizer = tf.train.GradientDescentOptimizer(learning_rate=0.001)
    train_op = optimizer.minimize(
        loss=loss,
        global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  # Add evaluation metrics (for EVAL mode)
  eval_metric_ops = {
      "accuracy": tf.metrics.accuracy(
          labels=labels, predictions=predictions["classes"])}
  return tf.estimator.EstimatorSpec(
      mode=mode, loss=loss, eval_metric_ops=eval_metric_ops)


def main(args, ctx):
  # Load training and eval data
  mnist = tf.contrib.learn.datasets.load_dataset("mnist")
  train_data = mnist.train.images  # Returns np.array
  train_labels = np.asarray(mnist.train.labels, dtype=np.int32)
  eval_data = mnist.test.images  # Returns np.array
  eval_labels = np.asarray(mnist.test.labels, dtype=np.int32)

  # Create the Estimator
  mnist_classifier = tf.estimator.Estimator(
      model_fn=cnn_model_fn, model_dir=args.model)

  # Set up logging for predictions
  # Log the values in the "Softmax" tensor with label "probabilities"
  tensors_to_log = {"probabilities": "softmax_tensor"}
  logging_hook = tf.train.LoggingTensorHook(
      tensors=tensors_to_log, every_n_iter=50)

  # Train the model
  train_input_fn = tf.estimator.inputs.numpy_input_fn(
      x={"x": train_data},
      y=train_labels,
      batch_size=args.batch_size,
      num_epochs=None,
      shuffle=True)
  # mnist_classifier.train(
  #     input_fn=train_input_fn,
  #     steps=1000,
  #     hooks=[logging_hook])

  # Evaluate the model and print results
  eval_input_fn = tf.estimator.inputs.numpy_input_fn(
      x={"x": eval_data},
      y=eval_labels,
      num_epochs=1,
      shuffle=False)
  # eval_results = mnist_classifier.evaluate(input_fn=eval_input_fn)
  # print(eval_results)

  # Using tf.estimator.train_and_evaluate
  train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=args.steps, hooks=[logging_hook])
  eval_spec = tf.estimator.EvalSpec(input_fn=eval_input_fn)
  tf.estimator.train_and_evaluate(mnist_classifier, train_spec, eval_spec)
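
# Gloss (added; not part of the original file): tf.estimator.train_and_evaluate
# reads the TF_CONFIG environment variable to decide each process's role
# (chief/worker/ps/evaluator); TensorFlowOnSpark populates TF_CONFIG on every
# executor, which is why this single-node tutorial code runs distributed as-is.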

if __name__ == "__main__":
  # tf.app.run()

  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf
  from tensorflowonspark import TFCluster
  import argparse

  sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
  executors = sc._conf.get("spark.executor.instances")
  num_executors = int(executors) if executors is not None else 1

  parser = argparse.ArgumentParser()
  parser.add_argument("--batch_size", help="number of records per batch", type=int, default=100)
  parser.add_argument("--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
  parser.add_argument("--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
  parser.add_argument("--output", help="HDFS path to save test/inference output", default="predictions")
  parser.add_argument("--num_ps", help="number of PS nodes in cluster", type=int, default=1)
  parser.add_argument("--steps", help="maximum number of steps", type=int, default=1000)
  args = parser.parse_args()
  print("args:", args)

  cluster = TFCluster.run(sc, main, args, args.cluster_size, args.num_ps, tensorboard=False, input_mode=TFCluster.InputMode.TENSORFLOW, log_dir=args.model, master_node='master')
  cluster.shutdown()
```
