Skip to content

Commit fa41835

Browse files
authored
Merge pull request #34258 from teskje/alter-sink-from-check
adapter,sql: move sink FROM check into planning
2 parents 37e1c7b + 7ce0692 commit fa41835

File tree

6 files changed

+62
-19
lines changed

6 files changed

+62
-19
lines changed

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,24 +1400,6 @@ impl Coordinator {
14001400
owner_id: *ctx.session().current_role_id(),
14011401
}];
14021402

1403-
let from = self.catalog().get_entry_by_global_id(&catalog_sink.from);
1404-
if let Err(e) = self
1405-
.controller
1406-
.storage
1407-
.check_exists(sink.from)
1408-
.map_err(|e| match e {
1409-
StorageError::IdentifierMissing(_) => AdapterError::Unstructured(anyhow!(
1410-
"{} is a {}, which cannot be exported as a sink",
1411-
from.name().item.clone(),
1412-
from.item().typ(),
1413-
)),
1414-
e => AdapterError::Storage(e),
1415-
})
1416-
{
1417-
ctx.retire(Err(e));
1418-
return;
1419-
}
1420-
14211403
let result = self.catalog_transact(Some(ctx.session()), ops).await;
14221404

14231405
match result {

src/sql/src/plan/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ pub enum PlanError {
143143
name: String,
144144
version: String,
145145
},
146+
InvalidSinkFrom {
147+
name: String,
148+
item_type: CatalogItemType,
149+
},
146150
MangedReplicaName(String),
147151
ParserStatement(ParserStatementError),
148152
Parser(ParserError),
@@ -637,6 +641,9 @@ impl fmt::Display for PlanError {
637641
Self::InvalidVersion { name, version } => {
638642
write!(f, "invalid version {} for {}", version.quoted(), name.quoted())
639643
},
644+
Self::InvalidSinkFrom { name, item_type } => {
645+
write!(f, "{name} is a {item_type}, which cannot be exported as a sink")
646+
},
640647
Self::DropViewOnMaterializedView(name)
641648
| Self::AlterViewOnMaterializedView(name)
642649
| Self::ShowCreateViewOnMaterializedView(name)

src/sql/src/plan/statement/ddl.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3376,6 +3376,21 @@ fn plan_sink(
33763376

33773377
let from_name = &from;
33783378
let from = scx.get_item_by_resolved_name(&from)?;
3379+
3380+
{
3381+
use CatalogItemType::*;
3382+
match from.item_type() {
3383+
Table | Source | MaterializedView | ContinualTask => {}
3384+
Sink | View | Index | Type | Func | Secret | Connection => {
3385+
let name = scx.catalog.minimal_qualification(from.name());
3386+
return Err(PlanError::InvalidSinkFrom {
3387+
name: name.to_string(),
3388+
item_type: from.item_type(),
3389+
});
3390+
}
3391+
}
3392+
}
3393+
33793394
if from.id().is_system() {
33803395
bail_unsupported!("creating a sink directly on a catalog object");
33813396
}

test/testdrive/alter-sink.td

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,7 @@ $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=10s
124124

125125
$ kafka-verify-data format=json sink=materialize.public.sink key=false
126126
{"before": null, "after": {"created_post_name": "hundred", "created_post_value": 99}}
127+
128+
> CREATE TABLE t (s text)
129+
> CREATE VIEW v AS SELECT s, 1 FROM t
130+
> ALTER SINK sink SET FROM v

test/testdrive/github-9923.td

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Regression test for database-issues#9923.
11+
12+
> CREATE CONNECTION kafka_conn
13+
TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT)
14+
> CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (
15+
URL '${testdrive.schema-registry-url}'
16+
)
17+
18+
> CREATE TABLE t (a int)
19+
> CREATE VIEW v AS SELECT a, 1 FROM t
20+
21+
! CREATE SINK sink
22+
FROM v
23+
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-${testdrive.seed}')
24+
FORMAT JSON
25+
ENVELOPE DEBEZIUM
26+
contains:v is a view, which cannot be exported as a sink
27+
28+
> CREATE SINK sink
29+
FROM t
30+
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-alter-sink-${testdrive.seed}')
31+
FORMAT JSON
32+
ENVELOPE DEBEZIUM
33+
34+
! ALTER SINK sink SET FROM v
35+
contains:v is a view, which cannot be exported as a sink

test/testdrive/kafka-sink-errors.td

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ contains:LEGACY IDs option is not supported
5858
INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-sink-errors-${testdrive.seed}')
5959
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
6060
ENVELOPE DEBEZIUM
61-
contains:catalog item 'materialize.public.s1' is a sink and so cannot be depended upon
61+
contains:s1 is a sink, which cannot be exported as a sink
6262

6363
! CREATE VIEW v2 AS SELECT * FROM s1
6464
contains:catalog item 'materialize.public.s1' is a sink and so cannot be depended upon

0 commit comments

Comments
 (0)