Skip to content

Commit e44af54

Browse files
Merge pull request ClickHouse#77484 from ianton-ru/write_to_merge
Write to Merge storage
2 parents b28b58a + 394f88a commit e44af54

File tree

8 files changed

+374
-6
lines changed

8 files changed

+374
-6
lines changed

docs/en/engines/table-engines/special/merge.md

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Reading is automatically parallelized. Writing to a table is not supported. When
1616
## Creating a Table {#creating-a-table}
1717

1818
```sql
19-
CREATE TABLE ... Engine=Merge(db_name, tables_regexp)
19+
CREATE TABLE ... Engine=Merge(db_name, tables_regexp [, table_to_write])
2020
```
2121

2222
## Engine Parameters {#engine-parameters}
@@ -35,6 +35,14 @@ CREATE TABLE ... Engine=Merge(db_name, tables_regexp)
3535
Regular expressions — [re2](https://github.com/google/re2) (supports a subset of PCRE), case-sensitive.
3636
See the notes about escaping symbols in regular expressions in the "match" section.
3737

38+
### table_to_write {#table_to_write}
39+
40+
`table_to_write` - Name of the table that receives the data when inserting into the `Merge` table.
41+
Possible values:
42+
- `'db_name.table_name'` - insert into the specific table in the specific database.
43+
- `'table_name'` - insert into table `db_name.table_name`. Allowed only when the first parameter `db_name` is not a regular expression.
44+
- `auto` - insert into the last table matched by `tables_regexp`, in lexicographical order. Allowed only when the first parameter `db_name` is not a regular expression.
45+
3846
## Usage {#usage}
3947

4048
When selecting tables to read, the `Merge` table itself is not selected, even if it matches the regex. This is to avoid loops.
@@ -65,7 +73,7 @@ CREATE TABLE WatchLog_new(date Date, UserId Int64, EventType String, Cnt UInt64)
6573
ENGINE=MergeTree PARTITION BY date ORDER BY (UserId, EventType) SETTINGS index_granularity=8192;
6674
INSERT INTO WatchLog_new VALUES ('2018-01-02', 2, 'hit', 3);
6775

68-
CREATE TABLE WatchLog as WatchLog_old ENGINE=Merge(currentDatabase(), '^WatchLog');
76+
CREATE TABLE WatchLog as WatchLog_old ENGINE=Merge(currentDatabase(), '^WatchLog', 'WatchLog_new');
6977

7078
SELECT * FROM WatchLog;
7179
```
@@ -79,6 +87,22 @@ SELECT * FROM WatchLog;
7987
└────────────┴────────┴───────────┴─────┘
8088
```
8189

90+
Inserts into the `WatchLog` table go to the `WatchLog_new` table:
91+
```sql
92+
INSERT INTO WatchLog VALUES ('2018-01-03', 3, 'hit', 3);
93+
94+
SELECT * FROM WatchLog_new;
95+
```
96+
97+
```text
98+
┌───────date─┬─UserId─┬─EventType─┬─Cnt─┐
99+
│ 2018-01-02 │ 2 │ hit │ 3 │
100+
└────────────┴────────┴───────────┴─────┘
101+
┌───────date─┬─UserId─┬─EventType─┬─Cnt─┐
102+
│ 2018-01-03 │ 3 │ hit │ 3 │
103+
└────────────┴────────┴───────────┴─────┘
104+
```
105+
82106
## Virtual Columns {#virtual-columns}
83107

84108
- `_table` — Contains the name of the table from which data was read. Type: [String](../../../sql-reference/data-types/string.md).

src/Storages/StorageMerge.cpp

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <Processors/QueryPlan/ExpressionStep.h>
4242
#include <Processors/QueryPlan/QueryPlan.h>
4343
#include <Processors/QueryPlan/ReadFromMergeTree.h>
44+
#include <Processors/Sinks/SinkToStorage.h>
4445
#include <Processors/Sources/NullSource.h>
4546
#include <Processors/Transforms/FilterTransform.h>
4647
#include <Processors/Transforms/MaterializingTransform.h>
@@ -83,6 +84,9 @@ extern const int SAMPLING_NOT_SUPPORTED;
8384
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
8485
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
8586
extern const int STORAGE_REQUIRES_PARAMETER;
87+
extern const int UNKNOWN_TABLE;
88+
extern const int ACCESS_DENIED;
89+
extern const int TABLE_IS_READ_ONLY;
8690
}
8791

8892
namespace
@@ -143,6 +147,8 @@ StorageMerge::StorageMerge(
143147
const String & source_database_name_or_regexp_,
144148
bool database_is_regexp_,
145149
const DBToTableSetMap & source_databases_and_tables_,
150+
const std::optional<String> & table_to_write_,
151+
bool table_to_write_auto_,
146152
ContextPtr context_)
147153
: IStorage(table_id_)
148154
, WithContext(context_->getGlobalContext())
@@ -151,6 +157,7 @@ StorageMerge::StorageMerge(
151157
database_is_regexp_,
152158
source_database_name_or_regexp_, {},
153159
source_databases_and_tables_)
160+
, table_to_write_auto(table_to_write_auto_)
154161
{
155162
StorageInMemoryMetadata storage_metadata;
156163
storage_metadata.setColumns(columns_.empty()
@@ -159,6 +166,8 @@ StorageMerge::StorageMerge(
159166
storage_metadata.setComment(comment);
160167
setInMemoryMetadata(storage_metadata);
161168
setVirtuals(createVirtuals());
169+
if (!table_to_write_auto)
170+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
162171
}
163172

164173
StorageMerge::StorageMerge(
@@ -168,6 +177,8 @@ StorageMerge::StorageMerge(
168177
const String & source_database_name_or_regexp_,
169178
bool database_is_regexp_,
170179
const String & source_table_regexp_,
180+
const std::optional<String> & table_to_write_,
181+
bool table_to_write_auto_,
171182
ContextPtr context_)
172183
: IStorage(table_id_)
173184
, WithContext(context_->getGlobalContext())
@@ -176,6 +187,7 @@ StorageMerge::StorageMerge(
176187
database_is_regexp_,
177188
source_database_name_or_regexp_,
178189
source_table_regexp_, {})
190+
, table_to_write_auto(table_to_write_auto_)
179191
{
180192
StorageInMemoryMetadata storage_metadata;
181193
storage_metadata.setColumns(columns_.empty()
@@ -184,6 +196,8 @@ StorageMerge::StorageMerge(
184196
storage_metadata.setComment(comment);
185197
setInMemoryMetadata(storage_metadata);
186198
setVirtuals(createVirtuals());
199+
if (!table_to_write_auto)
200+
setTableToWrite(table_to_write_, source_database_name_or_regexp_, database_is_regexp_);
187201
}
188202

189203
StorageMerge::DatabaseTablesIterators StorageMerge::getDatabaseIterators(ContextPtr context_) const
@@ -293,6 +307,29 @@ void StorageMerge::forEachTable(F && func) const
293307
});
294308
}
295309

310+
template <typename F>
311+
void StorageMerge::forEachTableName(F && func) const
312+
{
313+
auto database_table_iterators = database_name_or_regexp.getDatabaseIterators(getContext());
314+
315+
for (auto & iterator : database_table_iterators)
316+
{
317+
while (iterator->isValid())
318+
{
319+
const auto & table = iterator->table();
320+
if (table.get() != this)
321+
{
322+
QualifiedTableName table_name;
323+
table_name.database = iterator->databaseName();
324+
table_name.table = iterator->name();
325+
func(table_name);
326+
}
327+
328+
iterator->next();
329+
}
330+
}
331+
}
332+
296333
bool StorageMerge::isRemote() const
297334
{
298335
auto first_remote_table = traverseTablesUntil([](const StoragePtr & table) { return table && table->isRemote(); });
@@ -1696,6 +1733,77 @@ std::optional<UInt64> StorageMerge::totalRowsOrBytes(F && func) const
16961733
return first_table ? std::nullopt : std::make_optional(total_rows_or_bytes);
16971734
}
16981735

1736+
void StorageMerge::setTableToWrite(
1737+
const std::optional<String> & table_to_write_,
1738+
const String & source_database_name_or_regexp_,
1739+
bool database_is_regexp_)
1740+
{
1741+
if (!table_to_write_.has_value())
1742+
{
1743+
table_to_write = std::nullopt;
1744+
return;
1745+
}
1746+
1747+
auto qualified_name = QualifiedTableName::parseFromString(*table_to_write_);
1748+
1749+
if (qualified_name.database.empty())
1750+
{
1751+
if (database_is_regexp_)
1752+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument 'table_to_write' must contain database if 'db_name' is regular expression");
1753+
1754+
qualified_name.database = source_database_name_or_regexp_;
1755+
}
1756+
1757+
table_to_write = qualified_name;
1758+
}
1759+
1760+
/// Creates a sink that forwards an INSERT into this Merge table to a single underlying table.
///
/// Destination selection:
/// - `auto` mode (`table_to_write_auto`): among the tables enumerated by forEachTableName(),
///   the one with the greatest full name for which INSERT is granted to the current user;
/// - otherwise: the table stored in `table_to_write` (set from the engine argument).
///
/// Throws:
/// - ACCESS_DENIED if, in `auto` mode, tables exist but INSERT is granted on none of them;
/// - UNKNOWN_TABLE if, in `auto` mode, no table was enumerated at all;
/// - TABLE_IS_READ_ONLY if no destination table is configured;
/// - whatever `checkAccess` throws when INSERT on the configured table is not granted.
///
/// The returned sink holds a share lock on the destination table.
SinkToStoragePtr StorageMerge::write(
    const ASTPtr & query,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context_,
    bool async_insert)
{
    const auto & access = context_->getAccess();

    /// The destination is resolved per call into a local. The `table_to_write` member is
    /// left untouched, so inserts running at the same time cannot overwrite each other's choice.
    std::optional<QualifiedTableName> destination;

    if (table_to_write_auto)
    {
        bool any_table_found = false;
        forEachTableName([&](const auto & table_name)
        {
            any_table_found = true;
            if (!destination.has_value() || destination->getFullName() < table_name.getFullName())
            {
                /// Tables without the INSERT grant are skipped rather than failing the query.
                if (access->isGranted(AccessType::INSERT, table_name.database, table_name.table))
                    destination = table_name;
            }
        });
        if (!destination.has_value())
        {
            if (any_table_found)
                throw Exception(ErrorCodes::ACCESS_DENIED, "Not allowed to write in any suitable table for storage {}", getName());
            else
                throw Exception(ErrorCodes::UNKNOWN_TABLE, "Can't find any table to write for storage {}", getName());
        }
    }
    else
    {
        if (!table_to_write.has_value())
            throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Method write is not allowed in storage {} without described table to write", getName());

        destination = table_to_write;
        access->checkAccess(AccessType::INSERT, destination->database, destination->table);
    }

    auto database = DatabaseCatalog::instance().getDatabase(destination->database);
    auto table = database->getTable(destination->table, context_);
    auto table_lock = table->lockForShare(
        context_->getInitialQueryId(),
        context_->getSettingsRef()[Setting::lock_acquire_timeout]);
    /// NOTE(review): `metadata_snapshot` is the one passed to this storage's write();
    /// confirm whether the destination table expects its own metadata snapshot here.
    auto sink = table->write(query, metadata_snapshot, context_, async_insert);
    /// Keep the destination table locked for as long as the sink is alive.
    sink->addTableLock(table_lock);
    return sink;
}
1806+
16991807
void registerStorageMerge(StorageFactory & factory)
17001808
{
17011809
factory.registerStorage("Merge", [](const StorageFactory::Arguments & args)
@@ -1706,10 +1814,12 @@ void registerStorageMerge(StorageFactory & factory)
17061814

17071815
ASTs & engine_args = args.engine_args;
17081816

1709-
if (engine_args.size() != 2)
1817+
size_t size = engine_args.size();
1818+
1819+
if (size < 2 || size > 3)
17101820
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
1711-
"Storage Merge requires exactly 2 parameters - name "
1712-
"of source database and regexp for table names.");
1821+
"Storage Merge requires 2 or 3 parameters - name "
1822+
"of source database, regexp for table names, and optional table name for writing");
17131823

17141824
auto [is_regexp, database_ast] = StorageMerge::evaluateDatabaseName(engine_args[0], args.getLocalContext());
17151825

@@ -1721,8 +1831,24 @@ void registerStorageMerge(StorageFactory & factory)
17211831
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.getLocalContext());
17221832
String table_name_regexp = checkAndGetLiteralArgument<String>(engine_args[1], "table_name_regexp");
17231833

1834+
std::optional<String> table_to_write = std::nullopt;
1835+
bool table_to_write_auto = false;
1836+
if (size == 3)
1837+
{
1838+
bool is_identifier = engine_args[2]->as<ASTIdentifier>();
1839+
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.getLocalContext());
1840+
table_to_write = checkAndGetLiteralArgument<String>(engine_args[2], "table_to_write");
1841+
if (is_identifier && table_to_write == "auto")
1842+
{
1843+
if (is_regexp)
1844+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "RegExp for database with auto table_to_write is forbidden");
1845+
table_to_write_auto = true;
1846+
}
1847+
}
1848+
17241849
return std::make_shared<StorageMerge>(
1725-
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp, table_name_regexp, args.getLocalContext());
1850+
args.table_id, args.columns, args.comment, source_database_name_or_regexp, is_regexp,
1851+
table_name_regexp, table_to_write, table_to_write_auto, args.getLocalContext());
17261852
},
17271853
{
17281854
.supports_schema_inference = true

src/Storages/StorageMerge.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ class StorageMerge final : public IStorage, WithContext
3030
const String & source_database_name_or_regexp_,
3131
bool database_is_regexp_,
3232
const DBToTableSetMap & source_databases_and_tables_,
33+
const std::optional<String> & table_to_write_,
34+
bool table_to_write_auto_,
3335
ContextPtr context_);
3436

3537
StorageMerge(
@@ -39,6 +41,8 @@ class StorageMerge final : public IStorage, WithContext
3941
const String & source_database_name_or_regexp_,
4042
bool database_is_regexp_,
4143
const String & source_table_regexp_,
44+
const std::optional<String> & table_to_write_,
45+
bool table_to_write_auto_,
4246
ContextPtr context_);
4347

4448
std::string getName() const override { return "Merge"; }
@@ -70,6 +74,12 @@ class StorageMerge final : public IStorage, WithContext
7074
size_t max_block_size,
7175
size_t num_streams) override;
7276

77+
SinkToStoragePtr write(
78+
const ASTPtr & query,
79+
const StorageMetadataPtr & metadata_snapshot,
80+
ContextPtr context,
81+
bool async_insert) override;
82+
7383
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
7484

7585
/// you need to add and remove columns in the sub-tables manually
@@ -122,12 +132,18 @@ class StorageMerge final : public IStorage, WithContext
122132

123133
DatabaseNameOrRegexp database_name_or_regexp;
124134

135+
std::optional<QualifiedTableName> table_to_write;
136+
bool table_to_write_auto = false;
137+
125138
template <typename F>
126139
StoragePtr traverseTablesUntil(F && predicate) const;
127140

128141
template <typename F>
129142
void forEachTable(F && func) const;
130143

144+
template <typename F>
145+
void forEachTableName(F && func) const;
146+
131147
template <typename F>
132148
static StoragePtr traverseTablesUntilImpl(const ContextPtr & query_context, const IStorage * ignore_self, const DatabaseNameOrRegexp & database_name_or_regexp, F && predicate);
133149

@@ -149,6 +165,11 @@ class StorageMerge final : public IStorage, WithContext
149165
template <typename F>
150166
std::optional<UInt64> totalRowsOrBytes(F && func) const;
151167

168+
void setTableToWrite(
169+
const std::optional<String> & table_to_write_,
170+
const String & source_database_name_or_regexp_,
171+
bool database_is_regexp_);
172+
152173
friend class ReadFromMerge;
153174
};
154175

src/TableFunctions/TableFunctionMerge.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,16 @@ ColumnsDescription TableFunctionMerge::getActualTableStructure(ContextPtr contex
141141

142142
StoragePtr TableFunctionMerge::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
143143
{
144+
std::optional<std::string> table_to_write = std::nullopt;
144145
auto res = std::make_shared<StorageMerge>(
145146
StorageID(getDatabaseName(), table_name),
146147
ColumnsDescription{},
147148
String{},
148149
source_database_name_or_regexp,
149150
database_is_regexp,
150151
source_table_regexp,
152+
table_to_write,
153+
false,
151154
context);
152155

153156
res->startup();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
2 1
2+
2 2
3+
2 3
4+
1 1
5+
2 1
6+
2 2
7+
2 3
8+
1 1
9+
2 1
10+
2 2
11+
2 3
12+
1 1
13+
2 1
14+
2 2
15+
2 3
16+
1 1
17+
2 1
18+
2 2
19+
2 3
20+
1 1
21+
2 1
22+
2 2
23+
2 3
24+
4
25+
2 1
26+
2 2
27+
2 3
28+
3 1
29+
4
30+
2 1
31+
2 2
32+
2 3
33+
3 1
34+
1
35+
3 2
36+
4
37+
2 1
38+
2 2
39+
2 3
40+
3 1
41+
0
42+
2
43+
3 2
44+
3 3

0 commit comments

Comments
 (0)