Skip to content

Commit 93efc0a

Browse files
CaideyipiJackieTien97
authored andcommitted
Implemented the alter encoding compression function for tree model (#16672)
* ifpermitted * reconstruct * final-prev * complete-dn * partial * very-partial * fix * partial-set * fix * test * fix * shop * fix * some * partial * bishop * fix * fix * grasia * fix * main * fix * partial * fix * minor * fix * fix * spotless * test * part * fix * bug-fix * fix * Revert "fix" This reverts commit da7a080. * Reapply "fix" This reverts commit 27de58a.
1 parent 9144305 commit 93efc0a

File tree

76 files changed

+1854
-158
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+1854
-158
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.it.schema;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.itbase.category.ClusterIT;
24+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
25+
import org.apache.iotdb.util.AbstractSchemaIT;
26+
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runners.Parameterized;
32+
33+
import java.sql.Connection;
34+
import java.sql.ResultSet;
35+
import java.sql.SQLException;
36+
import java.sql.Statement;
37+
38+
import static org.junit.Assert.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.fail;
40+
41+
@Category({LocalStandaloneIT.class, ClusterIT.class})
42+
public class IoTDBAlterEncodingCompressorIT extends AbstractSchemaIT {
43+
44+
public IoTDBAlterEncodingCompressorIT(SchemaTestMode schemaTestMode) {
45+
super(schemaTestMode);
46+
}
47+
48+
@Parameterized.BeforeParam
49+
public static void before() throws Exception {
50+
setUpEnvironment();
51+
EnvFactory.getEnv().initClusterEnvironment();
52+
}
53+
54+
@Parameterized.AfterParam
55+
public static void after() throws Exception {
56+
EnvFactory.getEnv().cleanClusterEnvironment();
57+
tearDownEnvironment();
58+
}
59+
60+
@After
61+
public void tearDown() throws Exception {
62+
clearSchema();
63+
}
64+
65+
@Test
66+
public void alterEncodingAndCompressorTest() throws Exception {
67+
if (schemaTestMode.equals(SchemaTestMode.PBTree)) {
68+
return;
69+
}
70+
try (final Connection connection = EnvFactory.getEnv().getConnection();
71+
final Statement statement = connection.createStatement()) {
72+
statement.execute("create timeSeries root.vehicle.wind.a int32");
73+
74+
try {
75+
statement.execute("alter timeSeries root.nonExist.** set encoding=PLAIN");
76+
fail();
77+
} catch (final SQLException e) {
78+
Assert.assertEquals(
79+
"508: Timeseries [root.nonExist.**] does not exist or is represented by device template",
80+
e.getMessage());
81+
}
82+
83+
try {
84+
statement.execute("alter timeSeries if exists root.nonExist.** set encoding=PLAIN");
85+
} catch (final SQLException e) {
86+
fail(
87+
"Alter encoding & compressor shall not fail when timeSeries not exists if set if exists");
88+
}
89+
90+
try {
91+
statement.execute("alter timeSeries if exists root.vehicle.** set encoding=aaa");
92+
fail();
93+
} catch (final SQLException e) {
94+
Assert.assertEquals("701: Unsupported encoding: AAA", e.getMessage());
95+
}
96+
97+
try {
98+
statement.execute("alter timeSeries if exists root.vehicle.** set compressor=aaa");
99+
fail();
100+
} catch (final SQLException e) {
101+
Assert.assertEquals("701: Unsupported compressor: AAA", e.getMessage());
102+
}
103+
104+
try {
105+
statement.execute("alter timeSeries if exists root.vehicle.** set falseKey=aaa");
106+
fail();
107+
} catch (final SQLException e) {
108+
Assert.assertEquals("701: property falsekey is unsupported yet.", e.getMessage());
109+
}
110+
111+
try {
112+
statement.execute("alter timeSeries if exists root.vehicle.** set encoding=DICTIONARY");
113+
fail();
114+
} catch (final SQLException e) {
115+
Assert.assertTrue(e.getMessage().contains("encoding DICTIONARY does not support INT32"));
116+
}
117+
118+
statement.execute("alter timeSeries root.** set encoding=Plain, compressor=LZMA2");
119+
120+
try (final ResultSet resultSet = statement.executeQuery("SHOW TIMESERIES")) {
121+
while (resultSet.next()) {
122+
assertEquals("PLAIN", resultSet.getString(5));
123+
assertEquals("LZMA2", resultSet.getString(6));
124+
}
125+
}
126+
127+
statement.execute("create user IoTDBUser '!@#$!dfdfzvd343'");
128+
statement.execute("grant write on root.vehicle.wind.a to user IoTDBUser");
129+
statement.execute("create timeSeries root.vehicle.wind.b int32");
130+
}
131+
132+
try (final Connection connection =
133+
EnvFactory.getEnv().getConnection("IoTDBUser", "!@#$!dfdfzvd343");
134+
final Statement statement = connection.createStatement()) {
135+
try {
136+
statement.execute("alter timeSeries root.vechile.** set encoding=PLAIN, compressor=LZMA2");
137+
fail();
138+
} catch (final SQLException e) {
139+
Assert.assertEquals(
140+
"803: No permissions for this operation, please add privilege WRITE_SCHEMA on [root.vechile.**]",
141+
e.getMessage());
142+
}
143+
144+
try {
145+
statement.execute(
146+
"alter timeSeries root.vechile.wind.a, root.__audit.** set encoding=PLAIN, compressor=LZMA2");
147+
fail();
148+
} catch (final SQLException e) {
149+
Assert.assertEquals(
150+
"803: 'AUDIT' permission is needed to alter the encoding and compressor of database root.__audit",
151+
e.getMessage());
152+
}
153+
154+
try {
155+
statement.execute(
156+
"alter timeSeries if permitted root.vehicle.**, root.__audit.** set encoding=GORILLA, compressor=GZIP");
157+
} catch (final SQLException e) {
158+
fail("Alter encoding & compressor shall not fail when no privileges if set if permitted");
159+
}
160+
}
161+
162+
try (final Connection connection = EnvFactory.getEnv().getConnection();
163+
final Statement statement = connection.createStatement()) {
164+
try (final ResultSet resultSet =
165+
statement.executeQuery("SHOW TIMESERIES root.__audit.**._0.password")) {
166+
while (resultSet.next()) {
167+
assertEquals("PLAIN", resultSet.getString(5));
168+
assertEquals("LZMA2", resultSet.getString(6));
169+
}
170+
}
171+
172+
try (final ResultSet resultSet =
173+
statement.executeQuery("SHOW TIMESERIES root.vehicle.wind.b")) {
174+
resultSet.next();
175+
assertEquals("TS_2DIFF", resultSet.getString(5));
176+
assertEquals("LZ4", resultSet.getString(6));
177+
}
178+
179+
try (final ResultSet resultSet =
180+
statement.executeQuery("SHOW TIMESERIES root.vehicle.wind.a")) {
181+
resultSet.next();
182+
assertEquals("GORILLA", resultSet.getString(5));
183+
assertEquals("GZIP", resultSet.getString(6));
184+
}
185+
}
186+
}
187+
}

integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeInclusionIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,71 @@ public void testPureSchemaInclusion() throws Exception {
4949
final String receiverIp = receiverDataNode.getIp();
5050
final int receiverPort = receiverDataNode.getPort();
5151

52+
try (final SyncConfigNodeIServiceClient client =
53+
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
54+
final Map<String, String> sourceAttributes = new HashMap<>();
55+
final Map<String, String> processorAttributes = new HashMap<>();
56+
final Map<String, String> sinkAttributes = new HashMap<>();
57+
58+
sourceAttributes.put("source.inclusion", "schema");
59+
60+
sinkAttributes.put("sink", "iotdb-thrift-sink");
61+
sinkAttributes.put("sink.ip", receiverIp);
62+
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
63+
64+
final TSStatus status =
65+
client.createPipe(
66+
new TCreatePipeReq("testPipe", sinkAttributes)
67+
.setExtractorAttributes(sourceAttributes)
68+
.setProcessorAttributes(processorAttributes));
69+
70+
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
71+
72+
Assert.assertEquals(
73+
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
74+
75+
// Do not fail if the failure has nothing to do with pipe
76+
// Because the failures will randomly generate due to resource limitation
77+
if (!TestUtils.tryExecuteNonQueriesWithRetry(
78+
senderEnv,
79+
Arrays.asList(
80+
// TODO: add database creation after the database auto creating on receiver can be
81+
// banned
82+
"create timeSeries root.ln.wf01.wt01.status with datatype=BOOLEAN,encoding=PLAIN",
83+
"ALTER timeSeries root.ln.wf01.wt01.status ADD TAGS tag3=v3",
84+
"ALTER timeSeries root.ln.wf01.wt01.status ADD ATTRIBUTES attr4=v4",
85+
"ALTER timeSeries root.** set compressor=ZSTD"),
86+
null)) {
87+
return;
88+
}
89+
90+
TestUtils.assertDataEventuallyOnEnv(
91+
receiverEnv,
92+
"show timeseries root.ln.**",
93+
"Timeseries,Alias,Database,DataType,Encoding,Compression,Tags,Attributes,Deadband,DeadbandParameters,ViewType,",
94+
Collections.singleton(
95+
"root.ln.wf01.wt01.status,null,root.ln,BOOLEAN,PLAIN,ZSTD,{\"tag3\":\"v3\"},{\"attr4\":\"v4\"},null,null,BASE,"));
96+
97+
if (!TestUtils.tryExecuteNonQueriesWithRetry(
98+
senderEnv,
99+
Arrays.asList(
100+
"insert into root.ln.wf01.wt01(time, status) values(now(), false)", "flush"),
101+
null)) {
102+
return;
103+
}
104+
105+
TestUtils.assertDataAlwaysOnEnv(
106+
receiverEnv, "select * from root.ln.**", "Time,", Collections.emptySet());
107+
}
108+
}
109+
110+
@Test
111+
public void testPureSchemaInclusionWithMultiplePattern() throws Exception {
112+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
113+
114+
final String receiverIp = receiverDataNode.getIp();
115+
final int receiverPort = receiverDataNode.getPort();
116+
52117
try (final SyncConfigNodeIServiceClient client =
53118
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
54119
final Map<String, String> extractorAttributes = new HashMap<>();

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ ddlStatement
3939
// Database
4040
: createDatabase | dropDatabase | dropPartition | alterDatabase | showDatabases | countDatabases
4141
// Timeseries & Path
42-
| createTimeseries | dropTimeseries | alterTimeseries
42+
| createTimeseries | dropTimeseries | alterTimeseries | alterEncodingCompressor
4343
| showDevices | showTimeseries | showChildPaths | showChildNodes | countDevices | countTimeseries | countNodes
4444
// Device Template
4545
| createSchemaTemplate | createTimeseriesUsingSchemaTemplate | dropSchemaTemplate | dropTimeseriesOfSchemaTemplate
@@ -177,6 +177,10 @@ alterClause
177177
| UPSERT aliasClause? tagClause? attributeClause?
178178
;
179179

180+
alterEncodingCompressor
181+
: ALTER TIMESERIES (IF EXISTS)? (IF PERMITTED)? prefixPath (COMMA prefixPath)* SET attributePair (COMMA attributePair)*
182+
;
183+
180184
aliasClause
181185
: ALIAS operator_eq alias
182186
;

iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,10 @@ PATHS
606606
: P A T H S
607607
;
608608

609+
PERMITTED
610+
: P E R M I T T E D
611+
;
612+
609613
PIPE
610614
: P I P E
611615
;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public enum CnToDnAsyncRequestType {
9090
DELETE_DATA_FOR_DELETE_SCHEMA,
9191
DELETE_TIMESERIES,
9292

93+
ALTER_ENCODING_COMPRESSOR,
94+
9395
CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
9496
ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
9597
DEACTIVATE_TEMPLATE,

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
4949
import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
5050
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
51+
import org.apache.iotdb.mpp.rpc.thrift.TAlterEncodingCompressorReq;
5152
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
5253
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
5354
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
@@ -308,6 +309,11 @@ protected void initActionMapBuilder() {
308309
CnToDnAsyncRequestType.DELETE_TIMESERIES,
309310
(req, client, handler) ->
310311
client.deleteTimeSeries((TDeleteTimeSeriesReq) req, (SchemaUpdateRPCHandler) handler));
312+
actionMapBuilder.put(
313+
CnToDnAsyncRequestType.ALTER_ENCODING_COMPRESSOR,
314+
(req, client, handler) ->
315+
client.alterEncodingCompressor(
316+
(TAlterEncodingCompressorReq) req, (SchemaUpdateRPCHandler) handler));
311317
actionMapBuilder.put(
312318
CnToDnAsyncRequestType.CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE,
313319
(req, client, handler) ->

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public static DataNodeAsyncRequestRPCHandler<?> buildHandler(
8383
case ROLLBACK_SCHEMA_BLACK_LIST:
8484
case DELETE_DATA_FOR_DELETE_SCHEMA:
8585
case DELETE_TIMESERIES:
86+
case ALTER_ENCODING_COMPRESSOR:
8687
case CONSTRUCT_SCHEMA_BLACK_LIST_WITH_TEMPLATE:
8788
case ROLLBACK_SCHEMA_BLACK_LIST_WITH_TEMPLATE:
8889
case DEACTIVATE_TEMPLATE:

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
6363
import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
6464
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
65+
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeAlterEncodingCompressorPlan;
6566
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTableOrViewPlan;
6667
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
6768
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
@@ -546,6 +547,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
546547
case PipeDeleteDevices:
547548
plan = new PipeDeleteDevicesPlan();
548549
break;
550+
case PipeAlterEncodingCompressor:
551+
plan = new PipeAlterEncodingCompressorPlan();
552+
break;
549553
case UpdateTriggersOnTransferNodes:
550554
plan = new UpdateTriggersOnTransferNodesPlan();
551555
break;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ public enum ConfigPhysicalPlanType {
301301
PipeSetTTL((short) 1705),
302302
PipeCreateTableOrView((short) 1706),
303303
PipeDeleteDevices((short) 1707),
304+
PipeAlterEncodingCompressor((short) 1708),
304305

305306
/** Subscription */
306307
CreateTopic((short) 1800),

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanVisitor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
2525
import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan;
2626
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
27+
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeAlterEncodingCompressorPlan;
2728
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTableOrViewPlan;
2829
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
2930
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
@@ -189,6 +190,8 @@ public R process(final ConfigPhysicalPlan plan, final C context) {
189190
return visitRenameTable((RenameTablePlan) plan, context);
190191
case RenameView:
191192
return visitRenameView((RenameViewPlan) plan, context);
193+
case PipeAlterEncodingCompressor:
194+
return visitPipeAlterEncodingCompressor((PipeAlterEncodingCompressorPlan) plan, context);
192195
default:
193196
return visitPlan(plan, context);
194197
}
@@ -500,4 +503,9 @@ public R visitRenameTable(final RenameTablePlan renameTablePlan, final C context
500503
public R visitRenameView(final RenameViewPlan renameViewPlan, final C context) {
501504
return visitRenameTable(renameViewPlan, context);
502505
}
506+
507+
public R visitPipeAlterEncodingCompressor(
508+
final PipeAlterEncodingCompressorPlan pipeAlterEncodingCompressorPlan, final C context) {
509+
return visitPlan(pipeAlterEncodingCompressorPlan, context);
510+
}
503511
}

0 commit comments

Comments
 (0)