
Join Concurrency with object_storage_cluster_join_mode='global' #1591

@alsugiliazova


Issue Summary

swarms/tests/joins.py fails only when scenarios run concurrently (Pool(), pool size > 1).
The same scenarios are stable with serial execution (pool size = 1).

Primary errors:

  • Code: 32. DB::Exception: Attempt to read after eof ... (ATTEMPT_TO_READ_AFTER_EOF)
  • Followed by Code: 210. DB::NetException: Connection refused (localhost:9000) after the server becomes unstable or crashes, cascading into further failures.

Root Cause

In the PR #1527 branch (src/Storages/IStorageCluster.*), a mutable member field controls per-query behavior:

  • IStorageCluster::send_external_tables (class field)
  • Set in updateQueryWithJoinToSendIfNeeded(...) when GLOBAL rewrite path is taken
  • Read later in read(...) to decide whether to forward temporary external tables

This field is query-specific but stored on a shared storage instance, so concurrent reads can race and leak state across unrelated queries.

Risk Pattern

  • Query A sets send_external_tables = true
  • Query B interleaves on the same storage object and reads the stale, shared flag state
  • External table forwarding becomes nondeterministic per query
  • GLOBAL join execution on shards receives inconsistent temporary table state
  • Result: protocol/stream failures (ATTEMPT_TO_READ_AFTER_EOF) and a possible crash, followed by a cascade of connection-refused errors

Code-Level Defect

Defect class: cross-query mutable state + data race in a concurrent read path.

Affected logic:

  • src/Storages/IStorageCluster.h: bool send_external_tables = false;
  • src/Storages/IStorageCluster.cpp:
    • GLOBAL branch sets send_external_tables = true
    • read(...) checks if (send_external_tables && planner_context ...)

Proposed Fix

Make external-table forwarding state local to each read(...) invocation:

  1. Change updateQueryWithJoinToSendIfNeeded(...) to return bool should_send_external_tables.
  2. Remove class member send_external_tables.
  3. In read(...), capture:
    • const bool send_external_tables = updateQueryWithJoinToSendIfNeeded(...);
  4. Use this local bool when extracting planner external tables.

This scopes the decision to the current query and eliminates cross-query races.

Validation Plan

  1. Re-run swarms/tests/joins.py with pool size > 1 (the same workload that currently fails).
  2. Confirm the absence of:
    • ATTEMPT_TO_READ_AFTER_EOF
    • Connection refused (localhost:9000)
  3. Compare against the pool size = 1 baseline.
  4. Run targeted GLOBAL-mode join combinations (s3, s3Cluster, iceberg, icebergS3Cluster) under concurrency.

Severity

High (P1) for GLOBAL join mode in concurrent workloads:

  • Can cause query failures
  • Can destabilize the server under stress
  • Produces nondeterministic behavior that depends on timing and interleaving

Recommendation

The fix should be merged before GLOBAL join mode is relied on in parallel regression runs.
This is a server-side correctness/concurrency issue, not a test harness issue.

Trace:

2026.03.27 15:42:15.473719 [ 74 ] {e88529f8-17de-4863-8cca-8b617c66605b} <Error> executeQuery: Code: 32. DB::Exception: Attempt to read after eof. (ATTEMPT_TO_READ_AFTER_EOF) (version 26.1.4.20001.altinityantalya) (from 172.18.0.12:44308) (query 1, line 2) (in query: SELECT __table1.boolean_col AS boolean_col, __table1.long_col AS long_col, __table1.double_col AS double_col, __table1.string_col AS string_col, __table1.timestamp_col AS timestamp_col, __table1.date_col AS date_col, __table1.time_col AS time_col, __table1.timestamptz_col AS timestamptz_col, __table1.integer_col AS integer_col, __table1.float_col AS float_col, __table1.decimal_col AS decimal_col, __table2.boolean_col AS `t2.boolean_col`, __table2.long_col AS `t2.long_col`, __table2.double_col AS `t2.double_col`, __table2.string_col AS `t2.string_col`, __table2.timestamp_col AS `t2.timestamp_col`, __table2.date_col AS `t2.date_col`, __table2.time_col AS `t2.time_col`, __table2.timestamptz_col AS `t2.timestamptz_col`, __table2.integer_col AS `t2.integer_col`, __table2.float_col AS `t2.float_col`, __table2.decimal_col AS `t2.decimal_col` FROM s3Cluster('replicated_cluster', 'http://minio:9000/warehouse/data1/data/**.parquet', 'admin', '[HIDDEN]', 'Parquet', '`boolean_col` Nullable(Bool), `long_col` Nullable(Int64), `double_col` Nullable(Float64), `string_col` Nullable(String), `timestamp_col` Nullable(DateTime64(6, \'UTC\')), `date_col` Nullable(Date32), `time_col` Nullable(DateTime64(6, \'UTC\')), `timestamptz_col` Nullable(DateTime64(6, \'UTC\')), `integer_col` Nullable(Int32), `float_col` Nullable(Float32), `decimal_col` Nullable(Decimal(10, 2))') AS __table1 GLOBAL ALL INNER JOIN _data_11397359597041654076_4211412118624154458 AS __table2 ON __table1.integer_col = __table2.integer_col ORDER BY __table1.boolean_col ASC, __table1.long_col ASC, __table1.double_col ASC, __table1.string_col ASC, __table1.timestamp_col ASC, __table1.date_col ASC, __table1.time_col ASC, __table1.timestamptz_col ASC, __table1.integer_col ASC, __table1.float_col ASC, __table1.decimal_col ASC, __table2.boolean_col ASC, __table2.long_col ASC, __table2.double_col ASC, __table2.string_col ASC, __table2.timestamp_col ASC, __table2.date_col ASC, __table2.time_col ASC, __table2.timestamptz_col ASC, __table2.integer_col ASC, __table2.float_col ASC, __table2.decimal_col ASC), Stack trace (when copying this message, always include the lines below):

0. ./ci/tmp/build/./base/poco/Foundation/src/Exception.cpp:28: Poco::Exception::Exception(String const&, int) @ 0x000000001e6253f8
1. ./ci/tmp/build/./src/Common/Exception.cpp:136: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000010f20c4c
2. ./src/Common/Exception.h:172: DB::Exception::Exception(String&&, int, String, bool) @ 0x000000000ab91b18
3. ./src/Common/Exception.h:58: DB::Exception::Exception(PreformattedMessage&&, int) @ 0x000000000ab915cc
4. ./src/Common/Exception.h:190: DB::Exception::Exception<>(int, FormatStringHelperImpl<>) @ 0x0000000010f28a14
5. ./ci/tmp/build/./src/IO/VarInt.cpp:13: DB::throwReadAfterEOF() @ 0x000000001101be44
6. ./src/IO/VarInt.h:83: DB::TCPHandler::receiveClusterFunctionReadTaskResponse(DB::QueryState&) @ 0x000000001997f598
7. ./ci/tmp/build/./src/Server/TCPHandler.cpp:714: std::shared_ptr<DB::ClusterFunctionReadTaskResponse> std::__function::__policy_func<std::shared_ptr<DB::ClusterFunctionReadTaskResponse> ()>::__call_func[abi:ne210105]<DB::TCPHandler::runImpl()::$_5>(std::__function::__policy_storage const*) @ 0x0000000019982700
8. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x00000000150ff764
9. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001510ca4c
10. ./contrib/llvm-project/libcxx/include/future:1601: std::packaged_task<std::shared_ptr<DB::ObjectInfo> ()>::operator()() @ 0x000000001510ce50
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105b728
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000001105ffa0
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105966c
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: void* std::__thread_proxy[abi:ne210105]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000001105ea08
15. ? @ 0x000000000007d5b8
16. ? @ 0x00000000000e5edc

2026.03.27 15:42:15.473761 [ 837 ] {f0793fe7-1cb5-49e6-a47a-2a88e3526cf6} <Error> executeQuery: Code: 32. DB::Exception: Attempt to read after eof. (ATTEMPT_TO_READ_AFTER_EOF) (version 26.1.4.20001.altinityantalya) (from 172.18.0.12:44324) (query 1, line 2) (in query: SELECT __table1.boolean_col AS boolean_col, __table1.long_col AS long_col, __table1.double_col AS double_col, __table1.string_col AS string_col, __table1.timestamp_col AS timestamp_col, __table1.date_col AS date_col, __table1.time_col AS time_col, __table1.timestamptz_col AS timestamptz_col, __table1.integer_col AS integer_col, __table1.float_col AS float_col, __table1.decimal_col AS decimal_col, __table2.boolean_col AS `t2.boolean_col`, __table2.long_col AS `t2.long_col`, __table2.double_col AS `t2.double_col`, __table2.string_col AS `t2.string_col`, __table2.timestamp_col AS `t2.timestamp_col`, __table2.date_col AS `t2.date_col`, __table2.time_col AS `t2.time_col`, __table2.timestamptz_col AS `t2.timestamptz_col`, __table2.integer_col AS `t2.integer_col`, __table2.float_col AS `t2.float_col`, __table2.decimal_col AS `t2.decimal_col` FROM icebergS3Cluster('replicated_cluster', 'http://minio:9000/warehouse/data1/', 'admin', '[HIDDEN]', 'Parquet', '`boolean_col` Nullable(Bool), `long_col` Nullable(Int64), `double_col` Nullable(Float64), `string_col` Nullable(String), `timestamp_col` Nullable(DateTime64(6)), `date_col` Nullable(Date32), `time_col` Nullable(Int64), `timestamptz_col` Nullable(DateTime64(6, \'UTC\')), `integer_col` Nullable(Int32), `float_col` Nullable(Float32), `decimal_col` Nullable(Decimal(10, 2))', SETTINGS iceberg_metadata_file_path = 'metadata/00004-f57d3eb2-ec84-4f0d-bd91-4bbb443cf557.metadata.json') AS __table1 GLOBAL ALL INNER JOIN _data_13883430509704433540_14308560915511735178 AS __table2 ON __table1.decimal_col = __table2.decimal_col ORDER BY __table1.boolean_col ASC, __table1.long_col ASC, __table1.double_col ASC, __table1.string_col ASC, __table1.timestamp_col ASC, __table1.date_col ASC, __table1.time_col ASC, __table1.timestamptz_col ASC, __table1.integer_col ASC, __table1.float_col ASC, __table1.decimal_col ASC, __table2.boolean_col ASC, __table2.long_col ASC, __table2.double_col ASC, __table2.string_col ASC, __table2.timestamp_col ASC, __table2.date_col ASC, __table2.time_col ASC, __table2.timestamptz_col ASC, __table2.integer_col ASC, __table2.float_col ASC, __table2.decimal_col ASC), Stack trace (when copying this message, always include the lines below):

0. ./ci/tmp/build/./base/poco/Foundation/src/Exception.cpp:28: Poco::Exception::Exception(String const&, int) @ 0x000000001e6253f8
1. ./ci/tmp/build/./src/Common/Exception.cpp:136: DB::Exception::Exception(DB::Exception::MessageMasked&&, int, bool) @ 0x0000000010f20c4c
2. ./src/Common/Exception.h:172: DB::Exception::Exception(String&&, int, String, bool) @ 0x000000000ab91b18
3. ./src/Common/Exception.h:58: DB::Exception::Exception(PreformattedMessage&&, int) @ 0x000000000ab915cc
4. ./src/Common/Exception.h:190: DB::Exception::Exception<>(int, FormatStringHelperImpl<>) @ 0x0000000010f28a14
5. ./ci/tmp/build/./src/IO/VarInt.cpp:13: DB::throwReadAfterEOF() @ 0x000000001101be44
6. ./src/IO/VarInt.h:83: DB::TCPHandler::receiveClusterFunctionReadTaskResponse(DB::QueryState&) @ 0x000000001997f598
7. ./ci/tmp/build/./src/Server/TCPHandler.cpp:714: std::shared_ptr<DB::ClusterFunctionReadTaskResponse> std::__function::__policy_func<std::shared_ptr<DB::ClusterFunctionReadTaskResponse> ()>::__call_func[abi:ne210105]<DB::TCPHandler::runImpl()::$_5>(std::__function::__policy_storage const*) @ 0x0000000019982700
8. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x00000000150ff764
9. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001510ca4c
10. ./contrib/llvm-project/libcxx/include/future:1601: std::packaged_task<std::shared_ptr<DB::ObjectInfo> ()>::operator()() @ 0x000000001510ce50
11. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105b728
12. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: ThreadFromGlobalPoolImpl<false, true>::ThreadFromGlobalPoolImpl<void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*>(void (ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool::*&&)(), ThreadPoolImpl<ThreadFromGlobalPoolImpl<false, true>>::ThreadFromThreadPool*&&)::'lambda'()::operator()() @ 0x000000001105ffa0
13. ./contrib/llvm-project/libcxx/include/__functional/function.h:508: ? @ 0x000000001105966c
14. ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:0: void* std::__thread_proxy[abi:ne210105]<std::tuple<std::unique_ptr<std::__thread_struct, std::default_delete<std::__thread_struct>>, void (ThreadPoolImpl<std::thread>::ThreadFromThreadPool::*)(), ThreadPoolImpl<std::thread>::ThreadFromThreadPool*>>(void*) @ 0x000000001105ea08
15. ? @ 0x000000000007d5b8
16. ? @ 0x00000000000e5edc
