Description
Issue Summary
swarms/tests/joins.py fails only when scenarios run concurrently (Pool(), pool size > 1).
The same scenarios are stable with serial execution (pool size = 1).
Primary errors:
- Code: 32. DB::Exception: Attempt to read after eof ... (ATTEMPT_TO_READ_AFTER_EOF)
- Followed by Code: 210. DB::NetException: Connection refused (localhost:9000) after server instability/crash cascade.
Root Cause
In PR #1527 branch code (src/Storages/IStorageCluster.*), a mutable member field is used to control per-query behavior:
- IStorageCluster::send_external_tables (class field)
- Set in updateQueryWithJoinToSendIfNeeded(...) when the GLOBAL rewrite path is taken
- Read later in read(...) to decide whether to forward temporary external tables
This field is query-specific but stored on a shared storage instance, so concurrent reads can race and leak state across unrelated queries.
Risk Pattern
- Query A enables send_external_tables = true
- Query B interleaves on the same storage object and reads stale/shared flag state
- External table forwarding becomes nondeterministic per query
- GLOBAL join execution on shards receives inconsistent temporary table state
- Results in protocol/stream failures (ATTEMPT_TO_READ_AFTER_EOF) and possible crash/connection refusal cascade
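The interleaving in the list above can be replayed deterministically on a toy model. This is a minimal sketch, not ClickHouse code: ToyClusterStorage, updateQuery, InterleavingResult and replayInterleaving are hypothetical names, and the sketch assumes the flag is only ever set on the GLOBAL path and never reset, which this issue does not confirm.

```cpp
// Hypothetical, heavily simplified stand-in for the shared storage object.
// It models only the flag handling described in this issue.
struct ToyClusterStorage
{
    // Query-specific decision stored on an object shared by all queries.
    bool send_external_tables = false;

    // Assumption: the flag is set on the GLOBAL path and never cleared.
    void updateQuery(bool is_global_join)
    {
        if (is_global_join)
            send_external_tables = true;
    }

    // Returns whether this read would forward temporary external tables.
    bool read() const { return send_external_tables; }
};

struct InterleavingResult
{
    bool query_a_forwards; // Query A uses a GLOBAL join
    bool query_b_forwards; // Query B does not
};

// Replays one problematic ordering on a single thread, so the outcome
// does not depend on scheduling.
inline InterleavingResult replayInterleaving()
{
    ToyClusterStorage shared;      // one instance used by both queries

    shared.updateQuery(true);      // Query A takes the GLOBAL path
    shared.updateQuery(false);     // Query B prepares before A has read
    const bool b = shared.read();  // Query B observes Query A's flag
    const bool a = shared.read();  // Query A reads its own decision

    return {a, b};
}
```

In this model Query B reports that it would forward external tables even though it never took the GLOBAL path. With real threads, the unsynchronized write and read of the same bool is additionally a data race.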
Code-Level Defect
Defect class: cross-query mutable state + data race in a concurrent read path.
Affected logic:
- src/Storages/IStorageCluster.h: bool send_external_tables = false;
- src/Storages/IStorageCluster.cpp:
  - GLOBAL branch sets send_external_tables = true
  - read(...) checks if (send_external_tables && planner_context ...)
Proposed Fix
Make external-table forwarding state local to each read(...) invocation:
- Change updateQueryWithJoinToSendIfNeeded(...) to return bool should_send_external_tables.
- Remove class member send_external_tables.
- In read(...), capture: const bool send_external_tables = updateQueryWithJoinToSendIfNeeded(...);
- Use this local bool when extracting planner external tables.
This scopes the decision to the current query and eliminates cross-query races.
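A minimal sketch of the shape of this change on a toy model, assuming the forwarding decision depends only on whether the current query takes the GLOBAL path. ToyClusterStorageFixed and updateQuery are hypothetical stand-ins, and the real signatures of read(...) and updateQueryWithJoinToSendIfNeeded(...) are not reproduced here.

```cpp
// Hypothetical simplified model of the proposed fix; not ClickHouse code.
struct ToyClusterStorageFixed
{
    // No per-query member state remains on the shared object.

    // Stand-in for updateQueryWithJoinToSendIfNeeded(...): returns the
    // decision to the caller instead of storing it in a member.
    bool updateQuery(bool is_global_join) const
    {
        const bool should_send_external_tables = is_global_join;
        return should_send_external_tables;
    }

    // Stand-in for read(...): the decision lives in a local variable, so
    // concurrent calls on the same object cannot observe each other's state.
    bool read(bool is_global_join) const
    {
        const bool send_external_tables = updateQuery(is_global_join);
        return send_external_tables;
    }
};
```

Both methods are const and touch no shared mutable data, so in this model the result of each call depends only on its own arguments, whatever the interleaving with other calls.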
Validation Plan
- Re-run swarms/tests/joins.py with pool > 1 (same workload that currently fails).
- Confirm absence of:
  - ATTEMPT_TO_READ_AFTER_EOF
  - Connection refused (localhost:9000)
- Compare with pool=1 baseline behavior.
- Run targeted GLOBAL-mode join combinations (s3, s3Cluster, iceberg, icebergS3Cluster) under concurrency.
Severity
High (P1) for GLOBAL join mode in concurrent workloads:
- Can cause query failures
- Can destabilize server under stress
- Produces nondeterministic behavior depending on timing/interleaving
Recommendation
Fix should be merged before relying on GLOBAL join mode in parallel regression runs.
This is a server-side correctness/concurrency issue, not a test harness issue.
Trace:
2026.03.27 15:42:15.473719 [ 74 ] {e88529f8-17de-4863-8cca-8b617c66605b} <Error> executeQuery: Code: 32. DB::Exception: Attempt to read after eof. (ATTEMPT_TO_READ_AFTER_EOF) (version 26.1.4.20001.altinityantalya) (from 172.18.0.12:44308) (query 1, line 2) (in query: SELECT __table1.boolean_col AS boolean_col, __table1.long_col AS long_col, __table1.double_col AS double_col, __table1.string_col AS string_col, __table1.timestamp_col AS timestamp_col, __table1.date_col AS date_col, __table1.time_col AS time_col, __table1.timestamptz_col AS timestamptz_col, __table1.integer_col AS integer_col, __table1.float_col AS float_col, __table1.decimal_col AS decimal_col, __table2.boolean_col AS `t2.boolean_col`, __table2.long_col AS `t2.long_col`, __table2.double_col AS `t2.double_col`, __table2.string_col AS `t2.string_col`, __table2.timestamp_col AS `t2.timestamp_col`, __table2.date_col AS `t2.date_col`, __table2.time_col AS `t2.time_col`, __table2.timestamptz_col AS `t2.timestamptz_col`, __table2.integer_col AS `t2.integer_col`, __table2.float_col AS `t2.float_col`, __table2.decimal_col AS `t2.decimal_col` FROM s3Cluster('replicated_cluster', 'http://minio:9000/warehouse/data1/data/**.parquet', 'admin', '[HIDDEN]', 'Parquet', '`boolean_col` Nullable(Bool), `long_col` Nullable(Int64), `double_col` Nullable(Float64), `string_col` Nullable(String), `timestamp_col` Nullable(DateTime64(6, \'UTC\')), `date_col` Nullable(Date32), `time_col` Nullable(DateTime64(6, \'UTC\')), `timestamptz_col` Nullable(DateTime64(6, \'UTC\')), `integer_col` Nullable(Int32), `float_col` Nullable(Float32), `decimal_col` Nullable(Decimal(10, 2))') AS __table1 GLOBAL ALL INNER JOIN _data_11397359597041654076_4211412118624154458 AS __table2 ON __table1.integer_col = __table2.integer_col ORDER BY __table1.boolean_col ASC, __table1.long_col ASC, __table1.double_col ASC, __table1.string_col ASC, __table1.timestamp_col ASC, __table1.date_col ASC, __table1.time_col ASC, __table1.timestamptz_col ASC, 
__table1.integer_col ASC, __table1.float_col ASC, __table1.decimal_col ASC, __table2.boolean_col ASC, __table2.long_col ASC, __table2.double_col ASC, __table2.string_col ASC, __table2.timestamp_col ASC, __table2.date_col ASC, __table2.time_col ASC, __table2.timestamptz_col ASC, __table2.integer_col ASC, __table2.float_col ASC, __table2.decimal_col ASC), Stack trace (when copying this message, always include the lines below):
0. ./ci/tmp/build/./base/poco/Foundation/src/Exception.cpp:28: Poco::Exception::Exception(String const&, int) @ 0x000000001e6253f8
1. ./ci/tmp/build/./src/Common/Exception.cpp:136: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000010f20c4c
2. ./src/Common/Exception.h:172: DB::Exception::Exception(String&&, int, String, bool) @ 0x000000000ab91b18
3. ./src/Common/Exception.h:58: DB::Exception::Exception(PreformattedMessage&&, int) @ 0x000000000ab915cc
4. ./src/Common/Exception.h:190: DB::Exception::Exception<>(int, FormatStringHelperImpl<>) @ 0x0000000010f28a14
5. ./ci/tmp/build/./src/IO/VarInt.cpp:13: DB::throwReadAfterEOF() @ 0x000000001101be44
6. ./src/IO/VarInt.h:83: DB::TCPHandler::receiveClusterFunctionReadTaskResponse(DB::QueryState&) @ 0x000000001997f598
7. ./ci/tmp/build/./src/Server/TCPHandler.cpp:714: std::shared_ptr<DB::ClusterFunctionReadTaskResponse> std::__function::__policy_func<std::shared_ptr<DB::ClusterFunctionReadTaskResponse> ()>::__call_func[abi:ne210105]<DB::TCPHandler::runImpl()::$_5>(std::__function::__policy_storage const*) @ 0x0000000019982700
8. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x00000000150ff764
9. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001510ca4c
10. ./contrib/llvm-project/libcxx/include/future:1601: std::packaged_task<std::shared_ptr<DB::ObjectInfo> ()>::operator()() @ 0x000000001510ce50
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105b728
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000001105ffa0
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105966c
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: void* std::__thread_proxy[abi:ne210105]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000001105ea08
15. ? @ 0x000000000007d5b8
16. ? @ 0x00000000000e5edc
2026.03.27 15:42:15.473761 [ 837 ] {f0793fe7-1cb5-49e6-a47a-2a88e3526cf6} <Error> executeQuery: Code: 32. DB::Exception: Attempt to read after eof. (ATTEMPT_TO_READ_AFTER_EOF) (version 26.1.4.20001.altinityantalya) (from 172.18.0.12:44324) (query 1, line 2) (in query: SELECT __table1.boolean_col AS boolean_col, __table1.long_col AS long_col, __table1.double_col AS double_col, __table1.string_col AS string_col, __table1.timestamp_col AS timestamp_col, __table1.date_col AS date_col, __table1.time_col AS time_col, __table1.timestamptz_col AS timestamptz_col, __table1.integer_col AS integer_col, __table1.float_col AS float_col, __table1.decimal_col AS decimal_col, __table2.boolean_col AS `t2.boolean_col`, __table2.long_col AS `t2.long_col`, __table2.double_col AS `t2.double_col`, __table2.string_col AS `t2.string_col`, __table2.timestamp_col AS `t2.timestamp_col`, __table2.date_col AS `t2.date_col`, __table2.time_col AS `t2.time_col`, __table2.timestamptz_col AS `t2.timestamptz_col`, __table2.integer_col AS `t2.integer_col`, __table2.float_col AS `t2.float_col`, __table2.decimal_col AS `t2.decimal_col` FROM icebergS3Cluster('replicated_cluster', 'http://minio:9000/warehouse/data1/', 'admin', '[HIDDEN]', 'Parquet', '`boolean_col` Nullable(Bool), `long_col` Nullable(Int64), `double_col` Nullable(Float64), `string_col` Nullable(String), `timestamp_col` Nullable(DateTime64(6)), `date_col` Nullable(Date32), `time_col` Nullable(Int64), `timestamptz_col` Nullable(DateTime64(6, \'UTC\')), `integer_col` Nullable(Int32), `float_col` Nullable(Float32), `decimal_col` Nullable(Decimal(10, 2))', SETTINGS iceberg_metadata_file_path = 'metadata/00004-f57d3eb2-ec84-4f0d-bd91-4bbb443cf557.metadata.json') AS __table1 GLOBAL ALL INNER JOIN _data_13883430509704433540_14308560915511735178 AS __table2 ON __table1.decimal_col = __table2.decimal_col ORDER BY __table1.boolean_col ASC, __table1.long_col ASC, __table1.double_col ASC, __table1.string_col ASC, __table1.timestamp_col ASC, 
__table1.date_col ASC, __table1.time_col ASC, __table1.timestamptz_col ASC, __table1.integer_col ASC, __table1.float_col ASC, __table1.decimal_col ASC, __table2.boolean_col ASC, __table2.long_col ASC, __table2.double_col ASC, __table2.string_col ASC, __table2.timestamp_col ASC, __table2.date_col ASC, __table2.time_col ASC, __table2.timestamptz_col ASC, __table2.integer_col ASC, __table2.float_col ASC, __table2.decimal_col ASC), Stack trace (when copying this message, always include the lines below):
0. ./ci/tmp/build/./base/poco/Foundation/src/Exception.cpp:28: Poco::Exception::Exception(String const&, int) @ 0x000000001e6253f8
1. ./ci/tmp/build/./src/Common/Exception.cpp:136: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000010f20c4c
2. ./src/Common/Exception.h:172: DB::Exception::Exception(String&&, int, String, bool) @ 0x000000000ab91b18
3. ./src/Common/Exception.h:58: DB::Exception::Exception(PreformattedMessage&&, int) @ 0x000000000ab915cc
4. ./src/Common/Exception.h:190: DB::Exception::Exception<>(int, FormatStringHelperImpl<>) @ 0x0000000010f28a14
5. ./ci/tmp/build/./src/IO/VarInt.cpp:13: DB::throwReadAfterEOF() @ 0x000000001101be44
6. ./src/IO/VarInt.h:83: DB::TCPHandler::receiveClusterFunctionReadTaskResponse(DB::QueryState&) @ 0x000000001997f598
7. ./ci/tmp/build/./src/Server/TCPHandler.cpp:714: std::shared_ptr<DB::ClusterFunctionReadTaskResponse> std::__function::__policy_func<std::shared_ptr<DB::ClusterFunctionReadTaskResponse> ()>::__call_func[abi:ne210105]<DB::TCPHandler::runImpl()::$_5>(std::__function::__policy_storage const*) @ 0x0000000019982700
8. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x00000000150ff764
9. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001510ca4c
10. ./contrib/llvm-project/libcxx/include/future:1601: std::packaged_task<std::shared_ptr<DB::ObjectInfo> ()>::operator()() @ 0x000000001510ce50
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105b728
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000001105ffa0
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105966c
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: void* std::__thread_proxy[abi:ne210105]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000001105ea08
15. ? @ 0x000000000007d5b8
16. ? @ 0x00000000000e5edc