1+ import org.apache.doris.regression.suite.ClusterOptions
2+ import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
3+ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
4+ import software.amazon.awssdk.regions.Region
5+ import software.amazon.awssdk.services.kms.KmsClient
6+ import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest
7+
8+ suite(" test_restart_when_rotating" , " docker" ) {
9+ def tdeAlgorithm = [" AES256" , " SM4" ]
10+ def cloudMode = [/* false,*/ true ]
11+ cloudMode. each { mode ->
12+ tdeAlgorithm. each { algorithm ->
13+ def options = new ClusterOptions ()
14+ options. cloudMode = mode
15+ options. enableDebugPoints()
16+ options. feConfigs + = [
17+ ' cloud_cluster_check_interval_second=1' ,
18+ ' sys_log_verbose_modules=org' ,
19+ " doris_tde_key_endpoint=${ context.config.tdeKeyEndpoint} " ,
20+ " doris_tde_key_region=${ context.config.tdeKeyRegion} " ,
21+ " doris_tde_key_provider=${ context.config.tdeKeyProvider} " ,
22+ " doris_tde_algorithm=${ algorithm} " ,
23+ " doris_tde_key_id=${ context.config.tdeKeyId} "
24+ ]
25+ options. tdeAk = context. config. tdeAk
26+ options. tdeSk = context. config. tdeSk
27+ options. enableDebugPoints()
28+
29+ docker(options) {
30+ def tblName = " test_restart_when_rotating"
31+ sql """ DROP TABLE IF EXISTS ${ tblName} """
32+ sql """
33+ CREATE TABLE IF NOT EXISTS ${ tblName} (
34+ `k` int NOT NULL,
35+ `v` varchar(10) NOT NULL)
36+ DUPLICATE KEY(`k`)
37+ DISTRIBUTED BY HASH(`k`) BUCKETS 8
38+ PROPERTIES (
39+ "replication_allocation" = "tag.location.default: 1"
40+ )
41+ """ ;
42+
43+ (1 .. 10 ). each { i ->
44+ sql """ INSERT INTO ${ tblName} VALUES (${ i} , "${ i} ") """
45+ }
46+ qt_sql """ SELECT * FROM ${ tblName} ORDER BY `k` """
47+
48+ def keys = sql """ SELECT * FROM information_schema.encryption_keys """
49+ {
50+ def credProvider = StaticCredentialsProvider . create(
51+ AwsBasicCredentials . create(context. config. tdeAk, context. config. tdeSk)
52+ );
53+ def client = KmsClient . builder()
54+ .region(Region . of(context. config. tdeKeyRegion))
55+ .endpointOverride(URI . create(context. config. tdeKeyEndpoint))
56+ .credentialsProvider(credProvider)
57+ .build();
58+
59+ def resp = client. createKey()
60+ def keyId = resp. keyMetadata(). keyId()
61+ try {
62+ GetDebugPoint (). enableDebugPointForAllFEs(" KeyManager.stopAfterOneMasterKeyChanged" )
63+ def t = Thread . start {
64+ try {
65+ sql """ ADMIN ROTATE TDE ROOT KEY PROPERTIES(
66+ "doris_tde_key_provider" = "aws_kms",
67+ "doris_tde_key_id" = "${ keyId} ",
68+ "doris_tde_key_endpoint" = "${ context.config.tdeKeyEndpoint} ",
69+ "doris_tde_key_region" = "${ context.config.tdeKeyRegion} "
70+ )
71+ """
72+ } catch (Exception ignored) {
73+ // do nothing
74+ }
75+ }
76+ sleep(3000 )
77+
78+ cluster. restartFrontends()
79+ sleep(30000 )
80+ context. reconnectFe()
81+
82+ (1 .. 10 ). each { i ->
83+ sql """ INSERT INTO ${ tblName} VALUES (${ i} , "${ i} ") """
84+ }
85+ qt_sql """ SELECT * FROM ${ tblName} ORDER BY `k` """
86+
87+ def newKeys = sql """ SELECT * FROM information_schema.encryption_keys """
88+ assertEquals (keys[0 ][6 ], newKeys[0 ][6 ])
89+ keys = newKeys
90+ } finally {
91+ // delete cmk id
92+ def deleteReq = ScheduleKeyDeletionRequest . builder(). keyId(keyId). build();
93+ client. scheduleKeyDeletion((ScheduleKeyDeletionRequest ) deleteReq)
94+ GetDebugPoint (). disableDebugPointForAllFEs(" KeyManager.stopAfterOneMasterKeyChanged" )
95+ }
96+ }
97+
98+ {
99+ def credProvider = StaticCredentialsProvider . create(
100+ AwsBasicCredentials . create(context. config. tdeAk, context. config. tdeSk)
101+ );
102+ def client = KmsClient . builder()
103+ .region(Region . of(context. config. tdeKeyRegion))
104+ .endpointOverride(URI . create(context. config. tdeKeyEndpoint))
105+ .credentialsProvider(credProvider)
106+ .build();
107+
108+ def resp = client. createKey()
109+ def keyId = resp. keyMetadata(). keyId()
110+ try {
111+ GetDebugPoint (). enableDebugPointForAllFEs(" KeyManager.stopAfterAllMasterKeyChanged" )
112+ def t = Thread . start {
113+ try {
114+ sql """ ADMIN ROTATE TDE ROOT KEY PROPERTIES(
115+ "doris_tde_key_provider" = "aws_kms",
116+ "doris_tde_key_id" = "${ keyId} ",
117+ "doris_tde_key_endpoint" = "${ context.config.tdeKeyEndpoint} ",
118+ "doris_tde_key_region" = "${ context.config.tdeKeyRegion} "
119+ )
120+ """
121+ } catch (Exception ignored) {
122+ // do nothing
123+ }
124+ }
125+ sleep(3000 )
126+
127+ cluster. restartFrontends()
128+ sleep(30000 )
129+ context. reconnectFe()
130+
131+ (1 .. 10 ). each { i ->
132+ sql """ INSERT INTO ${ tblName} VALUES (${ i} , "${ i} ") """
133+ }
134+ qt_sql """ SELECT * FROM ${ tblName} ORDER BY `k` """
135+
136+ def newKeys = sql """ SELECT * FROM information_schema.encryption_keys """
137+ assertEquals (keys[0 ][6 ], newKeys[0 ][6 ])
138+ keys = newKeys
139+ } finally {
140+ // delete cmk id
141+ def deleteReq = ScheduleKeyDeletionRequest . builder(). keyId(keyId). build();
142+ client. scheduleKeyDeletion((ScheduleKeyDeletionRequest ) deleteReq)
143+ GetDebugPoint (). disableDebugPointForAllFEs(" KeyManager.stopAfterAllMasterKeyChanged" )
144+ }
145+ }
146+
147+ {
148+ def credProvider = StaticCredentialsProvider . create(
149+ AwsBasicCredentials . create(context. config. tdeAk, context. config. tdeSk)
150+ );
151+ def client = KmsClient . builder()
152+ .region(Region . of(context. config. tdeKeyRegion))
153+ .endpointOverride(URI . create(context. config. tdeKeyEndpoint))
154+ .credentialsProvider(credProvider)
155+ .build();
156+
157+ def resp = client. createKey()
158+ def keyId = resp. keyMetadata(). keyId()
159+ try {
160+ GetDebugPoint (). enableDebugPointForAllFEs(" KeyManager.stopAfterRotateEditLogWritten" )
161+ def t = Thread . start {
162+ try {
163+ sql """ ADMIN ROTATE TDE ROOT KEY PROPERTIES(
164+ "doris_tde_key_provider" = "aws_kms",
165+ "doris_tde_key_id" = "${ keyId} ",
166+ "doris_tde_key_endpoint" = "${ context.config.tdeKeyEndpoint} ",
167+ "doris_tde_key_region" = "${ context.config.tdeKeyRegion} "
168+ )
169+ """
170+ } catch (Exception ignored) {
171+ // do nothing
172+ }
173+ }
174+ sleep(3000 )
175+
176+ cluster. restartFrontends()
177+ sleep(30000 )
178+ context. reconnectFe()
179+
180+ (1 .. 10 ). each { i ->
181+ sql """ INSERT INTO ${ tblName} VALUES (${ i} , "${ i} ") """
182+ }
183+ qt_sql """ SELECT * FROM ${ tblName} ORDER BY `k` """
184+
185+ def newKeys = sql """ SELECT * FROM information_schema.encryption_keys """
186+ assertNotEquals(keys[0 ][6 ], newKeys[0 ][6 ])
187+ } finally {
188+ // delete cmk id
189+ def deleteReq = ScheduleKeyDeletionRequest . builder(). keyId(keyId). build();
190+ client. scheduleKeyDeletion((ScheduleKeyDeletionRequest ) deleteReq)
191+ GetDebugPoint (). disableDebugPointForAllFEs(" KeyManager.stopAfterRotateEditLogWritten" )
192+ }
193+ }
194+
195+ qt_sql """ SELECT * FROM ${ tblName} ORDER BY `k` """
196+ }
197+ }
198+ }
199+ }
0 commit comments