@@ -171,82 +171,6 @@ def rdd_generator():
     expected = np.sum(self.weights)
     self.assertAlmostEqual(pred, expected, 2)
 
-  # def test_spark_sparse_tensor(self):
-  #   """InputMode.SPARK feeding sparse tensors"""
-  #   def sparse_train(args, ctx):
-  #     import tensorflow as tf
-  #
-  #     # reset graph in case we're re-using a Spark python worker (during tests)
-  #     tf.compat.v1.reset_default_graph()
-  #
-  #     cluster, server = ctx.start_cluster_server(ctx)
-  #     if ctx.job_name == "ps":
-  #       server.join()
-  #     elif ctx.job_name == "worker":
-  #       with tf.device(tf.compat.v1.train.replica_device_setter(
-  #           worker_device="/job:worker/task:%d" % ctx.task_index,
-  #           cluster=cluster)):
-  #         y_ = tf.compat.v1.placeholder(tf.float32, name='y_label')
-  #         label = tf.identity(y_, name='label')
-  #
-  #         row_indices = tf.compat.v1.placeholder(tf.int64, name='x_row_indices')
-  #         col_indices = tf.compat.v1.placeholder(tf.int64, name='x_col_indices')
-  #         values = tf.compat.v1.placeholder(tf.float32, name='x_values')
-  #         indices = tf.stack([row_indices[0], col_indices[0]], axis=1)
-  #         data = values[0]
-  #
-  #         x = tf.SparseTensor(indices=indices, values=data, dense_shape=[args.batch_size, 10])
-  #         w = tf.Variable(tf.random.truncated_normal([10, 1]), name='w')
-  #         y = tf.sparse.sparse_dense_matmul(x, w, name='y')
-  #
-  #         global_step = tf.compat.v1.train.get_or_create_global_step()
-  #         cost = tf.reduce_mean(input_tensor=tf.square(y_ - y), name='cost')
-  #         optimizer = tf.compat.v1.train.GradientDescentOptimizer(0.1).minimize(cost, global_step)
-  #
-  #       with tf.compat.v1.train.MonitoredTrainingSession(master=server.target,
-  #                                                        is_chief=(ctx.task_index == 0),
-  #                                                        checkpoint_dir=args.model_dir,
-  #                                                        save_checkpoint_steps=20) as sess:
-  #         tf_feed = ctx.get_data_feed(input_mapping=args.input_mapping)
-  #         while not sess.should_stop() and not tf_feed.should_stop():
-  #           batch = tf_feed.next_batch(args.batch_size)
-  #           if len(batch) > 0:
-  #             print("batch: {}".format(batch))
-  #             feed = {y_: batch['y_label'],
-  #                     row_indices: batch['x_row_indices'],
-  #                     col_indices: batch['x_col_indices'],
-  #                     values: batch['x_values']}
-  #             _, pred, trained_weights = sess.run([optimizer, y, w], feed_dict=feed)
-  #             print("trained_weights: {}".format(trained_weights))
-  #         sess.close()
-  #
-  #     # wait for MonitoredTrainingSession to save last checkpoint
-  #     time.sleep(10)
-  #
-  #   args = {}
-  #   estimator = TFEstimator(sparse_train, args) \
-  #     .setInputMapping({'labels': 'y_label', 'row_indices': 'x_row_indices', 'col_indices': 'x_col_indices', 'values': 'x_values'}) \
-  #     .setInputMode(TFCluster.InputMode.SPARK) \
-  #     .setModelDir(self.model_dir) \
-  #     .setClusterSize(self.num_workers) \
-  #     .setNumPS(1) \
-  #     .setBatchSize(1)
-  #
-  #   model_weights = np.array([[1.0, 1.0, 1.0, 1.0, 1.0, -1.0, -1.0, -1.0, -1.0, -1.0]]).T
-  #   examples = [scipy.sparse.random(1, 10, density=0.5) for i in range(200)]
-  #   rdd = self.sc.parallelize(examples).map(lambda e: ((e * model_weights).tolist()[0][0], e.row.tolist(), e.col.tolist(), e.data.tolist()))
-  #   df = rdd.toDF(["labels", "row_indices", "col_indices", "values"])
-  #   df.show(5)
-  #   model = estimator.fit(df)
-  #
-  #   model.setOutputMapping({'label': 'label', 'y/SparseTensorDenseMatMul': 'predictions'})
-  #   test_examples = [scipy.sparse.random(1, 10, density=0.5) for i in range(50)]
-  #   test_rdd = self.sc.parallelize(test_examples).map(lambda e: ((e * model_weights).tolist()[0][0], e.row.tolist(), e.col.tolist(), e.data.tolist()))
-  #   test_df = test_rdd.toDF(["labels", "row_indices", "col_indices", "values"])
-  #   test_df.show(5)
-  #   preds = model.transform(test_df)
-  #   preds.show(5)
-
 
 if __name__ == '__main__':
   unittest.main()