Skip to content

Commit 6b649de

Browse files
authored
SNOW-1015703: fix mem leak and fix flaw in mem leak detect program (#1867)
1 parent ed29c92 commit 6b649de

File tree

9 files changed

+202
-101
lines changed

9 files changed

+202
-101
lines changed

DESCRIPTION.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
88

99
# Release Notes
1010

11+
- v3.7.1(TBD)
12+
- Fixed a memory leak in decimal data conversion.
13+
1114
- v3.7.0(January 25,2024)
1215

1316
- Added a new boolean parameter `force_return_table` to `SnowflakeCursor.fetch_arrow_all` to force returning `pyarrow.Table` in case of zero rows.

src/snowflake/connector/nanoarrow_cpp/ArrowIterator/DecimalConverter.cpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,18 @@ PyObject* DecimalFromDecimalConverter::toPyObject(int64_t rowIndex) const {
8484
ArrowDecimalGetBytes(&arrowDecimal, outBytes);
8585
PyObject* int128_bytes = PyBytes_FromStringAndSize(&outBytes, 16);
8686
*/
87-
return PyObject_CallMethod(m_context, "DECIMAL128_to_decimal", "Si",
88-
int128_bytes, m_scale);
87+
PyObject* return_object = PyObject_CallMethod(
88+
m_context, "DECIMAL128_to_decimal", "Si", int128_bytes, m_scale);
89+
/**
90+
int128_bytes is a new reference created by PyBytes_FromStringAndSize,
91+
to avoid a memory leak we need to free it after usage
92+
check docs:
93+
https://docs.python.org/3/c-api/bytes.html#c.PyBytes_FromStringAndSize
94+
https://docs.python.org/3/c-api/refcounting.html#c.Py_XDECREF
95+
96+
*/
97+
Py_XDECREF(int128_bytes);
98+
return return_object;
8999
}
90100

91101
} // namespace sf

test/stress/README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
## quick start for performance testing
2+
3+
4+
### setup
5+
6+
note: you need to put your own credentials into parameters.py
7+
8+
```bash
9+
git clone [email protected]:snowflakedb/snowflake-connector-python.git
10+
cd snowflake-connector-python/test/stress
11+
pip install -r dev_requirements.txt
12+
touch parameters.py # set your own connection parameters
13+
```
14+
15+
### run unit perf test
16+
17+
This test will use the test dataset stored in the "stress_test_data" folder.
18+
Check the README in the folder to see what datasets are available.
19+
20+
```bash
21+
python local_iterator.py
22+
```
23+
24+
### run e2e perf test
25+
26+
This test will run queries against Snowflake. Update the script to prepare the data and run the test.
27+
28+
```bash
29+
python e2e_iterator.py
30+
```

test/stress/e2e_iterator.py

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import argparse
1616

1717
import util as stress_util
18-
from util import task_memory_decorator, task_time_execution_decorator
18+
from util import task_execution_decorator
1919

2020
import snowflake.connector
2121
from parameters import CONNECTION_PARAMETERS
@@ -124,23 +124,31 @@ def execute_task(task, cursor, table_name, iteration_cnt):
124124
) as conn, conn.cursor() as cursor:
125125
test_table_name = args.test_table_name
126126

127-
memory_check_task = task_memory_decorator(task_fetch_arrow_batches)
128-
execute_task(memory_check_task, cursor, test_table_name, args.iteration_cnt)
129-
memory_records = stress_util.collect_memory_records()
130-
131-
perf_check_task = task_time_execution_decorator(task_fetch_arrow_batches)
132-
execute_task(perf_check_task, cursor, test_table_name, args.iteration_cnt)
133-
time_records = stress_util.collect_time_execution_records()
134-
135-
print("average time is", sum(time_records) / len(time_records))
127+
perf_record_file = "stress_perf_record"
128+
memory_record_file = "stress_memory_record"
129+
with open(perf_record_file, "w") as perf_file, open(
130+
memory_record_file, "w"
131+
) as memory_file:
132+
task = task_execution_decorator(
133+
task_fetch_arrow_batches, perf_file, memory_file
134+
)
135+
execute_task(task, cursor, test_table_name, args.iteration_cnt)
136136

137137
if can_draw:
138-
plt.plot([i for i in range(len(time_records))], time_records)
139-
plt.title("per iteration execution time")
140-
plt.show()
141-
plt.plot(
142-
[item[0] for item in memory_records],
143-
[item[1] for item in memory_records],
144-
)
145-
plt.title("memory usage")
146-
plt.show()
138+
with open(perf_record_file) as perf_file, open(
139+
memory_record_file
140+
) as memory_file:
141+
# sample rate
142+
perf_lines = perf_file.readlines()
143+
perf_records = [float(line) for line in perf_lines]
144+
145+
memory_lines = memory_file.readlines()
146+
memory_records = [float(line) for line in memory_lines]
147+
148+
plt.plot([i for i in range(len(perf_records))], perf_records)
149+
plt.title("per iteration execution time")
150+
plt.show(block=False)
151+
plt.figure()
152+
plt.plot([i for i in range(len(memory_records))], memory_records)
153+
plt.title("memory usage")
154+
plt.show(block=True)

test/stress/local_iterator.py

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import secrets
2020

2121
import util as stress_util
22-
from util import task_memory_decorator, task_time_execution_decorator
22+
from util import task_execution_decorator
2323

2424
from snowflake.connector.arrow_context import ArrowConverterContext
2525
from snowflake.connector.nanoarrow_arrow_iterator import (
@@ -130,13 +130,13 @@ def execute_task(
130130
parser.add_argument(
131131
"--iteration_cnt",
132132
type=int,
133-
default=10,
133+
default=100000,
134134
help="how many times to run the test function, default is 100000",
135135
)
136136
parser.add_argument(
137137
"--data_file",
138138
type=str,
139-
default="test_data",
139+
default="stress_test_data/test_multi_column_row_decimal_data",
140140
help="a local file to read data from, the file contains base64 encoded string returned from snowflake",
141141
)
142142
parser.add_argument(
@@ -153,53 +153,59 @@ def execute_task(
153153

154154
args = parser.parse_args()
155155

156-
with open(args.data_file) as f:
157-
b64data = f.read()
156+
try:
157+
# file contains base64 encoded data
158+
with open(args.data_file) as f:
159+
b64data = f.read()
158160

159-
decode_bytes = base64.b64decode(b64data)
161+
decode_bytes = base64.b64decode(b64data)
162+
except UnicodeDecodeError:
163+
# file contains raw bytes data
164+
with open(args.data_file, "rb") as f:
165+
decode_bytes = f.read()
160166

161167
# if connector is pre-release, then it's nanoarrow based iterator
162168
print(
163169
"Testing connector version: ",
164170
".".join([str(v) for v in VERSION if v is not None]),
165171
)
166172

167-
perf_check_task_for_loop_iterator = task_time_execution_decorator(
168-
task_for_loop_iterator_expected_error
169-
if args.test_error_method
170-
else task_for_loop_iterator
171-
)
172-
memory_check_task_for_loop_iterator = task_memory_decorator(
173-
task_for_loop_iterator_expected_error
174-
if args.test_error_method
175-
else task_for_loop_iterator
176-
)
177-
178-
execute_task(
179-
memory_check_task_for_loop_iterator,
180-
decode_bytes,
181-
create_nanoarrow_pyarrow_iterator,
182-
args.iteration_cnt,
183-
args.use_table_unit,
184-
)
185-
memory_records = stress_util.collect_memory_records()
186-
execute_task(
187-
perf_check_task_for_loop_iterator,
188-
decode_bytes,
189-
create_nanoarrow_pyarrow_iterator,
190-
args.iteration_cnt,
191-
args.use_table_unit,
192-
)
193-
time_records = stress_util.collect_time_execution_records()
173+
perf_record_file = "stress_perf_record"
174+
memory_record_file = "stress_memory_record"
175+
with open(perf_record_file, "w") as perf_file, open(
176+
memory_record_file, "w"
177+
) as memory_file:
178+
task_for_loop_iterator = task_execution_decorator(
179+
task_for_loop_iterator_expected_error
180+
if args.test_error_method
181+
else task_for_loop_iterator,
182+
perf_file,
183+
memory_file,
184+
)
194185

195-
print("average time is", sum(time_records) / len(time_records))
186+
execute_task(
187+
task_for_loop_iterator,
188+
decode_bytes,
189+
create_nanoarrow_pyarrow_iterator,
190+
args.iteration_cnt,
191+
args.use_table_unit,
192+
)
196193

197194
if can_draw:
198-
plt.plot([i for i in range(len(time_records))], time_records)
199-
plt.title("per iteration execution time")
200-
plt.show()
201-
plt.plot(
202-
[item[0] for item in memory_records], [item[1] for item in memory_records]
203-
)
204-
plt.title("memory usage")
205-
plt.show()
195+
with open(perf_record_file) as perf_file, open(
196+
memory_record_file
197+
) as memory_file:
198+
# sample rate
199+
perf_lines = perf_file.readlines()
200+
perf_records = [float(line) for line in perf_lines]
201+
202+
memory_lines = memory_file.readlines()
203+
memory_records = [float(line) for line in memory_lines]
204+
205+
plt.plot([i for i in range(len(perf_records))], perf_records)
206+
plt.title("per iteration execution time")
207+
plt.show(block=False)
208+
plt.figure()
209+
plt.plot([i for i in range(len(memory_records))], memory_records)
210+
plt.title("memory usage")
211+
plt.show(block=True)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# Test Data Description
2+
3+
## test_data_all_types
4+
5+
This dataset contains 1 batch, 100 rows of data, and the schema of the data contains 27 columns.
6+
7+
### sample row data:
8+
9+
```python
10+
(
11+
123456,
12+
bytearray(b'HELP'),
13+
True,
14+
'a',
15+
'b',
16+
datetime.date(2023, 7, 18),
17+
datetime.datetime(2023, 7, 18, 12, 51),
18+
Decimal('984.280'),
19+
Decimal('268.350'),
20+
123.456,
21+
738.132,
22+
6789,
23+
23456,
24+
12583,
25+
513.431,
26+
10,
27+
9,
28+
'fjisfsj',
29+
'wkdoajde131',
30+
datetime.time(12, 34, 56),
31+
datetime.datetime(2021, 1, 1, 0, 0),
32+
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=<UTC>),
33+
datetime.datetime(2020, 12, 31, 16, 0, tzinfo=<DstTzInfo 'America/Los_Angeles' PST-1 day, 16:00:00 STD>),
34+
datetime.datetime(2021, 1, 1, 0, 0),
35+
1,
36+
bytearray(b'HELP'),
37+
'vxlmls!21321#@!#!'
38+
)
39+
```
40+
41+
## test_multi_column_row_decimal_data
42+
43+
This dataset contains 9 batches, each batch has approximately ~1700 rows of data, and the schema of the data contains 19 columns.
44+
45+
### sample row data:
46+
```python
47+
(
48+
datetime.date(2021, 1, 3),
49+
8371,
50+
'segment_no_0',
51+
1,
52+
7,
53+
2,
54+
Decimal('0.285714'),
55+
Decimal('1.000'),
56+
Decimal('7.000'),
57+
Decimal('2.000'),
58+
Decimal('0.285714000'),
59+
Decimal('1.000'),
60+
Decimal('7.000'),
61+
Decimal('2.000'),
62+
Decimal('0.285714000'),
63+
Decimal('1.000'),
64+
Decimal('7.000'),
65+
Decimal('2.000'),
66+
Decimal('0.285714000')
67+
)
68+
```
File renamed without changes.
3.58 MB
Binary file not shown.

test/stress/util.py

Lines changed: 14 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -8,50 +8,26 @@
88

99
process = psutil.Process()
1010

11-
last_memory_record = None
12-
memory_records = []
13-
time_records = []
14-
memory_decoration_execution_time = 0
15-
print_to_console = False
11+
SAMPLE_RATE = 10  # record data every SAMPLE_RATE executions
1612

1713

18-
def collect_memory_records():
19-
memory_records.append(last_memory_record)
20-
return memory_records
21-
22-
23-
def collect_time_execution_records():
24-
return time_records
25-
26-
27-
def task_memory_decorator(func):
28-
memory_records.clear()
29-
global memory_decoration_execution_time
30-
memory_decoration_execution_time = 0
31-
32-
def wrapper(*args, **kwargs):
33-
global memory_decoration_execution_time
34-
global print_to_console
35-
global last_memory_record
36-
func(*args, **kwargs)
37-
percent = process.memory_percent()
38-
if not memory_records or (memory_records and percent != memory_records[-1][1]):
39-
memory_records.append((memory_decoration_execution_time, percent))
40-
memory_decoration_execution_time += 1
41-
last_memory_record = (memory_decoration_execution_time, percent)
42-
if print_to_console:
43-
print(memory_decoration_execution_time, percent)
44-
45-
return wrapper
46-
47-
48-
def task_time_execution_decorator(func):
49-
time_records.clear()
14+
def task_execution_decorator(func, perf_file, memory_file):
15+
count = 0
5016

5117
def wrapper(*args, **kwargs):
5218
start = time.time()
5319
func(*args, **kwargs)
20+
memory_usage = (
21+
process.memory_info().rss / 1024 / 1024
22+
) # rss is of unit bytes, we get unit in MB
5423
period = time.time() - start
55-
time_records.append(period)
24+
nonlocal count
25+
if count % SAMPLE_RATE == 0:
26+
perf_file.write(str(period) + "\n")
27+
print(f"execution time {count}")
28+
print(f"memory usage: {memory_usage} MB")
29+
print(f"execution time: {period} s")
30+
memory_file.write(str(memory_usage) + "\n")
31+
count += 1
5632

5733
return wrapper

0 commit comments

Comments
 (0)