Skip to content

Commit a0a25c5

Browse files
al13n321 authored and zvonand committed
Merge pull request ClickHouse#68249 from ClickHouse/rset
Fix 'Refresh set entry already exists'
1 parent bde8c7e commit a0a25c5

File tree

9 files changed: +77 -63 lines changed

src/Interpreters/InterpreterSystemQuery.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -666,13 +666,16 @@ BlockIO InterpreterSystemQuery::execute()
666666
startStopAction(ActionLocks::ViewRefresh, false);
667667
break;
668668
case Type::REFRESH_VIEW:
669-
getRefreshTask()->run();
669+
for (const auto & task : getRefreshTasks())
670+
task->run();
670671
break;
671672
case Type::CANCEL_VIEW:
672-
getRefreshTask()->cancel();
673+
for (const auto & task : getRefreshTasks())
674+
task->cancel();
673675
break;
674676
case Type::TEST_VIEW:
675-
getRefreshTask()->setFakeTime(query.fake_time_for_view);
677+
for (const auto & task : getRefreshTasks())
678+
task->setFakeTime(query.fake_time_for_view);
676679
break;
677680
case Type::DROP_REPLICA:
678681
dropReplica(query);
@@ -1245,15 +1248,15 @@ void InterpreterSystemQuery::flushDistributed(ASTSystemQuery & query)
12451248
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RESTART DISK is not supported");
12461249
}
12471250

1248-
RefreshTaskHolder InterpreterSystemQuery::getRefreshTask()
1251+
RefreshTaskList InterpreterSystemQuery::getRefreshTasks()
12491252
{
12501253
auto ctx = getContext();
12511254
ctx->checkAccess(AccessType::SYSTEM_VIEWS);
1252-
auto task = ctx->getRefreshSet().getTask(table_id);
1253-
if (!task)
1255+
auto tasks = ctx->getRefreshSet().findTasks(table_id);
1256+
if (tasks.empty())
12541257
throw Exception(
12551258
ErrorCodes::BAD_ARGUMENTS, "Refreshable view {} doesn't exist", table_id.getNameForLogs());
1256-
return task;
1259+
return tasks;
12571260
}
12581261

12591262

src/Interpreters/InterpreterSystemQuery.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class InterpreterSystemQuery : public IInterpreter, WithMutableContext
7474
void flushDistributed(ASTSystemQuery & query);
7575
[[noreturn]] void restartDisk(String & name);
7676

77-
RefreshTaskHolder getRefreshTask();
77+
RefreshTaskList getRefreshTasks();
7878

7979
AccessRightsElements getRequiredAccessForDDLOnCluster() const;
8080
void startStopAction(StorageActionBlockType action_type, bool start);

src/Storages/MaterializedView/RefreshSet.cpp

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@ namespace CurrentMetrics
99
namespace DB
1010
{
1111

12-
namespace ErrorCodes
13-
{
14-
extern const int LOGICAL_ERROR;
15-
}
16-
1712
RefreshSet::Handle::Handle(Handle && other) noexcept
1813
{
1914
*this = std::move(other);
@@ -27,6 +22,7 @@ RefreshSet::Handle & RefreshSet::Handle::operator=(Handle && other) noexcept
2722
parent_set = std::exchange(other.parent_set, nullptr);
2823
id = std::move(other.id);
2924
dependencies = std::move(other.dependencies);
25+
iter = std::move(other.iter);
3026
metric_increment = std::move(other.metric_increment);
3127
return *this;
3228
}
@@ -39,21 +35,21 @@ RefreshSet::Handle::~Handle()
3935
void RefreshSet::Handle::rename(StorageID new_id)
4036
{
4137
std::lock_guard lock(parent_set->mutex);
42-
parent_set->removeDependenciesLocked(id, dependencies);
43-
auto it = parent_set->tasks.find(id);
44-
auto task = it->second;
45-
parent_set->tasks.erase(it);
38+
RefreshTaskHolder task = *iter;
39+
parent_set->removeDependenciesLocked(task, dependencies);
40+
parent_set->removeTaskLocked(id, iter);
4641
id = new_id;
47-
parent_set->tasks.emplace(id, task);
48-
parent_set->addDependenciesLocked(id, dependencies);
42+
iter = parent_set->addTaskLocked(id, task);
43+
parent_set->addDependenciesLocked(task, dependencies);
4944
}
5045

5146
void RefreshSet::Handle::changeDependencies(std::vector<StorageID> deps)
5247
{
5348
std::lock_guard lock(parent_set->mutex);
54-
parent_set->removeDependenciesLocked(id, dependencies);
49+
RefreshTaskHolder task = *iter;
50+
parent_set->removeDependenciesLocked(task, dependencies);
5551
dependencies = std::move(deps);
56-
parent_set->addDependenciesLocked(id, dependencies);
52+
parent_set->addDependenciesLocked(task, dependencies);
5753
}
5854

5955
void RefreshSet::Handle::reset()
@@ -63,8 +59,8 @@ void RefreshSet::Handle::reset()
6359

6460
{
6561
std::lock_guard lock(parent_set->mutex);
66-
parent_set->removeDependenciesLocked(id, dependencies);
67-
parent_set->tasks.erase(id);
62+
parent_set->removeDependenciesLocked(*iter, dependencies);
63+
parent_set->removeTaskLocked(id, iter);
6864
}
6965

7066
parent_set = nullptr;
@@ -76,37 +72,50 @@ RefreshSet::RefreshSet() = default;
7672
void RefreshSet::emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task)
7773
{
7874
std::lock_guard guard(mutex);
79-
auto [it, is_inserted] = tasks.emplace(id, task);
80-
if (!is_inserted)
81-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Refresh set entry already exists for table {}", id.getFullTableName());
82-
addDependenciesLocked(id, dependencies);
75+
const auto iter = addTaskLocked(id, task);
76+
addDependenciesLocked(task, dependencies);
77+
78+
task->setRefreshSetHandleUnlock(Handle(this, id, iter, dependencies));
79+
}
8380

84-
task->setRefreshSetHandleUnlock(Handle(this, id, dependencies));
81+
RefreshTaskList::iterator RefreshSet::addTaskLocked(StorageID id, RefreshTaskHolder task)
82+
{
83+
RefreshTaskList & list = tasks[id];
84+
list.push_back(task);
85+
return std::prev(list.end());
8586
}
8687

87-
void RefreshSet::addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
88+
void RefreshSet::removeTaskLocked(StorageID id, RefreshTaskList::iterator iter)
89+
{
90+
const auto it = tasks.find(id);
91+
it->second.erase(iter);
92+
if (it->second.empty())
93+
tasks.erase(it);
94+
}
95+
96+
void RefreshSet::addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
8897
{
8998
for (const StorageID & dep : dependencies)
90-
dependents[dep].insert(id);
99+
dependents[dep].insert(task);
91100
}
92101

93-
void RefreshSet::removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies)
102+
void RefreshSet::removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies)
94103
{
95104
for (const StorageID & dep : dependencies)
96105
{
97106
auto & set = dependents[dep];
98-
set.erase(id);
107+
set.erase(task);
99108
if (set.empty())
100109
dependents.erase(dep);
101110
}
102111
}
103112

104-
RefreshTaskHolder RefreshSet::getTask(const StorageID & id) const
113+
RefreshTaskList RefreshSet::findTasks(const StorageID & id) const
105114
{
106115
std::lock_guard lock(mutex);
107-
if (auto task = tasks.find(id); task != tasks.end())
108-
return task->second;
109-
return nullptr;
116+
if (auto it = tasks.find(id); it != tasks.end())
117+
return it->second;
118+
return {};
110119
}
111120

112121
RefreshSet::InfoContainer RefreshSet::getInfo() const
@@ -116,26 +125,23 @@ RefreshSet::InfoContainer RefreshSet::getInfo() const
116125
lock.unlock();
117126

118127
InfoContainer res;
119-
for (const auto & [id, task] : tasks_copy)
120-
res.push_back(task->getInfo());
128+
for (const auto & [id, list] : tasks_copy)
129+
for (const auto & task : list)
130+
res.push_back(task->getInfo());
121131
return res;
122132
}
123133

124134
std::vector<RefreshTaskHolder> RefreshSet::getDependents(const StorageID & id) const
125135
{
126136
std::lock_guard lock(mutex);
127-
std::vector<RefreshTaskHolder> res;
128137
auto it = dependents.find(id);
129138
if (it == dependents.end())
130139
return {};
131-
for (const StorageID & dep_id : it->second)
132-
if (auto task = tasks.find(dep_id); task != tasks.end())
133-
res.push_back(task->second);
134-
return res;
140+
return std::vector<RefreshTaskHolder>(it->second.begin(), it->second.end());
135141
}
136142

137-
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_)
143+
RefreshSet::Handle::Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_)
138144
: parent_set(parent_set_), id(std::move(id_)), dependencies(std::move(dependencies_))
139-
, metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
145+
, iter(iter_), metric_increment(CurrentMetrics::Increment(CurrentMetrics::RefreshableViews)) {}
140146

141147
}

src/Storages/MaterializedView/RefreshSet.h

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@
55
#include <Storages/IStorage.h>
66
#include <Storages/MaterializedView/RefreshTask_fwd.h>
77
#include <Common/CurrentMetrics.h>
8+
#include <list>
89

910
namespace DB
1011
{
1112

12-
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
13-
1413
enum class RefreshState : RefreshTaskStateUnderlying
1514
{
1615
Disabled = 0,
@@ -46,8 +45,7 @@ struct RefreshInfo
4645
class RefreshSet
4746
{
4847
public:
49-
/// RAII thing that unregisters a task and its dependencies in destructor.
50-
/// Storage IDs must be unique. Not thread safe.
48+
/// RAII thing that unregisters a task and its dependencies in destructor. Not thread safe.
5149
class Handle
5250
{
5351
friend class RefreshSet;
@@ -73,9 +71,10 @@ class RefreshSet
7371
RefreshSet * parent_set = nullptr;
7472
StorageID id = StorageID::createEmpty();
7573
std::vector<StorageID> dependencies;
74+
RefreshTaskList::iterator iter; // in parent_set->tasks[id]
7675
std::optional<CurrentMetrics::Increment> metric_increment;
7776

78-
Handle(RefreshSet * parent_set_, StorageID id_, std::vector<StorageID> dependencies_);
77+
Handle(RefreshSet * parent_set_, StorageID id_, RefreshTaskList::iterator iter_, std::vector<StorageID> dependencies_);
7978
};
8079

8180
using InfoContainer = std::vector<RefreshInfo>;
@@ -84,16 +83,18 @@ class RefreshSet
8483

8584
void emplace(StorageID id, const std::vector<StorageID> & dependencies, RefreshTaskHolder task);
8685

87-
RefreshTaskHolder getTask(const StorageID & id) const;
86+
/// Finds active refreshable view(s) by database and table name.
87+
/// Normally there's at most one, but we allow name collisions here, just in case.
88+
RefreshTaskList findTasks(const StorageID & id) const;
8889

8990
InfoContainer getInfo() const;
9091

9192
/// Get tasks that depend on the given one.
9293
std::vector<RefreshTaskHolder> getDependents(const StorageID & id) const;
9394

9495
private:
95-
using TaskMap = std::unordered_map<StorageID, RefreshTaskHolder, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
96-
using DependentsMap = std::unordered_map<StorageID, DatabaseAndTableNameSet, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
96+
using TaskMap = std::unordered_map<StorageID, RefreshTaskList, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
97+
using DependentsMap = std::unordered_map<StorageID, std::unordered_set<RefreshTaskHolder>, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
9798

9899
/// Protects the two maps below, not locked for any nontrivial operations (e.g. operations that
99100
/// block or lock other mutexes).
@@ -102,8 +103,10 @@ class RefreshSet
102103
TaskMap tasks;
103104
DependentsMap dependents;
104105

105-
void addDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
106-
void removeDependenciesLocked(const StorageID & id, const std::vector<StorageID> & dependencies);
106+
RefreshTaskList::iterator addTaskLocked(StorageID id, RefreshTaskHolder task);
107+
void removeTaskLocked(StorageID id, RefreshTaskList::iterator iter);
108+
void addDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
109+
void removeDependenciesLocked(RefreshTaskHolder task, const std::vector<StorageID> & dependencies);
107110
};
108111

109112
}

src/Storages/MaterializedView/RefreshTask.cpp

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ RefreshTask::RefreshTask(
3333
{}
3434

3535
RefreshTaskHolder RefreshTask::create(
36-
const StorageMaterializedView & view,
3736
ContextMutablePtr context,
3837
const DB::ASTRefreshStrategy & strategy)
3938
{
@@ -46,12 +45,9 @@ RefreshTaskHolder RefreshTask::create(
4645
t->refreshTask();
4746
});
4847

49-
std::vector<StorageID> deps;
5048
if (strategy.dependencies)
5149
for (auto && dependency : strategy.dependencies->children)
52-
deps.emplace_back(dependency->as<const ASTTableIdentifier &>());
53-
54-
context->getRefreshSet().emplace(view.getStorageID(), deps, task);
50+
task->initial_dependencies.emplace_back(dependency->as<const ASTTableIdentifier &>());
5551

5652
return task;
5753
}
@@ -61,6 +57,7 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
6157
view_to_refresh = view;
6258
if (view->getContext()->getSettingsRef().stop_refreshable_materialized_views_on_startup)
6359
stop_requested = true;
60+
view->getContext()->getRefreshSet().emplace(view->getStorageID(), initial_dependencies, shared_from_this());
6461
populateDependencies();
6562
advanceNextRefreshTime(currentTime());
6663
refresh_task->schedule();
@@ -69,7 +66,8 @@ void RefreshTask::initializeAndStart(std::shared_ptr<StorageMaterializedView> vi
6966
void RefreshTask::rename(StorageID new_id)
7067
{
7168
std::lock_guard guard(mutex);
72-
set_handle.rename(new_id);
69+
if (set_handle)
70+
set_handle.rename(new_id);
7371
}
7472

7573
void RefreshTask::alterRefreshParams(const DB::ASTRefreshStrategy & new_strategy)

src/Storages/MaterializedView/RefreshTask.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
2626

2727
/// The only proper way to construct task
2828
static RefreshTaskHolder create(
29-
const StorageMaterializedView & view,
3029
ContextMutablePtr context,
3130
const DB::ASTRefreshStrategy & strategy);
3231

@@ -84,9 +83,11 @@ class RefreshTask : public std::enable_shared_from_this<RefreshTask>
8483

8584
RefreshSchedule refresh_schedule;
8685
RefreshSettings refresh_settings; // TODO: populate, use, update on alter
86+
std::vector<StorageID> initial_dependencies;
8787
RefreshSet::Handle set_handle;
8888

8989
/// StorageIDs of our dependencies that we're waiting for.
90+
using DatabaseAndTableNameSet = std::unordered_set<StorageID, StorageID::DatabaseAndTableNameHash, StorageID::DatabaseAndTableNameEqual>;
9091
DatabaseAndTableNameSet remaining_dependencies;
9192
bool time_arrived = false;
9293

src/Storages/MaterializedView/RefreshTask_fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,6 @@ class RefreshTask;
1111
using RefreshTaskStateUnderlying = UInt8;
1212
using RefreshTaskHolder = std::shared_ptr<RefreshTask>;
1313
using RefreshTaskObserver = std::weak_ptr<RefreshTask>;
14+
using RefreshTaskList = std::list<RefreshTaskHolder>;
1415

1516
}

src/Storages/StorageMaterializedView.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ StorageMaterializedView::StorageMaterializedView(
203203
{
204204
fixed_uuid = false;
205205
refresher = RefreshTask::create(
206-
*this,
207206
getContext(),
208207
*query.refresh_strategy);
209208
refresh_on_start = mode < LoadingStrictnessLevel::ATTACH && !query.is_create_empty;

src/Storages/System/StorageSystemViewRefreshes.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <DataTypes/DataTypeDateTime.h>
66
#include <DataTypes/DataTypeNullable.h>
77
#include <DataTypes/DataTypeString.h>
8+
#include <DataTypes/DataTypeUUID.h>
89
#include <DataTypes/DataTypesNumber.h>
910
#include <Interpreters/Context.h>
1011
#include <Storages/MaterializedView/RefreshSet.h>
@@ -19,6 +20,7 @@ ColumnsDescription StorageSystemViewRefreshes::getColumnsDescription()
1920
{
2021
{"database", std::make_shared<DataTypeString>(), "The name of the database the table is in."},
2122
{"view", std::make_shared<DataTypeString>(), "Table name."},
23+
{"uuid", std::make_shared<DataTypeUUID>(), "Table uuid (Atomic database)."},
2224
{"status", std::make_shared<DataTypeString>(), "Current state of the refresh."},
2325
{"last_refresh_result", std::make_shared<DataTypeString>(), "Outcome of the latest refresh attempt."},
2426
{"last_refresh_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()),
@@ -63,6 +65,7 @@ void StorageSystemViewRefreshes::fillData(
6365
std::size_t i = 0;
6466
res_columns[i++]->insert(refresh.view_id.getDatabaseName());
6567
res_columns[i++]->insert(refresh.view_id.getTableName());
68+
res_columns[i++]->insert(refresh.view_id.uuid);
6669
res_columns[i++]->insert(toString(refresh.state));
6770
res_columns[i++]->insert(toString(refresh.last_refresh_result));
6871

0 commit comments

Comments
 (0)