-
Notifications
You must be signed in to change notification settings - Fork 299
Expand file tree
/
Copy pathtest_node.py
More file actions
174 lines (152 loc) · 8.23 KB
/
test_node.py
File metadata and controls
174 lines (152 loc) · 8.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import unittest
import numpy as np
import tensorflow as tf
from onnx_tf.frontend import tensorflow_graph_to_onnx_model
from onnx import checker
# for testing
from onnx_tf.backend import prepare
from onnx_tf.common.legacy import legacy_opset_pre_ver
def get_node_by_name(nodes, name):
for node in nodes:
if node.name == name:
return node
def get_rnd(shape, low=-1.0, high=1.0, dtype=np.float32):
if dtype == np.float32:
return (np.random.uniform(low, high,
np.prod(shape)).reshape(shape).astype(np.float32))
elif dtype == np.int32:
return (np.random.uniform(low, high,
np.prod(shape)).reshape(shape).astype(np.int32))
elif dtype == np.bool_:
return np.random.choice(a=[False, True], size=shape)
class TestNode(unittest.TestCase):
""" Tests for nodes.
Tests are dynamically added.
Therefore edit test_cases to add more tests.
"""
pass
def create_test(test_data):
test_option = test_data[5] if len(test_data) > 5 else {}
def do_test_expected(self):
tf.reset_default_graph()
tf_op = test_data[1]
output_name = test_data[2]
inputs = test_data[3]
attrs = test_data[4]
# Now construct input feed dict
# keyed by input name
onnx_feed_dict = {}
# keyed by placeholder op
tf_feed_dict = {}
tf_param_list = []
for idx, input_tensor in enumerate(inputs):
if type(input_tensor) is np.ndarray:
placeholder = tf.placeholder(
input_tensor.dtype, shape=input_tensor.shape, name="in_" + str(idx))
onnx_feed_dict["in_" + str(idx)] = input_tensor
tf_feed_dict[placeholder] = input_tensor
tf_param_list.append(placeholder)
else:
tf_param_list.append(input_tensor)
test_op = tf_op(*tf_param_list, **attrs)
tf_graph = tf.get_default_graph().as_graph_def(add_shapes=True)
# Construct onnx graph, run with backend.
onnx_model = tensorflow_graph_to_onnx_model(
tf_graph,
output_name,
ignore_unimplemented=test_option.get("ignore_unimplemented", False))
if not test_option.get("ignore_unimplemented", False):
checker.check_model(onnx_model)
backend_rep = prepare(onnx_model)
backend_output = []
backend_rep_outputs = backend_rep.run(onnx_feed_dict)
for output in backend_rep.outputs:
backend_output.append(backend_rep_outputs[output])
backend_output = np.asarray(backend_output)
backend_output = np.squeeze(
backend_output, 0) if backend_output.shape[0] == 1 else backend_output
with tf.Session() as sess:
tf_output = sess.run(test_op, tf_feed_dict)
# skip comparison if test_option specifies that
# the test is call only.
if test_option.get("call_only", False):
return
for backend_o, tf_o in zip(backend_output, tf_output):
np.testing.assert_allclose(backend_o, tf_o, rtol=1e-3, atol=1e-7)
return do_test_expected
# yapf: disable
# organized as a tuple of the format:
# (test_name, tensorflow_op, output_node_name, LIST of inputs, MAP of attributes)
# Note that a python array is used differently than a numpy array
# in the sense that numpy array are passed in via tf.placeholder
# whereas python arrays are passed in as constants.
test_cases = [
("test_abs", tf.abs, "Abs", [get_rnd([1, 2, 3, 4])], {}),
("test_arg_max", tf.argmax, "ArgMax", [get_rnd([1, 2, 3, 4])], {"axis": -1}),
("test_arg_min", tf.argmin, "ArgMin", [get_rnd([1, 2, 3, 4])], {"axis": -1}),
("test_cast", tf.cast, "Cast", [get_rnd([10, 10]), tf.float16], {}),
("test_ceil", tf.ceil, "Ceil", [get_rnd([10, 10], -10, 10)], {}),
("test_constant_fill", tf.fill, "Fill", [[1, 2, 3], 1], {}),
("test_exp", tf.exp, "Exp", [get_rnd([10, 10])], {}),
("test_expand_dims", tf.expand_dims, "ExpandDims", [get_rnd([1, 2, 3, 4])], {"axis": 1}),
("test_floor", tf.floor, "Floor", [get_rnd([10, 10], -10, 10)], {}),
("test_identity", tf.identity, "Identity", [get_rnd([10, 10])], {}),
("test_log", tf.log, "Log", [get_rnd([10, 10])], {}),
("test_log_softmax", tf.nn.log_softmax, "LogSoftmax", [get_rnd([10, 10])], {}),
("test_or", tf.logical_or, "LogicalOr", [get_rnd([10, 10], dtype=np.bool_), get_rnd([10, 10], dtype=np.bool_)], {}),
("test_pow", tf.pow, "Pow", [get_rnd([10, 10]), get_rnd([10, 10])], {}),
("test_pack", tf.stack, "stack_1", [[get_rnd([3, 4]), get_rnd([3, 4])]], {}),
("test_pad", tf.pad, "Pad", [get_rnd([2, 3]), [[1, 1,], [2, 2]]], {"mode": "constant"}),
("test_random_normal", tf.random_normal, "random_normal/RandomStandardNormal", [], {"shape": [100, 100], "mean": 0.0, "stddev": 1.0, "dtype": tf.float32, "seed": 42}, {"call_only": True}),
("test_random_uniform", tf.random_uniform, "random_uniform", [], {"shape": [100, 100], "minval": 0.0, "maxval": 1.0, "dtype": tf.float32, "seed": 42}, {"call_only": True}),
("test_reciprocal", tf.reciprocal, "Reciprocal", [get_rnd([10, 10])], {}),
("test_reduce_max", tf.reduce_max, "Max", [get_rnd([10, 10])], {"keep_dims": True}),
("test_reduce_mean", tf.reduce_mean, "Mean", [get_rnd([10, 10])], {"keep_dims": True}),
("test_reduce_min", tf.reduce_min, "Min", [get_rnd([10, 10])], {"keep_dims": True}),
("test_reduce_prod", tf.reduce_prod, "Prod", [get_rnd([10, 10])], {"keep_dims": True}),
("test_reduce_sum", tf.reduce_sum, "Sum", [get_rnd([10, 10])], {"keep_dims": True}),
("test_reduce_sum_scalar_axes", tf.reduce_sum, "Sum", [get_rnd([10, 10]), 0], {"keep_dims": True}),
("test_relu", tf.nn.relu, "Relu", [get_rnd([10, 10])], {}),
("test_relu6", tf.nn.relu6, "Relu6", [get_rnd([10, 10])], {}),
("test_reshape", tf.reshape, "Reshape", [get_rnd([10, 10]), [4, 25]], {}),
("test_selu", tf.nn.selu, "Selu", [get_rnd([10, 10])], {}),
("test_shape", tf.shape, "Shape", [get_rnd([1, 2, 3, 4])], {}),
("test_sigmoid", tf.sigmoid, "Sigmoid", [get_rnd([10, 10])], {}),
("test_slice", tf.slice, "Slice", [get_rnd([5, 6, 7])], {"begin": [1, 0, 0], "size": [1, 1, 3]}),
("test_softmax", tf.nn.softmax, "Softmax", [get_rnd([10, 10])], {}),
("test_softplus", tf.nn.softplus, "Softplus", [get_rnd([10, 10])], {}),
("test_softsign", tf.nn.softsign, "Softsign", [get_rnd([10, 10])], {}),
("test_space_to_depth", tf.space_to_depth, "SpaceToDepth", [get_rnd([2, 8, 8, 5])], {"block_size": 2}),
("test_split_v", tf.split, "split", [get_rnd([10, 10]), [2, 3, 5]], {}),
("test_split", tf.split, "split", [get_rnd([10, 10]), 2], {}),
("test_sqrt", tf.sqrt, "Sqrt", [get_rnd([10, 10])], {}),
("test_squeeze", tf.squeeze, "Squeeze", [get_rnd([1, 1, 10, 10])], {"axis":[0, 1]}),
("test_subtract", tf.subtract, "Sub", [get_rnd([10, 10]), get_rnd([10, 10])], {}),
("test_tanh", tf.tanh, "Tanh", [get_rnd([10, 10])], {}),
("test_top_k", tf.nn.top_k, "TopKV2", [get_rnd([10, 10, 10, 10])], {"k": 3}),
# Use reverse to test ignore_unimplemented
("test_unimplemented", tf.reverse, "ReverseV2", [get_rnd([1, 2, 3, 4]), [3]], {}, {"ignore_unimplemented": True}),
("test_unpack", tf.unstack, "unstack", [get_rnd([2, 3, 4])], {}),
("test_xor", tf.logical_xor, "LogicalXor", [get_rnd([10, 10], dtype=np.bool_), get_rnd([10, 10], dtype=np.bool_)], {}),
("test_transpose", tf.transpose, "transpose", [get_rnd([2, 10])], {"perm":[1, 0]}),
("test_concat", tf.concat, "concat", [[get_rnd([1, 10]),get_rnd([10, 10]),get_rnd([20, 10])], 0], {}),
("test_bias_add_nchw", tf.nn.bias_add, "BiasAdd", [get_rnd([10, 32, 10, 10]),get_rnd([32])], {"data_format":"NCHW"}),
("test_bias_add_nhwc", tf.nn.bias_add, "BiasAdd", [get_rnd([10, 10, 10, 32]),get_rnd([32])], {"data_format":"NHWC"}),
("test_fill", tf.fill, "Fill", [[3, 10], 5], {}),
]
if not legacy_opset_pre_ver(6):
test_cases.append(("test_tile", tf.tile, "Tile", [get_rnd([1, 2, 3, 4]), np.random.randint(1, 10, (4,), dtype=np.int32)], {}))
if not legacy_opset_pre_ver(9):
test_cases.append(("test_strided_slice", tf.strided_slice, "StridedSlice", [get_rnd([5, 5]), [0, 0], [1, 5], [1, 1]], {}))
test_cases.append(("test_strided_slice_shrink", tf.strided_slice, "StridedSlice", [get_rnd([5, 5]), [0, 0], [1, 3], [1, 1]], {"shrink_axis_mask":1}))
# yapf: enable
for k, val in enumerate(test_cases):
test_method = create_test(val)
test_method.__name__ = str(val[0])
setattr(TestNode, test_method.__name__, test_method)
if __name__ == '__main__':
unittest.main()