-
Notifications
You must be signed in to change notification settings - Fork 5k
feat(common): enhance execute_query_file to support concurrent SQL execution #34523
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3c2d778
c9e290e
a343314
41b79e4
eacb4ac
3ce553b
de19cf1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,7 +93,7 @@ if [ "$build_no_asan" = "true" ]; then | |
| -v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \ | ||
| -v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \ | ||
| -v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \ | ||
| --rm --ulimit core=-1 tdengine-ci:0.1 sh -c "cd $REP_DIR;apt update -y && apt install groff -y;mv /root/.cargo/config /root/.cargo/config_bak;rm -rf debug;mkdir -p debug;cd debug;cmake .. $BUILD_HTTP_OPT -DCOVER=true -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DCMAKE_EXPORT_COMPILE_COMMANDS=1 -DBUILD_CONTRIB=false ;make -j|| exit 1" | ||
| --rm --ulimit core=-1 tdengine-ci:0.1 sh -c "cd $REP_DIR;apt update -y && apt install groff -y;rm -rf debug;mkdir -p debug;cd debug;cmake .. $BUILD_HTTP_OPT -DCOVER=true -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DCMAKE_EXPORT_COMPILE_COMMANDS=1 -DBUILD_CONTRIB=false ;make -j|| exit 1" | ||
| # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ | ||
|
|
||
|
|
||
|
|
@@ -118,7 +118,7 @@ docker run \ | |
| -v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \ | ||
| -v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \ | ||
| -v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \ | ||
| --rm --ulimit core=-1 tdengine-ci:0.1 sh -c "cd $REP_DIR;apt update -y && apt install groff -y;mv /root/.cargo/config /root/.cargo/config_bak;rm -rf debug;mkdir -p debug;cd debug;cmake .. $BUILD_HTTP_OPT -DCOVER=true -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DCMAKE_BUILD_TYPE=Debug -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DBUILD_CONTRIB=false;make -j|| exit 1 " | ||
| --rm --ulimit core=-1 tdengine-ci:0.1 sh -c "cd $REP_DIR;apt update -y && apt install groff -y;rm -rf debug;mkdir -p debug;cd debug;cmake .. $BUILD_HTTP_OPT -DCOVER=true -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DCMAKE_BUILD_TYPE=Debug -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DBUILD_CONTRIB=false;make -j|| exit 1 " | ||
|
||
|
|
||
| mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan | ||
| date | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import random | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import string | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import concurrent | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import concurrent | |
| import concurrent.futures |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error messages contain Chinese characters ("USE数据库失败" and "SQL执行失败"). The rest of the codebase appears to use English for log messages, so these Chinese messages are inconsistent and may cause issues with character encoding or readability for international developers. Consider using English error messages for consistency.
| tdLog.error(f"USE数据库失败: {db}\n{e}") | |
| try: | |
| tdsql.execute_ignore_error(sql) | |
| except Exception as e: | |
| tdLog.error(f"SQL执行失败: {sql}\n{e}") | |
| tdLog.error(f"Failed to execute USE for database '{db}':\n{e}") | |
| try: | |
| tdsql.execute_ignore_error(sql) | |
| except Exception as e: | |
| tdLog.error(f"Failed to execute SQL '{sql}':\n{e}") |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The run_sql method creates a new database connection for each SQL statement via self.newTdSql(). In concurrent execution with max_workers=8, this could create many simultaneous connections, potentially exhausting connection pools or causing performance issues. Consider reusing connections or implementing connection pooling. Additionally, the connections created here are never explicitly closed, which may lead to resource leaks.
| if db: | |
| try: | |
| tdsql.execute(f"USE {db};") | |
| except Exception as e: | |
| tdLog.error(f"USE数据库失败: {db}\n{e}") | |
| try: | |
| tdsql.execute_ignore_error(sql) | |
| except Exception as e: | |
| tdLog.error(f"SQL执行失败: {sql}\n{e}") | |
| try: | |
| if db: | |
| try: | |
| tdsql.execute(f"USE {db};") | |
| except Exception as e: | |
| tdLog.error(f"USE数据库失败: {db}\n{e}") | |
| try: | |
| tdsql.execute_ignore_error(sql) | |
| except Exception as e: | |
| tdLog.error(f"SQL执行失败: {sql}\n{e}") | |
| finally: | |
| # Ensure that any resources associated with this tdsql instance are released | |
| try: | |
| if hasattr(tdsql, "close") and callable(getattr(tdsql, "close")): | |
| tdsql.close() | |
| except Exception as e: | |
| tdLog.error(f"关闭数据库连接失败: {e}") |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The try-except block around execute_ignore_error is unnecessary because execute_ignore_error already catches all exceptions internally. This creates redundant error handling that will never be triggered.
| try: | |
| tdsql.execute_ignore_error(sql) | |
| except Exception as e: | |
| tdLog.error(f"SQL执行失败: {sql}\n{e}") | |
| tdsql.execute_ignore_error(sql) |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments in the code are in Chinese ("假设第一行是 use 语句" means "Assume the first line is a use statement"). This is inconsistent with the rest of the codebase, which uses English comments. Consider translating them to English for consistency and international collaboration.
| # 假设第一行是 use 语句 | |
| # Assume the first line is a USE statement |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SQL parsing assumes the first line is a USE statement and blindly splits on spaces and takes the second element. This will fail or produce incorrect results if: 1) The first line has leading/trailing whitespace with the database name not in position [1], 2) The first line is a comment, 3) The first line is empty after stripping (though filtered by line.strip()), 4) The USE statement has different formatting (e.g., multiple spaces, tabs). Consider using a regex pattern or more robust parsing, and validate that the first line is actually a USE statement.
| # 假设第一行是 use 语句 | |
| db = lines[0].split()[1].rstrip(';') | |
| sql_lines = [ | |
| line.replace('\\G', '').rstrip(';') + ';' | |
| for line in lines[1:] | |
| # 查找并解析 USE 语句以获取数据库名 | |
| use_line = None | |
| db = None | |
| for line in lines: | |
| match = re.match(r'(?i)^use\s+([^\s;]+)', line) | |
| if match: | |
| use_line = line | |
| db = match.group(1) | |
| break | |
| if db is None: | |
| tdLog.exit("No valid USE statement found in input file.") | |
| return | |
| sql_lines = [ | |
| line.replace('\\G', '').rstrip(';') + ';' | |
| for line in lines | |
| if line != use_line |
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SQL line processing strips and re-adds semicolons, and removes '\G' markers. However, this processing doesn't handle: 1) Multi-line SQL statements, 2) SQL comments (-- or /* */), 3) String literals that might contain ';' or '\G', 4) Empty lines between statements (already filtered but no separator logic). The original implementation using 'taos -f' would handle all these cases correctly. Consider more robust SQL parsing or document these limitations.
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The concurrent execution fundamentally changes the semantics of execute_query_file. The original implementation executed SQL statements sequentially using the taos CLI (via os.system), which would properly handle dependencies between statements. The new implementation executes all statements concurrently, which will break if statements have dependencies (e.g., CREATE TABLE followed by INSERT INTO that table). This is a breaking API change that will cause existing tests to fail or behave incorrectly. The change should either: 1) Be behind a feature flag or new parameter, 2) Parse and group dependent statements, or 3) Be documented as a breaking change with migration guidance.
Copilot
AI
Feb 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The executor.map() call discards its result iterator, so any exception raised in a worker thread is silently dropped: executor.map() re-raises a task's exception only when that task's result is retrieved from the iterator. The tasks themselves still run, but because the return value isn't being used, their failures never propagate. Consider using list(executor.map(...)) to consume the results and surface exceptions, or use executor.submit() with as_completed() to properly handle errors from individual tasks.
| executor.map(lambda sql: self.run_sql(sql, db), sql_lines) | |
| list(executor.map(lambda sql: self.run_sql(sql, db), sql_lines)) |
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -432,6 +432,19 @@ def executeTimes(self, sql, times): | |||||||
| time.sleep(1) | ||||||||
| continue | ||||||||
|
|
||||||||
| def execute_ignore_error(self, sql, show=False): | ||||||||
| """ | ||||||||
| Executes a SQL statement, ignore all errors, no retry. | ||||||||
| """ | ||||||||
| self.sql = sql | ||||||||
| if show: | ||||||||
| tdLog.info(sql) | ||||||||
| try: | ||||||||
| self.affectedRows = self.cursor.execute(sql) | ||||||||
| return self.affectedRows | ||||||||
| except Exception as e: | ||||||||
|
||||||||
| except Exception as e: | |
| except Exception as e: | |
| tdLog.info("Ignoring error while executing sql: %s, %s" % (sql, repr(e))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The removal of "mv /root/.cargo/config /root/.cargo/config_bak;" from the docker run command appears unrelated to the PR's stated purpose of enhancing SQL execution concurrency. This change is not explained in the PR title or description. If this change is intentional and necessary, it should be documented. If it's accidental, it should be reverted. This could potentially affect the Rust build process for taosws-rs.