Skip to content

Commit 22247fe

Browse files
committed
update: add batch size
1 parent 817bf65 commit 22247fe

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

fake-stream/fakestream/FakeStreamWorker.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,18 @@ def dump_log(line, log_path=f'logs/{fileftime()}.log'):
1212

1313

1414
class FakeStreamWorker(AbstractParallel, ABC):
15-
def __init__(self, log_func, regex_params, **kwargs):
15+
def __init__(self, log_func, regex_params, batch_size, **kwargs):
1616
super().__init__(**kwargs)
1717
self.print_log = log_func
1818
self.regex_params = regex_params
19+
self.batch_size = batch_size
1920
self.host = None
2021
self.dbname = None
21-
self.workers_num = 1
22-
self.connection = None
2322
self.fix_val = None
24-
self.time_offset = None
2523
self.speed = None
24+
self.workers_num = 1
2625
self.table_names = None
26+
self.connection = None
2727
self.run_flag = None
2828
self.table_map = None
2929

@@ -57,16 +57,19 @@ def process_task(self, task, **kwargs):
5757
self.table_map[table_name] = update_table(
5858
exec_sql(f"desc {table_name}", self.connection, reshape=False), self.regex_params)
5959
for k, v in self.table_map.items():
60-
fs = fill_table(v, self.regex_params, self.host, self.dbname, k, default_regex=self.fix_val)
61-
ret, res = add([fs], k, self.connection, freq=1)
60+
records = []
61+
for b in range(self.batch_size):
62+
record = fill_table(v, self.regex_params, self.host, self.dbname, k, default_regex=self.fix_val)
63+
records.append(record)
64+
ret, res = add(records, k, self.connection, freq=len(records))
6265
if ret:
63-
success_cnt += 1
66+
success_cnt += len(records)
6467
else:
65-
failure_cnt += 1
68+
failure_cnt += len(records)
6669
for line in res:
6770
self.print_log(LOGGER.info(f'execute sql "{line}"'))
6871
except Exception as e:
69-
failure_cnt += 1
72+
failure_cnt += self.batch_size
7073
self.print_log(LOGGER.error(e.args))
7174
self.print_log(LOGGER.error(f'execute failure!'))
7275

fake-stream/fakestream/MainApp.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def __init__(self, parent, title, size, *args, **kwargs):
4545

4646
self.regex_params = read_bin('regex-parameters.json')
4747
self.parallel = FakeStreamWorker(is_join=False, print_process=False, log_func=self.print_log,
48-
regex_params=self.regex_params)
48+
regex_params=self.regex_params, batch_size=1)
4949
self.db_params = read_bin(self.db_parameter_path)
5050
self.db_params = {} if self.db_params is None else self.db_params
5151
self.sorted_params = []
@@ -183,9 +183,9 @@ def InitUI(self):
183183
paramGrid = wx.GridBagSizer(0, 0)
184184
self.fixValST = wx.StaticText(panel, -1, "设定值")
185185
self.fixValTC = wx.TextCtrl(panel, -1, style=wx.ALIGN_LEFT, value='6')
186-
self.offsetST = wx.StaticText(panel, -1, "时间补偿(秒)")
187-
self.offsetTC = wx.TextCtrl(panel, -1, style=wx.ALIGN_LEFT, value='0',
188-
validator=NumberValidator(DataType.FLOAT))
186+
self.offsetST = wx.StaticText(panel, -1, "批大小")
187+
self.offsetTC = wx.TextCtrl(panel, -1, style=wx.ALIGN_LEFT, value='1',
188+
validator=NumberValidator(DataType.UINT))
189189
self.workerST = wx.StaticText(panel, -1, "线程数")
190190
self.workerTC = wx.TextCtrl(panel, -1, style=wx.ALIGN_LEFT, value='1',
191191
validator=NumberValidator(DataType.UINT))
@@ -265,8 +265,8 @@ def OnClickConnect(self, event):
265265
def OnClickRun(self, event):
266266
self.OnClickConnect(event)
267267
run_params = dict(
268+
batch_size=int(self.offsetTC.GetValue()),
268269
fix_val=self.fixValTC.GetValue(),
269-
time_offset=float(self.offsetTC.GetValue()),
270270
speed=float(self.speedsTC.GetValue()),
271271
workers_num=int(self.workerTC.GetValue()),
272272
table_names=self.tbNameTC.GetValue().split(),
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
{
2-
"Name.*.*.*.*": "Faker::fake.name()",
3-
"CountryCode.*.*.*.*": "Faker::fake.country_code()",
4-
"District.*.*.*.*": "Faker::fake.district()",
2+
"name.*.*.*.*": "::fake.name()",
3+
"countryCode.*.*.*.*": "::fake.country_code()",
4+
"district.*.*.*.*": "::fake.district()",
55
"*.*.*.*.int": "6+",
6-
"citycol.*.*.*.*": "Faker::fake.word()",
6+
"citycol.*.*.*.*": "::fake.word()",
77
"*.*.*.*.datetime": "::time.strftime('%Y-%m-%d %H:%M:%S.%m')"
88
}

0 commit comments

Comments
 (0)