Commit 022562d

SNOW-747988: add stress tests and minor updates (#1653)
1 parent 6750a33

11 files changed: +262 -88 lines

setup.cfg

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@ author_email = [email protected]
 license = Apache-2.0
 license_files = LICENSE.txt, NOTICE
 classifiers =
-    Development Status :: 4 - Beta
+    Development Status :: 3 - Alpha
     Environment :: Console
     Environment :: Other Environment
     Intended Audience :: Developers

src/snowflake/connector/cpp/ArrowIterator/CArrowTableIterator.cpp

Lines changed: 0 additions & 1 deletion
@@ -78,7 +78,6 @@ void CArrowTableIterator::reconstructRecordBatches_nanoarrow()
   }
   if (scale > 0 && columnSchemaView.type != ArrowType::NANOARROW_TYPE_DECIMAL128)
   {
-    // TODO: this log is causing seg fault
     logger->debug(__FILE__, __func__, __LINE__, "Convert fixed number column to double column, column scale %d, column type id: %d",
                   scale, columnSchemaView.type);
     convertScaledFixedNumberColumn_nanoarrow(

src/snowflake/connector/cpp/ArrowIterator/arrow_iterator.pyx

Lines changed: 9 additions & 9 deletions
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2012-2021 Snowflake Computing Inc. All rights reserved.
+# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
 #

 # distutils: language = c++
@@ -213,12 +213,12 @@ cdef class PyArrowIterator(EmptyPyArrowIterator):
         self.unit = 'table'
         self.nanoarrow_Table = self.cIterator.getArrowArrayPtrs()
         self.nanoarrow_Schema = self.cIterator.getArrowSchemaPtrs()
-        cdef vector[PyObject] py_batches
-        batches = []
-        for i in range(self.nanoarrow_Table.size()):
-            array_ptr = self.nanoarrow_Table[i]
-            schema_ptr = self.nanoarrow_Schema[i]
-            batch = pyarrow.RecordBatch._import_from_c(array_ptr, schema_ptr)
-            batches.append(batch)
-        self.pyarrow_table = pyarrow.Table.from_batches(batches=batches)
+        self.pyarrow_table = pyarrow.Table.from_batches(
+            batches=[
+                pyarrow.RecordBatch._import_from_c(
+                    self.nanoarrow_Table[i],
+                    self.nanoarrow_Schema[i]
+                ) for i in range(self.nanoarrow_Table.size())
+            ]
+        )
         snow_logger.debug(msg=f"Batches read: {self.nanoarrow_Table.size()}", path_name=__file__, func_name="init_table_unit")
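The refactor above changes no behavior: each (ArrowArray*, ArrowSchema*) pair produced by the C++ iterator is re-imported through pyarrow's C Data Interface, and the resulting record batches are assembled into one table. For context, here is a minimal plain-Python sketch of that round trip; it assumes a pyarrow version that exposes the private _export_to_c/_import_from_c helpers, and the ctypes buffers stand in for the struct memory that the C++ side owns in the real code.

import ctypes

import pyarrow

# Build a small batch to push through the C Data Interface.
batch = pyarrow.RecordBatch.from_pydict({"c1": [1, 2, 3]})

# Raw memory standing in for the ArrowArray/ArrowSchema structs
# (512 bytes comfortably covers both struct layouts).
c_array = ctypes.create_string_buffer(512)
c_schema = ctypes.create_string_buffer(512)
array_ptr = ctypes.addressof(c_array)
schema_ptr = ctypes.addressof(c_schema)

# Export, then re-import: only struct pointers cross the boundary; the
# underlying Arrow buffers are shared rather than copied.
batch._export_to_c(array_ptr, schema_ptr)
roundtripped = pyarrow.RecordBatch._import_from_c(array_ptr, schema_ptr)
table = pyarrow.Table.from_batches(batches=[roundtripped])
print(table)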

src/snowflake/connector/version.py

Lines changed: 1 addition & 1 deletion
@@ -1,3 +1,3 @@
 # Update this for the versions
 # Don't change the forth version number from None
-VERSION = (3, 1, 0, None)
+VERSION = (3, 1, "0a1", None)
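Changing the patch component from the integer 0 to the string "0a1" marks this build as a pre-release alpha, matching the Development Status :: 3 - Alpha classifier change in setup.cfg above. The stress tests below select an iterator implementation based on exactly this; a minimal sketch of the check:

# A pre-release is signalled by a non-numeric patch component, which is
# what test/stress/local_iterator.py below tests with str(VERSION[2]).isdigit().
VERSION = (3, 1, "0a1", None)

is_prerelease = not str(VERSION[2]).isdigit()
version_string = ".".join(str(v) for v in VERSION if v is not None)
print(version_string, "pre-release:", is_prerelease)  # 3.1.0a1 pre-release: True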

stress/__init__.py

Whitespace-only changes.

stress/unit/local_iterator.py

Lines changed: 0 additions & 76 deletions
This file was deleted.

test/stress/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+#
+# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
+#

File renamed without changes.

test/stress/e2e_iterator.py

Lines changed: 125 additions & 0 deletions (new file)

#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

import argparse

import util as stress_util
from util import task_memory_decorator, task_time_execution_decorator

import snowflake.connector
from parameters import CONNECTION_PARAMETERS

stress_util.print_to_console = False
can_draw = True
try:
    import matplotlib.pyplot as plt
except ImportError:
    can_draw = False


def prepare_data(cursor, row_count=100, test_table_name="TEMP_ARROW_TEST_TABLE"):
    cursor.execute(
        f"""
        CREATE TEMP TABLE {test_table_name} (
        C1 BIGINT, C2 BINARY, C3 BOOLEAN, C4 CHAR, C5 CHARACTER, C6 DATE, C7 DATETIME, C8 DEC(12,3),
        C9 DECIMAL(12,3), C10 DOUBLE, C11 FLOAT, C12 INT, C13 INTEGER, C14 NUMBER, C15 REAL, C16 BYTEINT,
        C17 SMALLINT, C18 STRING, C19 TEXT, C20 TIME, C21 TIMESTAMP, C22 TIMESTAMP_TZ, C23 TIMESTAMP_LTZ,
        C24 TIMESTAMP_NTZ, C25 TINYINT, C26 VARBINARY, C27 VARCHAR);
        """
    )

    for _ in range(row_count):
        cursor.execute(
            f"""
            INSERT INTO {test_table_name} SELECT
            123456,
            TO_BINARY('HELP', 'UTF-8'),
            TRUE,
            'a',
            'b',
            '2023-07-18',
            '2023-07-18 12:51:00',
            984.28,
            268.35,
            123.456,
            738.132,
            6789,
            23456,
            12583,
            513.431,
            10,
            9,
            'abc456',
            'def123',
            '12:34:56',
            '2021-01-01 00:00:00 +0000',
            '2021-01-01 00:00:00 +0000',
            '2021-01-01 00:00:00 +0000',
            '2021-01-01 00:00:00 +0000',
            1,
            TO_BINARY('HELP', 'UTF-8'),
            'vxlmls!21321#@!#!'
            ;
            """
        )


def task_fetch_rows(cursor, table_name):
    ret = cursor.execute(f"select * from {table_name}").fetchall()
    for _ in ret:
        pass


def task_fetch_arrow_batches(cursor, table_name):
    ret = cursor.execute(f"select * from {table_name}").fetch_arrow_batches()
    for _ in ret:
        pass


def execute_task(task, cursor, table_name, iteration_cnt):
    for _ in range(iteration_cnt):
        task(cursor, table_name)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--iteration_cnt", type=int, default=5000)
    parser.add_argument("--data_file", type=str, default="test_data")
    parser.add_argument("--row_count", type=int, default=100)
    parser.add_argument("--test_table_name", type=str, default="ARROW_TEST_TABLE")
    args = parser.parse_args()

    test_table_name = "TEMP_ARROW_TEST_TABLE"

    with snowflake.connector.connect(
        **CONNECTION_PARAMETERS
    ) as conn, conn.cursor() as cursor:
        if not args.test_table_name:
            print("preparing data started")
            prepare_data(cursor, args.row_count)
            print("preparing data is done")
        else:
            print("using data in existing table")
            test_table_name = args.test_table_name

        memory_check_task = task_memory_decorator(task_fetch_arrow_batches)
        execute_task(memory_check_task, cursor, test_table_name, args.iteration_cnt)
        memory_records = stress_util.collect_memory_records()

        perf_check_task = task_time_execution_decorator(task_fetch_arrow_batches)
        execute_task(perf_check_task, cursor, test_table_name, args.iteration_cnt)
        time_records = stress_util.collect_time_execution_records()

        print("average time is", sum(time_records) / len(time_records))

        if can_draw:
            plt.plot([i for i in range(len(time_records))], time_records)
            plt.title("per iteration execution time")
            plt.show()
            plt.plot(
                [item[0] for item in memory_records],
                [item[1] for item in memory_records],
            )
            plt.title("memory usage")
            plt.show()
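Both stress scripts import task_memory_decorator and task_time_execution_decorator from a local util module that this diff does not display (possibly the file renamed without changes above). As a rough sketch only, helpers with this interface could be built on psutil RSS sampling and wall-clock timing; every body below is an assumption, not the actual util.py.

# Hypothetical reconstruction of the util helpers the stress scripts import;
# the real util.py is not shown in this commit view.
import time

import psutil  # assumed dependency for memory sampling

print_to_console = True
_memory_records = []  # (iteration_index, rss_bytes) samples
_time_records = []  # per-call wall-clock durations in seconds


def task_memory_decorator(task):
    def wrapper(*args, **kwargs):
        task(*args, **kwargs)
        # Sample the resident set size of this process after each task run.
        rss = psutil.Process().memory_info().rss
        _memory_records.append((len(_memory_records), rss))
        if print_to_console:
            print("rss:", rss)

    return wrapper


def task_time_execution_decorator(task):
    def wrapper(*args, **kwargs):
        start = time.time()
        task(*args, **kwargs)
        _time_records.append(time.time() - start)

    return wrapper


def collect_memory_records():
    return _memory_records


def collect_time_execution_records():
    return _time_records

Note that the scripts set stress_util.print_to_console = False at import time; because the wrapper reads the module-level flag at call time, that reassignment takes effect in this sketch as well.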

test/stress/local_iterator.py

Lines changed: 119 additions & 0 deletions (new file)

#
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
#

import argparse
import base64
import io

import util as stress_util
from util import task_memory_decorator, task_time_execution_decorator

from snowflake.connector.arrow_context import ArrowConverterContext
from snowflake.connector.arrow_iterator import PyArrowIterator
from snowflake.connector.version import VERSION

stress_util.print_to_console = False
can_draw = True
try:
    import matplotlib.pyplot as plt
except ImportError:
    can_draw = False


def create_pyarrow_iterator(input_data):
    # create nanoarrow based iterator
    return PyArrowIterator(
        None,
        input_data,
        ArrowConverterContext(session_parameters={"TIMEZONE": "America/Los_Angeles"}),
        False,
        False,
        False,
    )


def create_old_pyarrow_iterator(input_data):
    # create vendored arrow based iterator
    return PyArrowIterator(
        None,
        io.BytesIO(input_data),
        ArrowConverterContext(session_parameters={"TIMEZONE": "America/Los_Angeles"}),
        False,
        False,
        False,
    )


def task_for_loop_iterator(input_data, create_iterator_method):
    for _ in create_iterator_method(input_data):
        pass


def task_for_loop_table_iterator(input_data, create_iterator_method):
    iterator = create_iterator_method(input_data)
    iterator.init_table_unit()
    for _ in iterator:
        pass


def execute_task(task, bytes_data, create_iterator_method, iteration_cnt):
    for _ in range(iteration_cnt):
        task(bytes_data, create_iterator_method)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--iteration_cnt", type=int, default=100000)
    parser.add_argument("--data_file", type=str, default="test_data")
    args = parser.parse_args()

    with open(args.data_file) as f:
        b64data = f.read()

    decode_bytes = base64.b64decode(b64data)

    # if the connector is a pre-release, it uses the nanoarrow based iterator
    print(
        "Testing connector version: ",
        ".".join([str(v) for v in VERSION if v is not None]),
    )
    create_arrow_iterator_method = (
        create_old_pyarrow_iterator
        if str(VERSION[2]).isdigit()
        else create_pyarrow_iterator
    )

    perf_check_task_for_loop_iterator = task_time_execution_decorator(
        task_for_loop_table_iterator
    )
    memory_check_task_for_loop_iterator = task_memory_decorator(
        task_for_loop_table_iterator
    )

    execute_task(
        memory_check_task_for_loop_iterator,
        decode_bytes,
        create_arrow_iterator_method,
        args.iteration_cnt,
    )
    memory_records = stress_util.collect_memory_records()
    execute_task(
        perf_check_task_for_loop_iterator,
        decode_bytes,
        create_arrow_iterator_method,
        args.iteration_cnt,
    )
    time_records = stress_util.collect_time_execution_records()

    print("average time is", sum(time_records) / len(time_records))

    if can_draw:
        plt.plot([i for i in range(len(time_records))], time_records)
        plt.title("per iteration execution time")
        plt.show()
        plt.plot(
            [item[0] for item in memory_records], [item[1] for item in memory_records]
        )
        plt.title("memory usage")
        plt.show()
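local_iterator.py expects a test_data file holding a base64-encoded blob that PyArrowIterator can consume. The payload format is not specified in this commit; assuming it is a base64-encoded Arrow IPC stream (the format the connector receives in Snowflake result chunks), a hypothetical generator could look like the following, where the column names and types are illustrative only, not the connector's actual result schema.

# Sketch: write a base64-encoded Arrow IPC stream to ./test_data,
# under the (unverified) assumption that this is the expected input format.
import base64
import io

import pyarrow
import pyarrow.ipc

batch = pyarrow.RecordBatch.from_pydict(
    {
        "C1": pyarrow.array([123456] * 100, type=pyarrow.int64()),
        "C18": pyarrow.array(["abc456"] * 100, type=pyarrow.string()),
    }
)

buf = io.BytesIO()
with pyarrow.ipc.new_stream(buf, batch.schema) as writer:
    writer.write_batch(batch)

with open("test_data", "w") as f:
    f.write(base64.b64encode(buf.getvalue()).decode())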
