Commit fbaca7b

refactor: rewrite talkingdata training part (#2008)
* refactor: rewrite talkingdata training part
* stay hard copy
1 parent c57b14b commit fbaca7b

4 files changed (+10083, -10130 lines)

demo/talkingdata-adtracking-fraud-detection/README.md

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ Download OpenMLDB server pkg, version >= 0.5.0 .
 Install all dependencies:

 ```
-pip install pandas xgboost==1.4.2 tornado "openmldb>=0.5.0"
+pip install pandas xgboost==1.4.2 sklearn tornado "openmldb>=0.5.0" requests
 ```

 ### Data Prepare
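
The two added packages match the rewritten scripts: `sklearn` supplies `train_test_split` and the report metrics used in `train_and_serve.py`, and `requests` is used to push the deployment and model info to the predict server. A quick environment check (a sketch, not part of this commit) could be:

```python
# Hypothetical sanity check that the new dependencies import cleanly.
import requests
import xgboost
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score

print(xgboost.__version__)  # the README pins 1.4.2
```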

demo/talkingdata-adtracking-fraud-detection/predict_server.py

Lines changed: 5 additions & 5 deletions
@@ -21,29 +21,29 @@
 import requests
 import tornado.ioloop
 import tornado.web
-import xgboost as xgb
+from xgboost.sklearn import XGBClassifier
 import logging

 logging.basicConfig(encoding="utf-8", level=logging.INFO, format="%(asctime)s-%(name)s-%(levelname)s-%(message)s")

 arg_keys = ["endpoint", "database", "deployment", "model_path"]
-bst = xgb.Booster()
+bst = XGBClassifier()
 # schema column type, ref hybridse::sdk::DataTypeName
 table_schema = []
 url = ""


 def build_feature(res):
     """
-    The last value in list, label `is_attributed` is dummy.
+    The first value in list is the label column, it's dummy.
     Real-time feature has it, cuz the history data in OpenMLDB is the training data too.
     It'll have this column, but no effect to feature extraction.

     :param res: an OpenMLDB reqeust response
     :return: real feature
     """
-    # col `is_attributed` is dummy, col `ip` won't train, so start from 2
-    return xgb.DMatrix(np.array([res[2:]]))
+    # col label is dummy, so start from 1
+    return np.array([res[1:]])


 class SchemaHandler(tornado.web.RequestHandler):
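
With the move from `xgb.Booster` to the sklearn wrapper, `build_feature` now returns a plain NumPy array and the handler can pass it straight to `predict`, with no `DMatrix` wrapping. A minimal sketch of how the updated server presumably consumes it (the sample row values below are made up for illustration):

```python
# Sketch only: load the model saved by train_and_serve.py and score one row.
import numpy as np
from xgboost.sklearn import XGBClassifier

bst = XGBClassifier()
bst.load_model('/tmp/model.json')   # MODEL_PATH written by train_and_serve.py

# A decoded deployment response: dummy label first, then the real features
# (app, device, os, channel, hour, day, qty, ip_app_count, ip_app_os_count).
res = [0, 12, 1, 13, 497, 9, 6, 1, 1, 1]
feature = np.array([res[1:]])       # same slicing as build_feature()
print(bst.predict(feature))         # e.g. [0]
```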

demo/talkingdata-adtracking-fraud-detection/train_and_serve.py

Lines changed: 77 additions & 124 deletions
@@ -9,100 +9,67 @@
 import sqlalchemy as db
 import pandas as pd
 import xgboost as xgb
+from xgboost.sklearn import XGBClassifier
+from sklearn.model_selection import train_test_split
+from sklearn.metrics import classification_report
+from sklearn.metrics import accuracy_score
 import requests
+
 # fmt:on

 # openmldb cluster configs
-zk = '127.0.0.1:2181'
-zk_path = '/openmldb'
+ZK = '127.0.0.1:2181'
+ZK_PATH = '/openmldb'

 # db, deploy name and model_path will update to predict server. You only need to modify here.
-db_name = 'demo_db'
-deploy_name = 'demo'
+DB_NAME = 'demo_db'
+DEPLOY_NAME = 'demo'
 # save model to
-model_path = '/tmp/model.json'
+MODEL_PATH = '/tmp/model.json'

-table_name = 'talkingdata' + str(int(time.time()))
+TABLE_NAME = 'talkingdata' + str(int(time.time()))
 # make sure that taskmanager can access the path
-train_feature_dir = '/tmp/train_feature'
+TRAIN_FEATURE_DIR = '/tmp/train_feature'

-predict_server = 'localhost:8881'
+PREDICT_SERVER = 'localhost:8881'


 def column_string(col_tuple) -> str:
+    """convert to str, used by CREATE TABLE DDL"""
     return ' '.join(col_tuple)


-def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metrics='auc',
-                      feval=None, num_boost_round=3000, early_stopping_rounds=20):
-    xgb_params = {
-        'booster': 'gbtree',
-        'obj': objective,
-        'eval_metric': metrics,
-        'num_leaves': 31,  # we should let it be smaller than 2^(max_depth)
-        'max_depth': -1,  # -1 means no limit
-        'max_bin': 255,  # Number of bucketed bin for feature values
-        'subsample': 0.6,  # Subsample ratio of the training instance.
-        'colsample_bytree': 0.3,
-        'min_child_weight': 5,
-        'alpha': 0,  # L1 regularization term on weights
-        'lambda': 0,  # L2 regularization term on weights
-        'nthread': 8,
-        'verbosity': 0,
-    }
-    xgb_params.update(params)
-
-    print('preparing validation datasets')
-
-    evals_results = {}
-
-    bst1 = xgb.train(xgb_params,
-                     dtrain,
-                     evals=dvalid,
-                     evals_result=evals_results,
-                     num_boost_round=num_boost_round,
-                     early_stopping_rounds=early_stopping_rounds,
-                     verbose_eval=10,
-                     feval=feval)
-
-    n_estimators = bst1.best_iteration
-    print('\nModel Report')
-    print('n_estimators : ', n_estimators)
-    print(metrics + ':', evals_results['eval'][metrics][n_estimators - 1])
-
-    return bst1
-
-
-# use pandas extension types to support NA in integer column
-dtypes = {
-    'ip': 'UInt32',
-    'app': 'UInt16',
-    'device': 'UInt16',
-    'os': 'UInt16',
-    'channel': 'UInt16',
-    'is_attributed': 'UInt8',
-    'click_id': 'UInt32'
-}
-
+# NOTE: ignore column 'attributed_time'
 train_schema = [('ip', 'int'), ('app', 'int'), ('device', 'int'),
-                ('os', 'int'), ('channel', 'int'), ('click_time', 'timestamp'), ('is_attributed', 'int')]
+                ('os', 'int'), ('channel', 'int'), ('click_time', 'timestamp'),
+                ('is_attributed', 'int')]


 def cut_data():
+    """prepare sample data, use train_schema, not the origin schema"""
     data_path = 'data/'
     sample_cnt = 10000  # you can prepare sample data by yourself
-    print(f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv')
-    train_df_tmp = pd.read_csv(data_path + 'train.csv', nrows=sample_cnt,
-                               dtype=dtypes, usecols=[c[0] for c in train_schema])
-    assert len(train_df_tmp) == sample_cnt
+
+    print(
+        f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv')
+    df = pd.read_csv(data_path + 'train.csv',
+                     usecols=[c[0] for c in train_schema])
+
     # take a portion from train sample data
-    train_df_tmp.to_csv('train_sample.csv', index=False)
-    del train_df_tmp
+    df_tmp = df.sample(n=sample_cnt)
+    assert len(df_tmp) == sample_cnt
+    attr_count = df_tmp.is_attributed.value_counts()
+    print(attr_count)
+    # 'is_attributed' must have two values: 0, 1
+    assert attr_count.count() > 1
+    df_tmp.to_csv('train_sample.csv', index=False)
+    del df_tmp
+    del df
     gc.collect()


 def nothrow_execute(sql):
-    # only used for drop deployment, cuz 'if not exist' is not supported now
+    """only used for drop deployment, cuz 'if not exist' is not supported now"""
     try:
         print('execute ' + sql)
         _, rs = connection.execute(sql)
@@ -112,110 +79,96 @@ def nothrow_execute(sql):
         print(e)


-print(f'Prepare openmldb, db {db_name} table {table_name}')
+# skip preparing sample data
 # cut_data()
+
+
+print(f'Prepare openmldb, db {DB_NAME} table {TABLE_NAME}')
 engine = db.create_engine(
-    f'openmldb:///{db_name}?zk={zk}&zkPath={zk_path}')
+    f'openmldb:///{DB_NAME}?zk={ZK}&zkPath={ZK_PATH}')
 connection = engine.connect()

-connection.execute(f'CREATE DATABASE IF NOT EXISTS {db_name};')
+connection.execute(f'CREATE DATABASE IF NOT EXISTS {DB_NAME};')
 schema_string = ','.join(list(map(column_string, train_schema)))
-connection.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({schema_string});')
+connection.execute(
+    f'CREATE TABLE IF NOT EXISTS {TABLE_NAME}({schema_string});')

-print('Load train_sample data to offline storage for training(hard copy)')
-connection.execute(f'USE {db_name}')
+# use soft copy after 9391eaab8f released
+print(f'Load train_sample data {os.path.abspath("train_sample.csv")} to offline storage for training(hard copy)')
+connection.execute(f'USE {DB_NAME}')
 connection.execute("SET @@execute_mode='offline';")
 # use sync offline job, to make sure `LOAD DATA` finished
 connection.execute('SET @@sync_job=true;')
 connection.execute('SET @@job_timeout=1200000;')
-# use soft link after https://github.com/4paradigm/OpenMLDB/issues/1565 fixed
 connection.execute(f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' "
-                   f"INTO TABLE {table_name} OPTIONS(format='csv',header=true);")
+                   f"INTO TABLE {TABLE_NAME} OPTIONS(format='csv',header=true, deep_copy=true);")

 print('Feature extraction')
 # the first column `is_attributed` is the label
 sql_part = f"""
-select is_attributed, ip, app, device, os, channel, hour(click_time) as hour, day(click_time) as day,
+select is_attributed, app, device, os, channel, hour(click_time) as hour, day(click_time) as day,
 count(channel) over w1 as qty,
 count(channel) over w2 as ip_app_count,
 count(channel) over w3 as ip_app_os_count
-from {table_name}
+from {TABLE_NAME}
 window
 w1 as (partition by ip order by click_time ROWS_RANGE BETWEEN 1h PRECEDING AND CURRENT ROW),
 w2 as(partition by ip, app order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
 w3 as(partition by ip, app, os order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
 """
 # extraction will take time
-connection.execute('SET @@job_timeout=1200000;')
-connection.execute(f"{sql_part} INTO OUTFILE '{train_feature_dir}' OPTIONS(mode='overwrite');")
+connection.execute(
+    f"{sql_part} INTO OUTFILE '{TRAIN_FEATURE_DIR}' OPTIONS(mode='overwrite');")

-print(f'Load features from feature dir {train_feature_dir}')
+print(f'Load features from feature dir {TRAIN_FEATURE_DIR}')
 # train_feature_dir has multi csv files
-train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', train_feature_dir + '/*.csv'))))
-print('peek:')
-print(train_df.head())
-len_train = len(train_df)
-train_row_cnt = int(len_train * 3 / 4)
-train_df = train_df[(len_train - train_row_cnt):len_train]
-val_df = train_df[:(len_train - train_row_cnt)]
+# all int, so no need to set read types
+train_df = pd.concat(map(pd.read_csv, glob.glob(
+    os.path.join('', TRAIN_FEATURE_DIR + '/*.csv'))))

-print('train size: ', len(train_df))
-print('valid size: ', len(val_df))
+# drop column label
+X_data = train_df.drop('is_attributed', axis=1)
+y = train_df.is_attributed

-target = 'is_attributed'
-predictors = ['app', 'device', 'os', 'channel', 'hour',
-              'day', 'qty', 'ip_app_count', 'ip_app_os_count']
+# Split the dataset into train and Test
+SEED = 7
+TEST_SIZE = 0.25
+X_train, X_test, y_train, y_test = train_test_split(
+    X_data, y, test_size=TEST_SIZE, random_state=SEED)

 gc.collect()

 print('Training by xgb')
-params_xgb = {
-    'num_leaves': 7,  # we should let it be smaller than 2^(max_depth)
-    'max_depth': 3,  # -1 means no limit
-    'min_child_samples': 100,
-    'max_bin': 100,  # Number of bucketed bin for feature values
-    'subsample': 0.7,  # Subsample ratio of the training instance.
-    # Subsample ratio of columns when constructing each tree.
-    'colsample_bytree': 0.7,
-    # Minimum sum of instance weight(hessian) needed in a child(leaf)
-    'min_child_weight': 0
-}
-xgtrain = xgb.DMatrix(train_df[predictors].values,
-                      label=train_df[target].values)
-xgvalid = xgb.DMatrix(val_df[predictors].values, label=val_df[target].values)
-watchlist = [(xgvalid, 'eval'), (xgtrain, 'train')]
-
-bst = xgb_modelfit_nocv(params_xgb,
-                        xgtrain,
-                        watchlist,
-                        objective='binary:logistic',
-                        metrics='auc',
-                        num_boost_round=300,
-                        early_stopping_rounds=50)
+
+# default is binary:logistic
+train_model = XGBClassifier(use_label_encoder=False).fit(X_train, y_train)
+pred = train_model.predict(X_test)
+print('Classification report:\n', classification_report(y_test, pred))
+print(f'Accuracy score: {accuracy_score(y_test, pred) * 100}')

 del train_df
-del val_df
 gc.collect()

-print('Save model.json to ', model_path)
-bst.save_model(model_path)
+print('Save model to ', MODEL_PATH)
+train_model.save_model(MODEL_PATH)

 print('Prepare online serving')

 print('Deploy sql')
 connection.execute("SET @@execute_mode='online';")
-connection.execute(f'USE {db_name}')
-nothrow_execute(f'DROP DEPLOYMENT {deploy_name}')
-deploy_sql = f"""DEPLOY {deploy_name} {sql_part}"""
+connection.execute(f'USE {DB_NAME}')
+nothrow_execute(f'DROP DEPLOYMENT {DEPLOY_NAME}')
+deploy_sql = f"""DEPLOY {DEPLOY_NAME} {sql_part}"""
 print(deploy_sql)
 connection.execute(deploy_sql)
 print('Import data to online')
 # online feature extraction needs history data
 # set job_timeout bigger if the `LOAD DATA` job timeout
 connection.execute(
     f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' "
-    f"INTO TABLE {db_name}.{table_name} OPTIONS(mode='append',format='csv',header=true);")
+    f"INTO TABLE {DB_NAME}.{TABLE_NAME} OPTIONS(mode='append',format='csv',header=true);")

 print('Update model to predict server')
-infos = {'database': db_name, 'deployment': deploy_name, 'model_path': model_path}
-requests.post('http://' + predict_server + '/update', json=infos)
+infos = {'database': DB_NAME,
+         'deployment': DEPLOY_NAME, 'model_path': MODEL_PATH}
+requests.post('http://' + PREDICT_SERVER + '/update', json=infos)
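
Once the script finishes, the predict server knows the database, deployment name, and model path. A client-side sketch for exercising the whole pipeline (the `/predict` route and the field values are assumptions for illustration, not part of this diff):

```python
# Hypothetical client call; endpoint path and payload values are assumed.
import requests

row = {'ip': 114904, 'app': 11, 'device': 1, 'os': 15, 'channel': 319,
       'click_time': 1509960088000, 'is_attributed': 0}  # label is a dummy here
resp = requests.post('http://localhost:8881/predict', json=row)
print(resp.text)
```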
