Skip to content

Commit d2e2ce1

Browse files
authored
Merge pull request #32 from chdb-io/fixAggSlowCSV
Fix common bug of aggregation slow
2 parents 222ba88 + f3b21ce commit d2e2ce1

File tree

4 files changed

+275
-5
lines changed

4 files changed

+275
-5
lines changed

src/Client/ClientBase.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,12 @@ void ClientBase::onData(Block & block, ASTPtr parsed_query)
449449
}
450450

451451
/// Received data block is immediately displayed to the user.
452-
output_format->flush();
452+
// FIXME: this is a hack to avoid flushing the write buffer when we are inside chdb,
453+
// because flushing would cause memory usage to double on every call of onData.
454+
// Another possible solution is to fix the logic in WriteBufferFromVector.nextImpl()
455+
// We could use `if (&inside_chdb == nullptr || !inside_chdb)` here, but that would make
456+
// clickhouse-local behave differently from chdb, which would make debugging harder.
457+
// output_format->flush();
453458

454459
/// Restore progress bar after data block.
455460
if (need_render_progress && tty_buf)
@@ -526,8 +531,7 @@ try
526531
}
527532
else
528533
{
529-
// query_result_memory.resize(DBMS_DEFAULT_BUFFER_SIZE);
530-
query_result_memory = std::make_unique<std::vector<char>>();
534+
query_result_memory = std::make_unique<std::vector<char>>(DBMS_DEFAULT_BUFFER_SIZE);
531535
query_result_buf = std::make_shared<WriteBufferFromVector<std::vector<char>>>(*query_result_memory.get());
532536

533537
out_buf = query_result_buf.get();
@@ -620,7 +624,8 @@ try
620624
else
621625
output_format = global_context->getOutputFormat(current_format, out_file_buf ? *out_file_buf : *out_buf, block);
622626

623-
output_format->setAutoFlush();
627+
// See comment above `output_format->flush();`
628+
// output_format->setAutoFlush();
624629
}
625630
}
626631
catch (...)

tests/run_all.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
test_loader = unittest.TestLoader()
66
test_suite = test_loader.discover('./')
77

8-
test_runner = unittest.TextTestRunner()
8+
test_runner = unittest.TextTestRunner(verbosity=2)
99
ret = test_runner.run(test_suite)
1010

1111
# if any test fails, exit with non-zero code

tests/test_issue31.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#!python3
2+
3+
import os
4+
import time
5+
import hashlib
6+
import unittest
7+
import chdb
8+
import zipfile
9+
import urllib.request
10+
11+
from timeout_decorator import timeout
12+
13+
csv_url = "https://media.githubusercontent.com/media/datablist/sample-csv-files/main/files/organizations/organizations-2000000.zip"
14+
15+
16+
# download csv file, and unzip it
17+
def download_and_extract(url, save_path):
    """Download the zip archive at ``url`` to ``save_path`` and unpack it.

    The archive members are extracted into ``os.path.dirname(save_path)``.
    Progress messages are printed before and after each step.
    """
    print("\nDownloading file...")
    urllib.request.urlretrieve(url, save_path)

    print("Extracting file...")
    target_dir = os.path.dirname(save_path)
    with zipfile.ZipFile(save_path, "r") as archive:
        archive.extractall(target_dir)

    print("Done!")
26+
27+
28+
# @timeout(60, use_signals=False)
29+
30+
import signal
31+
32+
33+
def payload():
    """Run the aggregation query over the extracted CSV and verify its output.

    Prints the length of the CSV-formatted result and the elapsed wall-clock
    time of the whole call.

    Raises:
        Exception: if the MD5 digest of the result differs from the expected
            value; the decoded result is printed first to aid debugging.
    """
    now = time.time()
    res = chdb.query(
        'select Name, count(*) cnt from file("organizations-2000000.csv", CSVWithNames) group by Name order by cnt desc, Name asc limit 10000',
        "CSV",
    )
    # Convert the result to bytes once and reuse it instead of repeating the
    # get_memview().tobytes() conversion for the hash, the length and the dump.
    data = res.get_memview().tobytes()
    # calculate md5 of the result
    hash_out = hashlib.md5(data).hexdigest()
    print("output length: ", len(data))
    if hash_out != "423570bd700ba230ccd2b720b7976626":
        print(data.decode("utf-8"))
        raise Exception(f"md5 not match {hash_out}")
    used_time = time.time() - now
    print("used time: ", used_time)
47+
48+
49+
class TimeoutTestRunner(unittest.TextTestRunner):
    """``TextTestRunner`` that interrupts the run via SIGALRM after ``timeout`` seconds.

    Args:
        timeout: number of whole seconds passed to ``signal.alarm`` before the
            run is interrupted (default 60).
        *args, **kwargs: forwarded unchanged to ``unittest.TextTestRunner``.
    """

    def __init__(self, timeout=60, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = timeout

    def run(self, test):
        """Run ``test`` with an alarm armed for ``self.timeout`` seconds.

        Returns the result object produced by ``TextTestRunner.run``. If the
        alarm fires, the handler prints a message and raises a local
        ``TimeoutException`` from inside the running test.

        The alarm is cancelled and the previous SIGALRM handler is restored on
        every exit path, including when the run raises.
        """

        class TimeoutException(Exception):
            pass

        def handler(signum, frame):
            print("Timeout after {} seconds".format(self.timeout))
            raise TimeoutException("Timeout after {} seconds".format(self.timeout))

        old_handler = signal.signal(signal.SIGALRM, handler)
        signal.alarm(self.timeout)
        try:
            return super().run(test)
        finally:
            # Always disarm the alarm first so it cannot fire into unrelated
            # code after this method has returned or raised.
            signal.alarm(0)
            # signal.signal() returns None when the previous handler was not
            # installed from Python; such a handler cannot be re-installed.
            if old_handler is not None:
                signal.signal(signal.SIGALRM, old_handler)
70+
71+
72+
class TestAggOnCSVSpeed(unittest.TestCase):
    """Checks that the group-by query over the sample CSV finishes within the time limit."""

    def setUp(self):
        # Fetch and unpack the sample data set before each test.
        download_and_extract(csv_url, "organizations-2000000.zip")

    def tearDown(self):
        # Remove the extracted CSV first, then the downloaded archive.
        for leftover in ("organizations-2000000.csv", "organizations-2000000.zip"):
            os.remove(leftover)

    def _test_agg(self, arg=None):
        # ``arg`` receives the result object the runner passes to the callable; it is unused.
        payload()

    def test_agg(self):
        runner = TimeoutTestRunner(timeout=10)
        result = runner.run(self._test_agg)
        self.assertTrue(result.wasSuccessful(), "Test failed: took too long to execute")
86+
87+
88+
if __name__ == "__main__":
89+
unittest.main()

tests/timeout_decorator.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
"""
2+
Timeout decorator.
3+
4+
:copyright: (c) 2012-2013 by PN.
5+
:license: MIT, see LICENSE for more details.
6+
"""
7+
8+
from __future__ import print_function
9+
from __future__ import unicode_literals
10+
from __future__ import division
11+
12+
import sys
13+
import time
14+
import multiprocessing
15+
import signal
16+
from functools import wraps
17+
18+
############################################################
19+
# Timeout
20+
############################################################
21+
22+
# http://www.saltycrane.com/blog/2010/04/using-python-timeout-decorator-uploading-s3/
23+
# Used work of Stephen "Zero" Chappell <[email protected]>
24+
# in https://code.google.com/p/verse-quiz/source/browse/trunk/timeout.py
25+
26+
27+
class TimeoutError(AssertionError):

    """Raised when a call wrapped by the `timeout` decorator exceeds its time limit.

    :param value: message describing the timeout; also exposed as ``self.value``.
    """

    def __init__(self, value="Timed Out"):
        # Initialise the base exception as well so that ``args`` and ``repr()``
        # carry the message instead of being empty.
        super(TimeoutError, self).__init__(value)
        self.value = value

    def __str__(self):
        return repr(self.value)
36+
37+
38+
def _raise_exception(exception, exception_message):
39+
""" This function checks if a exception message is given.
40+
41+
If there is no exception message, the default behaviour is maintained.
42+
If there is an exception message, the message is passed to the exception with the 'value' keyword.
43+
"""
44+
if exception_message is None:
45+
raise exception()
46+
else:
47+
raise exception(exception_message)
48+
49+
50+
def timeout(seconds=None, use_signals=True, timeout_exception=TimeoutError, exception_message=None):
    """Add a timeout parameter to a function and return it.

    The decorated function accepts an extra ``timeout`` keyword argument that
    overrides ``seconds`` for that single call; it is removed from the keyword
    arguments before the wrapped function is invoked.

    :param seconds: optional time limit in seconds or fractions of a second. If None is passed, no timeout is applied.
        This adds some flexibility to the usage: you can disable timing out depending on the settings.
    :type seconds: float
    :param use_signals: flag indicating whether signals (SIGALRM / ITIMER_REAL) should be used to time the
        function out; when false, the call is delegated to the multiprocessing-based ``_Timeout`` wrapper.
    :type use_signals: bool
    :param timeout_exception: exception class raised when the time limit is reached.
    :param exception_message: optional message passed to ``timeout_exception``.

    :raises: ``timeout_exception`` (TimeoutError by default) if the time limit is reached

    It is illegal to pass anything other than a function as the first
    parameter. The function is wrapped and returned to the caller.
    """
    def decorate(function):

        if use_signals:
            def handler(signum, frame):
                _raise_exception(timeout_exception, exception_message)

            @wraps(function)
            def new_function(*args, **kwargs):
                new_seconds = kwargs.pop('timeout', seconds)
                if not new_seconds:
                    # No effective limit for this call: run it undecorated.
                    return function(*args, **kwargs)

                old = signal.signal(signal.SIGALRM, handler)
                try:
                    signal.setitimer(signal.ITIMER_REAL, new_seconds)
                    return function(*args, **kwargs)
                finally:
                    # Clean up whenever a timer was armed, including when the
                    # limit came only from the per-call ``timeout`` keyword;
                    # otherwise the timer would fire later into unrelated code.
                    signal.setitimer(signal.ITIMER_REAL, 0)
                    signal.signal(signal.SIGALRM, old)
            return new_function
        else:
            @wraps(function)
            def new_function(*args, **kwargs):
                timeout_wrapper = _Timeout(function, timeout_exception, exception_message, seconds)
                return timeout_wrapper(*args, **kwargs)
            return new_function

    return decorate
96+
97+
98+
def _target(queue, function, *args, **kwargs):
99+
"""Run a function with arguments and return output via a queue.
100+
101+
This is a helper function for the Process created in _Timeout. It runs
102+
the function with positional arguments and keyword arguments and then
103+
returns the function's output by way of a queue. If an exception gets
104+
raised, it is returned to _Timeout to be raised by the value property.
105+
"""
106+
try:
107+
queue.put((True, function(*args, **kwargs)))
108+
except:
109+
queue.put((False, sys.exc_info()[1]))
110+
111+
112+
class _Timeout(object):

    """Run a function in a separate process and enforce a time limit on it.

    Instances are created by the non-signal branch of the ``timeout``
    decorator. Calling an instance starts the wrapped function in a child
    process, polls until a result is available or the limit has passed, and
    then returns the result or raises.
    """

    def __init__(self, function, timeout_exception, exception_message, limit):
        """Store the wrapped function and the timeout settings."""
        self.__limit = limit
        self.__function = function
        self.__timeout_exception = timeout_exception
        self.__exception_message = exception_message
        self.__name__ = function.__name__
        self.__doc__ = function.__doc__
        self.__timeout = time.time()
        # Placeholders; both are replaced with fresh objects on every call.
        self.__process = multiprocessing.Process()
        self.__queue = multiprocessing.Queue()

    def __call__(self, *args, **kwargs):
        """Run the wrapped function in a child process and wait for its outcome.

        A ``timeout`` keyword argument, if present, overrides the stored limit
        and is not forwarded to the wrapped function.
        """
        self.__limit = kwargs.pop('timeout', self.__limit)
        self.__queue = multiprocessing.Queue(1)
        worker_args = (self.__queue, self.__function) + args
        worker = multiprocessing.Process(target=_target,
                                         args=worker_args,
                                         kwargs=kwargs)
        self.__process = worker
        worker.daemon = True
        worker.start()
        if self.__limit is not None:
            self.__timeout = self.__limit + time.time()
        # Poll; ``ready`` raises the timeout exception once the deadline passes.
        while not self.ready:
            time.sleep(0.01)
        return self.value

    def cancel(self):
        """Kill the child process if it is still alive, then raise the timeout exception."""
        process = self.__process
        if process.is_alive():
            print("Terminating process: %s" % process, file=sys.stderr)
            process.kill()

        _raise_exception(self.__timeout_exception, self.__exception_message)

    @property
    def ready(self):
        """Whether a result is waiting in the queue; cancels once the deadline has passed."""
        deadline_passed = self.__limit and self.__timeout < time.time()
        if deadline_passed:
            self.cancel()
        result_queue = self.__queue
        return result_queue.full() and not result_queue.empty()

    @property
    def value(self):
        """Return the wrapped function's result, or re-raise the exception it raised.

        Evaluates to ``None`` when no result is ready yet.
        """
        if self.ready is not True:
            return None
        succeeded, payload = self.__queue.get()
        if succeeded:
            return payload
        raise payload

0 commit comments

Comments
 (0)