Skip to content

Commit 4fafb07

Browse files
committed
fix race condition in spatial join with CTE
1 parent 4931644 commit 4fafb07

File tree

2 files changed

+34
-3
lines changed

2 files changed

+34
-3
lines changed

src/spatial/operators/spatial_join_physical.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,8 @@ class SpatialJoinGlobalState final : public GlobalSinkState {
452452

453453
// This is initialized in the finalize state
454454
unique_ptr<FlatRTree> rtree = nullptr;
455+
456+
mutex combine_lock;
455457
};
456458

457459
unique_ptr<GlobalSinkState> PhysicalSpatialJoin::GetGlobalSinkState(ClientContext &context) const {
@@ -580,6 +582,7 @@ SinkCombineResultType PhysicalSpatialJoin::Combine(ExecutionContext &context, Op
580582
lstate.collection->FinalizePinState(lstate.append_state.pin_state);
581583

582584
// Append the local collection to the global collection
585+
lock_guard<mutex> lock(gstate.combine_lock);
583586
gstate.collection->Combine(*lstate.collection);
584587

585588
// Merge the non-null and non-empty count
@@ -1019,14 +1022,13 @@ class SpatialJoinGlobalSourceState final : public GlobalSourceState {
10191022
column_ids.push_back(op.build_side_key_types.size() + op.build_side_payload_types.size());
10201023

10211024
// We don't need to keep the tuples around after scanning
1022-
state.collection->InitializeScan(scan_state, std::move(column_ids), TupleDataPinProperties::DESTROY_AFTER_DONE);
1025+
state.collection->InitializeScan(scan_state, std::move(column_ids), TupleDataPinProperties::KEEP_EVERYTHING_PINNED);
10231026

10241027
tuples_maximum = state.collection->Count();
10251028
}
10261029

10271030
const PhysicalSpatialJoin &op;
10281031
TupleDataParallelScanState scan_state;
1029-
10301032
// How many tuples we have scanned so far
10311033
idx_t tuples_maximum = 0;
10321034
atomic<idx_t> tuples_scanned = {0};
@@ -1056,7 +1058,7 @@ class SpatialJoinLocalSourceState final : public LocalSourceState {
10561058
column_ids.push_back(op.build_side_key_types.size() + op.build_side_payload_types.size());
10571059

10581060
// We don't need to keep the tuples around after scanning
1059-
state.collection->InitializeScan(scan_state, std::move(column_ids), TupleDataPinProperties::DESTROY_AFTER_DONE);
1061+
state.collection->InitializeScan(scan_state, std::move(column_ids));
10601062
state.collection->InitializeScanChunk(scan_state, scan_chunk);
10611063
}
10621064

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# name: test/sql/join/spatial_join_cte.test
2+
# group: [join]
3+
4+
require spatial
5+
6+
query IIII
7+
with bigger as (
8+
select 'a' as region, ST_GeomFromText('POLYGON ((0 0, 0 10, 10 10, 10 0 , 0 0 ))') as geom
9+
union all
10+
select 'b' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
11+
union all
12+
select 'c' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
13+
union all
14+
select 'd' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
15+
union all
16+
select 'e' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
17+
union all
18+
select 'f' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
19+
union all
20+
select 'g' as region, ST_GeomFromText('POLYGON ((50 50, 50 60, 60 60, 60 50 , 50 50 ))') as geom
21+
), smaller as (
22+
select '1' as region, ST_GeomFromText('POLYGON ((2 2, 2 8, 8 8, 8 2 , 2 2 ))') as geom
23+
union all
24+
select '2' as region, ST_GeomFromText('POLYGON ((3 3, 3 5, 5 5, 5 3 , 3 3 ))') as geom
25+
)
26+
select * from smaller left join bigger on st_intersects(bigger.geom, smaller.geom) order by all;
27+
----
28+
1 POLYGON ((2 2, 2 8, 8 8, 8 2, 2 2)) a POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))
29+
2 POLYGON ((3 3, 3 5, 5 5, 5 3, 3 3)) a POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))

0 commit comments

Comments
 (0)