Commit a0f956d: add odps demo

1 parent a3b328e

File tree

3 files changed: +181 -0 lines changed


tools/dataset/README.MD

Lines changed: 8 additions & 0 deletions
@@ -285,5 +285,13 @@ export KAFKA_GID=xxx
export KAFKA_TOPICS=wide-and-deep-data

python kafka_reader.py

# test odps reader
#
# create config.py
#

python odps_reader.py

```
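The README step above says to create a config.py but does not show its contents. Below is a minimal sketch of what such a file could look like; only the four variable names (access_id, secret_key, project, endpoint) come from odps_reader.py and odps_writer.py, and the values are hypothetical placeholders.

# config.py (hypothetical example; substitute real credentials)
access_id = 'your-access-key-id'                  # ODPS/MaxCompute AccessKey ID (placeholder)
secret_key = 'your-access-key-secret'             # AccessKey secret (placeholder)
project = 'your_odps_project'                     # ODPS project name (placeholder)
endpoint = 'https://service.odps.aliyun.com/api'  # assumed endpoint; adjust to your region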

tools/dataset/odps_reader.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from odps import ODPS
from odps.models import Schema, Column, Partition

import paddle
import paddle.distributed.fleet as fleet
import os
import sys

# Criteo-style normalization constants for the 13 dense features and the
# hash bucket size for the 26 sparse features.
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)


class WideDeepDatasetReader(fleet.MultiSlotDataGenerator):
    def line_process(self, record):
        # Map one ODPS record to [dense_input] + [C1..C26] + [label] slots.
        label = [record.label]
        dense_feature = []
        sparse_feature = []
        for idx in continuous_range_:
            dense_feature.append((float(record[idx]) - cont_min_[idx - 1]) /
                                 cont_diff_[idx - 1])
        for idx in categorical_range_:
            sparse_feature.append([hash(str(idx) + record[idx]) % hash_dim_])
        return [dense_feature] + sparse_feature + [label]

    def generate_sample(self, line):
        def wd_reader():
            # Iterate over the ODPS table reader opened in __main__ below.
            for record in table_reader:
                input_data = self.line_process(record)
                feature_name = ["dense_input"]
                for idx in categorical_range_:
                    feature_name.append("C" + str(idx - 13))
                feature_name.append("label")
                yield zip(feature_name, input_data)

        return wd_reader


if __name__ == "__main__":
    my_data_generator = WideDeepDatasetReader()
    #my_data_generator.set_batch(16)

    from config import *
    # config.py should provide the following settings:
    # access_id
    # secret_key
    # project
    # endpoint

    o = ODPS(access_id, secret_key, project, endpoint=endpoint)

    table_name = 'wide_and_deep'

    table = o.get_table(table_name)  #.to_df()

    table_reader = table.open_reader()

    my_data_generator.run_from_memory()
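For reference, line_process maps one table row to 28 slot entries: a 13-float dense_input slot, 26 single-id sparse slots C1 through C26, and the label. Below is a minimal sketch of exercising it without an ODPS connection, using a hypothetical FakeRecord stand-in for a PyODPS row (it only mimics the .label attribute and index access the reader uses; the class name and test values are invented for illustration).

# Hypothetical smoke test for WideDeepDatasetReader.line_process (assumes the
# class definition from odps_reader.py above is in scope).
class FakeRecord:
    def __init__(self, values, label):
        self._values = values
        self.label = label

    def __getitem__(self, idx):
        return self._values[idx]


# 40 fields: index 0 is unused by line_process, 1-13 dense, 14-39 categorical.
raw = ["0"] + ["1.0"] * 13 + ["abc"] * 26
sample = WideDeepDatasetReader().line_process(FakeRecord(raw, label=0))
print(len(sample))            # 28 slots in total
print(len(sample[0]))         # 13 normalized dense values
print(sample[1], sample[-1])  # first hashed sparse slot and the label slot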

tools/dataset/utils/odps_writer.py

Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from odps import ODPS
from odps.models import Schema, Column, Partition

from config import *
# config.py should provide the following settings:
# access_id
# secret_key
# project
# endpoint
'''
For more information, refer to
https://pyodps.readthedocs.io/
'''

o = ODPS(access_id, secret_key, project, endpoint=endpoint)

# Table schema: 1 bigint label, 13 double dense columns, 26 string sparse columns.
label_col = [Column(name='label', type='bigint')]
dense_col = [
    Column(
        name='dense' + str(i), type='double') for i in range(1, 14)
]
sparse_col = [Column(name='C' + str(i), type='string') for i in range(14, 40)]

columns = label_col + dense_col + sparse_col

schema = Schema(
    columns=columns)  # schema = Schema(columns=columns, partitions=partitions)

table_name = 'wide_and_deep'

print(schema.columns)


def create_table():
    table = o.create_table(table_name, schema, if_not_exists=True)


#create_table()

table = o.get_table(table_name)  #.to_df()
print(table.to_df())


def write_data():
    records = []

    # prepare data: './part-0' holds tab-separated rows of
    # label, 13 dense values, 26 categorical strings
    input_file = './part-0'
    with open(input_file, 'r') as f:
        for line in f:
            example = []

            features = line.rstrip('\n').split('\t')
            label = int(features[0])
            example.append(label)

            for idx in range(1, 14):
                if features[idx] == "":
                    example.append(0.0)
                else:
                    example.append(float(features[idx]))
            for idx in range(14, 40):
                example.append(features[idx].encode('utf8'))

            records.append(example)

    with table.open_writer() as writer:
        writer.write(records)


#write_data()


def read_data():
    with table.open_reader() as reader:
        print("data count in table:", reader.count)
        for r in reader:
            print(r.label)
            #print([i for i in r[1:14]])
            print([i for i in r])
            break


read_data()
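write_data() above reads a local ./part-0 file laid out as 40 tab-separated columns per row: the label, 13 dense values (possibly empty), and 26 categorical strings. Below is a minimal sketch for fabricating a few synthetic rows in that layout to try the writer end to end; the row count and random values are arbitrary placeholders, not real data.

# Hypothetical helper: write a tiny synthetic './part-0' in the tab-separated
# layout write_data() parses (label, 13 dense values, 26 categorical strings).
import random

with open('./part-0', 'w') as f:
    for _ in range(4):
        label = str(random.randint(0, 1))
        dense = [str(random.randint(0, 100)) for _ in range(13)]
        sparse = ['%08x' % random.randint(0, 0xffffffff) for _ in range(26)]
        f.write('\t'.join([label] + dense + sparse) + '\n')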
