Skip to content

Commit 632eb5b

Browse files
authored
Revert "Refine cancel for read thread stream (#8511)" (#8541) (#8544)
close #8539
1 parent 3d5c741 commit 632eb5b

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

dbms/src/Storages/DeltaMerge/ReadThread/UnorderedInputStream.h

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,17 @@ class UnorderedInputStream : public IProfilingBlockInputStream
5656
LOG_DEBUG(log, "Created, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
5757
}
5858

59-
void cancel(bool /*kill*/) override { decreaseRefCount(true); }
60-
61-
~UnorderedInputStream() override { decreaseRefCount(false); }
59+
~UnorderedInputStream() override
60+
{
61+
task_pool->decreaseUnorderedInputStreamRefCount();
62+
LOG_DEBUG(log, "Destroy, pool_id={} ref_no={}", task_pool->poolId(), ref_no);
63+
}
6264

6365
String getName() const override { return NAME; }
6466

6567
Block getHeader() const override { return header; }
6668

6769
protected:
68-
void decreaseRefCount(bool is_cancel)
69-
{
70-
bool ori = false;
71-
if (is_stopped.compare_exchange_strong(ori, true))
72-
{
73-
task_pool->decreaseUnorderedInputStreamRefCount();
74-
LOG_DEBUG(log, "{}, pool_id={} ref_no={}", is_cancel ? "Cancel" : "Destroy", task_pool->poolId(), ref_no);
75-
}
76-
}
77-
7870
Block readImpl() override
7971
{
8072
FilterPtr filter_ignored;
@@ -137,7 +129,5 @@ class UnorderedInputStream : public IProfilingBlockInputStream
137129
LoggerPtr log;
138130
int64_t ref_no;
139131
bool task_pool_added;
140-
141-
std::atomic_bool is_stopped = false;
142132
};
143133
} // namespace DB::DM

0 commit comments

Comments
 (0)