Skip to content

Commit 33311d1

Browse files
AlVaskerad-pat
authored and committed
Benchmarking test for fast executemany with and without BCP
1 parent d82ec51 commit 33311d1

File tree

1 file changed

+277
-0
lines changed

1 file changed

+277
-0
lines changed
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
# benchmark_bcp.py
2+
import datetime as dt
import os
import random
import string
import time
from contextlib import contextmanager
from decimal import Decimal

import sqlalchemy as sa
from sqlalchemy import (
    Table, Column, Integer, BigInteger, String, Float,
    Boolean, Time, MetaData, insert, event
)
# SQL Server-specific types for precise mapping
from sqlalchemy.dialects.mssql import DATETIME2, DATETIMEOFFSET
16+
17+
# Driver-specific ODBC connection attribute and value used to switch bulk-copy
# (BCP) support on. The pair is handed to pyodbc through ``attrs_before`` so it
# is applied before the connection is opened.
# NOTE(review): 1219 is expected to be SQL_COPT_SS_BASE (1200) + 19 from
# msodbcsql.h -- confirm against the installed driver's header.
SQL_COPT_SS_BCP = 1219
SQL_BCP_ON = 1  # attribute value meaning "BCP enabled"
19+
20+
# ---------- CONFIG ----------
# SQLAlchemy URL of the benchmark database. Set the BCP_BENCH_CONN_URL
# environment variable to target another server or to supply real credentials
# without editing this file; the fallback is the local-development default.
CONN_URL = os.environ.get(
    "BCP_BENCH_CONN_URL",
    "mssql+pyodbc://sa:YourStrong!Passw0rd@localhost,1433/BcpTest"
    "?driver=ODBC+Driver+18+for+SQL+Server&Encrypt=no",
)
TOTAL_ROWS = 50_000  # rows inserted per benchmark run
CHUNK_ROWS = 10_000  # rows passed to each executemany call
BCP_BATCH = 10_000   # value assigned to cursor.bcp_batch_rows on the BCP run
NAME_LEN = 20        # default length of the generated "name" strings
# ----------------------------
31+
32+
md = MetaData()

# Columns of the benchmark table, one per data type exercised by the insert
# paths. Trailing notes give the intended SQL Server / ODBC type.
_columns = (
    Column("id", Integer, nullable=False),               # SQLINT4
    Column("id_big", BigInteger, nullable=False),        # SQL_BIGINT
    Column("is_active", Boolean, nullable=False),        # BIT
    Column("name", String(50)),                          # VARCHAR
    Column("val", Float),                                # FLOAT(53)
    Column("t", Time, nullable=True),                    # TIME, fed with datetime.time
    Column("d_str", String(10)),                         # date carried as text
    Column("dt_str", String(32)),                        # datetime carried as text
    Column("amount_dec_18_4", sa.DECIMAL(18, 4)),        # DECIMAL(18,4)
    Column("amount_num_38_0", sa.NUMERIC(38, 0)),        # NUMERIC(38,0)
    Column("d_native", sa.DATE),                         # native DATE
    Column("dt2_native", DATETIME2(precision=7)),        # native DATETIME2(7)
    Column("dto_native", DATETIMEOFFSET(precision=7)),   # native DATETIMEOFFSET(7)
)

t = Table("BcpTest_extend", md, *_columns, schema="dbo")
50+
51+
def make_engine(use_sa_fast_executemany: bool):
    """Create an Engine for ``CONN_URL`` with the BCP connection attribute set.

    Args:
        use_sa_fast_executemany: Forwarded as ``fast_executemany`` to
            ``sa.create_engine``.

    Returns:
        The new SQLAlchemy engine (``pool_pre_ping`` enabled).
    """
    # Applied by pyodbc before the connection is established.
    pre_connect_attrs = {SQL_COPT_SS_BCP: SQL_BCP_ON}
    engine_options = {
        "connect_args": {"attrs_before": pre_connect_attrs},
        "fast_executemany": use_sa_fast_executemany,
        "pool_pre_ping": True,
    }
    return sa.create_engine(CONN_URL, **engine_options)
58+
59+
def set_bcp_event(engine, enable_bcp: bool, bcp_batch_rows: int):
    """Register a listener that sets BCP options on the cursor before executemany.

    For every ``executemany`` statement run through *engine*, the listener
    assigns ``cursor.use_bcp_fast`` and, when BCP is enabled,
    ``cursor.bcp_batch_rows``. Single-statement executions are left untouched.

    Args:
        engine: Engine to attach the ``before_cursor_execute`` listener to.
        enable_bcp: Whether the cursor should be asked to use the BCP path.
        bcp_batch_rows: Batch size assigned to ``cursor.bcp_batch_rows``;
            converted with ``int()`` once, at registration time, when
            *enable_bcp* is true.

    Warns:
        RuntimeWarning: if BCP was requested but the cursor rejects the
            attributes (``AttributeError``). The statement still runs, but
            without the BCP options, so timings would not reflect BCP.
    """
    import warnings  # local import keeps this block self-contained

    use_bcp = bool(enable_bcp)
    batch_rows = int(bcp_batch_rows) if use_bcp else None

    @event.listens_for(engine, "before_cursor_execute")
    def _apply_bcp_options(conn, cursor, statement, parameters, context, executemany):
        if not executemany:
            return
        try:
            cursor.use_bcp_fast = use_bcp
            if use_bcp:
                cursor.bcp_batch_rows = batch_rows
        except AttributeError:
            # Best effort: presumably only a BCP-capable DBAPI build exposes
            # these attributes. Do not fail the insert, but do not hide the
            # fallback either when BCP was explicitly requested.
            if use_bcp:
                warnings.warn(
                    "DBAPI cursor does not accept use_bcp_fast/bcp_batch_rows; "
                    "BCP options were not applied to this executemany",
                    RuntimeWarning,
                    stacklevel=2,
                )
70+
71+
@contextmanager
def timer(label):
    """Time the ``with`` body and print ``"<label>: <seconds>s"`` on exit.

    Yields:
        A zero-argument callable returning the seconds elapsed so far, for
        callers that want intermediate readings.

    The timing line is printed even when the body raises (the exception then
    propagates unchanged), so a failed run still reports how long it took.
    """
    t0 = time.perf_counter()
    try:
        yield lambda: time.perf_counter() - t0
    finally:
        dt_elapsed = time.perf_counter() - t0
        print(f"{label}: {dt_elapsed:.3f}s")
77+
78+
def rand_name(n=NAME_LEN):
    """Return a random string of *n* ASCII letters (upper- and lower-case)."""
    letters = string.ascii_letters
    picked = [random.choice(letters) for _ in range(n)]
    return "".join(picked)
81+
82+
def rand_time():
    """Return a random naive ``datetime.time`` with microsecond resolution."""
    # Components are drawn in hour, minute, second, microsecond order.
    hh = random.randint(0, 23)
    mm = random.randint(0, 59)
    ss = random.randint(0, 59)
    micros = random.randint(0, 999999)
    return dt.time(hh, mm, ss, micros)
90+
91+
def rand_date_str():
    """Return a random date in 2000-01-01..2030-12-31 (inclusive) as 'YYYY-MM-DD'."""
    first = dt.date(2000, 1, 1)
    last = dt.date(2030, 12, 31)
    span_days = (last - first).days
    offset = dt.timedelta(days=random.randint(0, span_days))
    return (first + offset).isoformat()
98+
99+
def rand_datetime_str():
    """Return a random datetime as 'YYYY-MM-DD HH:MM:SS.ffffff' (always 26 chars).

    The instant is drawn uniformly, at microsecond granularity, from
    2000-01-01 00:00:00 up to but excluding 2030-12-31 23:59:59.999999.
    """
    start = dt.datetime(2000, 1, 1)
    end = dt.datetime(2030, 12, 31, 23, 59, 59, 999999)
    # Whole span expressed in microseconds (exact integer division).
    span_us = (end - start) // dt.timedelta(microseconds=1)
    moment = start + dt.timedelta(microseconds=random.randrange(span_us))
    # timespec forces the fractional part: plain isoformat() omits ".ffffff"
    # when microsecond == 0, which would break the fixed-width format.
    return moment.isoformat(sep=" ", timespec="microseconds")
107+
108+
# NEW: native generators
109+
def rand_date():
    """Return a random ``datetime.date`` in 2000-01-01..2030-12-31 (inclusive).

    The window is deliberately much narrower than the range a DATE column
    accepts, to keep test data reasonable.
    """
    first = dt.date(2000, 1, 1)
    last = dt.date(2030, 12, 31)
    span_days = (last - first).days
    day_offset = random.randint(0, span_days)
    return first + dt.timedelta(days=day_offset)
116+
117+
def rand_datetime2():
    """Return a random naive ``datetime`` with microsecond resolution.

    Drawn from 2000-01-01 00:00:00 up to but excluding
    2030-12-31 23:59:59.999999; no tzinfo is attached.
    """
    lower = dt.datetime(2000, 1, 1)
    upper = dt.datetime(2030, 12, 31, 23, 59, 59, 999999)
    # Length of the window in whole microseconds.
    window_us = (upper - lower) // dt.timedelta(microseconds=1)
    picked_us = random.randrange(window_us)
    return lower + dt.timedelta(microseconds=picked_us)
124+
125+
def rand_datetimeoffset():
    """Return a random timezone-aware ``datetime`` with a fixed UTC offset.

    The wall-clock part comes from ``rand_datetime2()``. The offset is a whole
    hour in -12..+14 plus 0/15/30/45 minutes in the same direction, so it
    always lies between -12:45 and +14:00.
    """
    naive = rand_datetime2()
    off_hours = random.randint(-12, 14)
    off_minutes = random.choice([0, 15, 30, 45])
    # Occasionally pin the most negative hour to exactly -12:00. The extra
    # random draw only happens for -12 because of short-circuit evaluation.
    if off_hours == -12 and random.random() < 0.2:
        off_minutes = 0
    # +14:00 is the upper bound, so no extra minutes there.
    if off_hours == 14:
        off_minutes = 0
    # Minutes must point the same way as the hours.
    if off_hours < 0:
        off_minutes = -off_minutes
    offset = dt.timedelta(hours=off_hours, minutes=off_minutes)
    return naive.replace(tzinfo=dt.timezone(offset))
139+
140+
# Fast, safe Decimal generators (stay comfortably within column precision)
141+
def rand_decimal(precision: int, scale: int, max_int_digits: int = 12) -> Decimal:
    """Return a random Decimal that fits a DECIMAL(precision, scale) column.

    Args:
        precision: Total number of digits the target column allows.
        scale: Digits after the decimal point; the result is quantized to
            exactly this many places when positive.
        max_int_digits: Upper bound on generated integer digits (the actual
            count is also capped at ``precision - scale``).

    Returns:
        A Decimal, negative roughly 20% of the time. At most 6 fractional
        digits are random; any remaining places are zero.

    When ``precision == scale`` no integer digits are generated, so the value
    stays below 1 in magnitude. Arithmetic runs in a local context wide enough
    for the result, so wide values neither raise ``InvalidOperation`` nor get
    rounded by the ambient 28-digit default.
    """
    from decimal import localcontext  # local import: file only imports Decimal

    # Zero integer digits is legal (e.g. DECIMAL(4, 4)); never force one.
    int_digits = max(0, min(precision - scale, max_int_digits))
    int_part = random.randint(0, 10**int_digits - 1)
    sign = -1 if random.random() < 0.2 else 1

    with localcontext() as ctx:
        ctx.prec = max(ctx.prec, int_digits + max(scale, 0), 1)
        if scale > 0:
            frac_digits = min(scale, 6)  # keep generator fast
            frac_part = random.randint(0, 10**frac_digits - 1)
            magnitude = Decimal(int_part) + Decimal(frac_part).scaleb(-frac_digits)
            one_unit = Decimal(1).scaleb(-scale)  # exponent template for quantize
            return (Decimal(sign) * magnitude).quantize(one_unit)
        return Decimal(sign) * Decimal(int_part)
153+
154+
def rows_generator(start_id: int, count: int):
    """Yield *count* row dicts for the benchmark table, ids starting at *start_id*.

    Each dict has one entry per table column. ``id`` increases by one per
    row, ``id_big`` is ``id`` shifted past the 32-bit integer range,
    ``is_active`` is true for odd ids, and the remaining values are random.
    """
    big_offset = 9_000_000_000  # pushes id_big beyond what a 32-bit int holds
    stop_id = start_id + count
    row_id = start_id
    while row_id < stop_id:
        yield {
            "id": row_id,
            "id_big": big_offset + row_id,
            "is_active": row_id % 2 == 1,
            "name": rand_name(),
            "val": 100.0 * random.random(),
            "t": rand_time(),
            "d_str": rand_date_str(),
            "dt_str": rand_datetime_str(),
            "amount_dec_18_4": rand_decimal(18, 4),
            "amount_num_38_0": rand_decimal(38, 0),
            "d_native": rand_date(),                # DATE
            "dt2_native": rand_datetime2(),         # DATETIME2(7)
            "dto_native": rand_datetimeoffset(),    # DATETIMEOFFSET(7)
        }
        row_id += 1
172+
173+
def ensure_table(engine):
    """Drop ``dbo.BcpTest_extend`` if present and recreate it from ``md``.

    Both steps run inside a single ``engine.begin()`` transaction.
    """
    drop_if_exists = """
            IF OBJECT_ID('dbo.BcpTest_extend', 'U') IS NOT NULL
                DROP TABLE dbo.BcpTest_extend;
        """
    with engine.begin() as txn:
        txn.exec_driver_sql(drop_if_exists)
        md.create_all(txn)
        # Optional (benchmarking):
        # txn.exec_driver_sql("ALTER DATABASE BcpTest SET RECOVERY SIMPLE;")
        # txn.exec_driver_sql("ALTER DATABASE BcpTest MODIFY FILE (NAME = BcpTest_log, SIZE = 1024MB, FILEGROWTH = 256MB);")
183+
184+
def _insert_in_chunks(conn, total_rows: int, chunk_rows: int):
    """Insert *total_rows* generated rows through *conn*, *chunk_rows* at a time.

    Prints one progress line per chunk and returns
    ``(reported_inserted, seconds_spent_in_execute)``. Only the
    ``conn.execute`` call is timed; row generation is excluded.
    """
    ins = insert(t)
    total_inserted = 0
    total_time = 0.0
    done = 0
    while done < total_rows:
        n = min(total_rows - done, chunk_rows)
        chunk = list(rows_generator(done + 1, n))  # ids are 1-based

        t0 = time.perf_counter()
        r = conn.execute(ins, chunk)
        dt_chunk = time.perf_counter() - t0

        total_time += dt_chunk
        # Fall back to the chunk size when the driver gives no usable
        # rowcount (missing, None, or negative).
        rc = getattr(r, "rowcount", -1)
        total_inserted += rc if (rc is not None and rc >= 0) else n

        done += n
        rate = n / dt_chunk if dt_chunk > 0 else float("inf")
        print(f"Chunk {done:>9}/{total_rows} rows in {dt_chunk:.3f}s ({rate:,.0f} rows/s)")
    return total_inserted, total_time


def bulk_insert(engine, use_bcp: bool, total_rows: int, chunk_rows: int):
    """Insert *total_rows* random rows in chunks and print timing statistics.

    Args:
        engine: SQLAlchemy engine to insert through.
        use_bcp: True runs on an AUTOCOMMIT connection and labels the output
            "BCP"; False runs in one ``engine.begin()`` transaction and labels
            it "NORMAL". Whether BCP is really used depends on how *engine*
            was configured, not on this flag.
        total_rows: Number of rows to insert (<= 0 inserts nothing).
        chunk_rows: Rows per ``executemany`` call; must be positive.

    Raises:
        ValueError: if *chunk_rows* is not positive (the chunk loop could
            otherwise never make progress).
    """
    if chunk_rows <= 0:
        raise ValueError(f"chunk_rows must be a positive integer, got {chunk_rows!r}")

    label = "BCP" if use_bcp else "NORMAL"
    with timer(f"TOTAL {label}"):
        if use_bcp:
            with engine.connect() as conn:
                conn = conn.execution_options(isolation_level="AUTOCOMMIT")
                total_inserted, total_time = _insert_in_chunks(conn, total_rows, chunk_rows)
        else:
            with engine.begin() as conn:
                total_inserted, total_time = _insert_in_chunks(conn, total_rows, chunk_rows)

    rows_per_s = total_rows / total_time if total_time > 0 else float("inf")
    print(
        f"\nSummary [{label}]: "
        f"{total_rows:,} rows in {total_time:.3f}s → {rows_per_s:,.0f} rows/s "
        f"(reported inserted: {total_inserted:,})\n"
    )
245+
246+
def verify_count(engine, expected: int):
    """Print the row count of ``dbo.BcpTest_extend`` next to *expected*.

    No comparison is enforced; the count is only reported.

    Returns:
        The count read from the table.
    """
    count_sql = "SELECT COUNT(*) FROM dbo.BcpTest_extend;"
    with engine.begin() as conn:
        actual = conn.exec_driver_sql(count_sql).scalar()
    print(f"Verify table count: {actual:,} (expected {expected:,})")
    return actual
251+
252+
def main():
    """Run the benchmark twice: plain executemany, then the BCP fast path.

    Each phase recreates the table, inserts ``TOTAL_ROWS`` rows in
    ``CHUNK_ROWS`` chunks and prints the resulting row count. Each engine is
    disposed when its phase ends, even on failure, so no pooled connection
    from the first phase stays open during the second.
    """
    # NORMAL
    eng_normal = make_engine(use_sa_fast_executemany=False)
    try:
        ensure_table(eng_normal)
        bulk_insert(eng_normal, use_bcp=False, total_rows=TOTAL_ROWS, chunk_rows=CHUNK_ROWS)
        verify_count(eng_normal, TOTAL_ROWS)

        # reset
        with eng_normal.begin() as conn:
            conn.exec_driver_sql("TRUNCATE TABLE dbo.BcpTest_extend;")
    finally:
        eng_normal.dispose()

    # BCP fast path
    eng_bcp = sa.create_engine(
        CONN_URL,
        connect_args={"attrs_before": {SQL_COPT_SS_BCP: SQL_BCP_ON}},
        fast_executemany=True,
        isolation_level="AUTOCOMMIT",
        pool_pre_ping=True,
    )
    try:
        set_bcp_event(eng_bcp, enable_bcp=True, bcp_batch_rows=BCP_BATCH)
        ensure_table(eng_bcp)
        bulk_insert(eng_bcp, use_bcp=True, total_rows=TOTAL_ROWS, chunk_rows=CHUNK_ROWS)
        verify_count(eng_bcp, TOTAL_ROWS)
    finally:
        eng_bcp.dispose()


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)