
Commit 56c7a99

add test for partitioned table
1 parent 686e9b0 · commit 56c7a99

File tree

1 file changed: +182 -1 lines changed

1 file changed

+182
-1
lines changed

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/LiquidClusterProcedureTest.scala

Lines changed: 182 additions & 1 deletion
@@ -103,7 +103,7 @@ class LiquidClusterProcedureTest extends PaimonSparkTestBase with StreamTest {
 
             var clusteredTable = loadTable("T")
             checkSnapshot(clusteredTable)
-            var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits();
+            var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
             Assertions.assertThat(dataSplits.size()).isEqualTo(1)
             Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1)
             Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
@@ -183,6 +183,187 @@ class LiquidClusterProcedureTest extends PaimonSparkTestBase with StreamTest {
     }
   }
 
+  test("Paimon Procedure: cluster for partitioned table") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (a INT, b INT, c STRING, pt INT)
+               |PARTITIONED BY (pt)
+               |TBLPROPERTIES ('bucket'='-1', 'num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'liquid-clustering.columns'='a,b', 'clustering.strategy'='zorder')
+               |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, Int, String, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY pt")
+
+          try {
+            val random = new Random()
+            val randomStr = random.nextString(50)
+            // first write
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              inputData.addData((0, 0, c, pt))
+              inputData.addData((0, 1, c, pt))
+              inputData.addData((0, 2, c, pt))
+              inputData.addData((1, 0, c, pt))
+              inputData.addData((1, 1, c, pt))
+              inputData.addData((1, 2, c, pt))
+              inputData.addData((2, 0, c, pt))
+              inputData.addData((2, 1, c, pt))
+              inputData.addData((2, 2, c, pt))
+            }
+            stream.processAllAvailable()
+
+            val result = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              for (a <- 0 until 3) {
+                for (b <- 0 until 3) {
+                  val c = if (pt == 0) randomStr else null
+                  result.add(Row(a, b, c, pt))
+                }
+              }
+            }
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result)
+
+            // first cluster, the outputLevel should be 5
+            checkAnswer(spark.sql("CALL paimon.sys.cluster(table => 'T')"), Row(true) :: Nil)
+
+            // first cluster result
+            val result2 = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              result2.add(Row(0, 0, c, pt))
+              result2.add(Row(0, 1, c, pt))
+              result2.add(Row(1, 0, c, pt))
+              result2.add(Row(1, 1, c, pt))
+              result2.add(Row(0, 2, c, pt))
+              result2.add(Row(1, 2, c, pt))
+              result2.add(Row(2, 0, c, pt))
+              result2.add(Row(2, 1, c, pt))
+              result2.add(Row(2, 2, c, pt))
+            }
+
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
+
+            var clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(2)
+            dataSplits.forEach(
+              dataSplit => {
+                Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1)
+                Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+              })
+
+            // second write
+            for (pt <- 0 until 2) {
+              inputData.addData((0, 3, null, pt), (1, 3, null, pt), (2, 3, null, pt))
+              inputData.addData(
+                (3, 0, null, pt),
+                (3, 1, null, pt),
+                (3, 2, null, pt),
+                (3, 3, null, pt))
+            }
+            stream.processAllAvailable()
+
+            val result3 = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              result3.add(Row(0, 0, c, pt))
+              result3.add(Row(0, 1, c, pt))
+              result3.add(Row(1, 0, c, pt))
+              result3.add(Row(1, 1, c, pt))
+              result3.add(Row(0, 2, c, pt))
+              result3.add(Row(1, 2, c, pt))
+              result3.add(Row(2, 0, c, pt))
+              result3.add(Row(2, 1, c, pt))
+              result3.add(Row(2, 2, c, pt))
+              for (a <- 0 until 3) {
+                result3.add(Row(a, 3, null, pt))
+              }
+              for (b <- 0 until 4) {
+                result3.add(Row(3, b, null, pt))
+              }
+            }
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+            // second cluster
+            checkAnswer(spark.sql("CALL paimon.sys.cluster(table => 'T')"), Row(true) :: Nil)
+            val result4 = new util.ArrayList[Row]()
+            // for partition-0: only file in level-0 will be picked for clustering, outputLevel is 4
+            result4.add(Row(0, 0, randomStr, 0))
+            result4.add(Row(0, 1, randomStr, 0))
+            result4.add(Row(1, 0, randomStr, 0))
+            result4.add(Row(1, 1, randomStr, 0))
+            result4.add(Row(0, 2, randomStr, 0))
+            result4.add(Row(1, 2, randomStr, 0))
+            result4.add(Row(2, 0, randomStr, 0))
+            result4.add(Row(2, 1, randomStr, 0))
+            result4.add(Row(2, 2, randomStr, 0))
+            result4.add(Row(0, 3, null, 0))
+            result4.add(Row(1, 3, null, 0))
+            result4.add(Row(3, 0, null, 0))
+            result4.add(Row(3, 1, null, 0))
+            result4.add(Row(2, 3, null, 0))
+            result4.add(Row(3, 2, null, 0))
+            result4.add(Row(3, 3, null, 0))
+            // for partition-1: all files will be picked for clustering, outputLevel is 5
+            result4.add(Row(0, 0, null, 1))
+            result4.add(Row(0, 1, null, 1))
+            result4.add(Row(1, 0, null, 1))
+            result4.add(Row(1, 1, null, 1))
+            result4.add(Row(0, 2, null, 1))
+            result4.add(Row(0, 3, null, 1))
+            result4.add(Row(1, 2, null, 1))
+            result4.add(Row(1, 3, null, 1))
+            result4.add(Row(2, 0, null, 1))
+            result4.add(Row(2, 1, null, 1))
+            result4.add(Row(3, 0, null, 1))
+            result4.add(Row(3, 1, null, 1))
+            result4.add(Row(2, 2, null, 1))
+            result4.add(Row(2, 3, null, 1))
+            result4.add(Row(3, 2, null, 1))
+            result4.add(Row(3, 3, null, 1))
+
+            Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4)
+
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(2)
+            dataSplits.forEach(
+              dataSplit => {
+                if (dataSplit.partition().getInt(0) == 1) {
+                  // partition-1
+                  Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1)
+                  Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+                } else {
+                  // partition-0
+                  Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(2)
+                  Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+                  Assertions.assertThat(dataSplit.dataFiles().get(1).level()).isEqualTo(4)
+                }
+              })
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
   def checkSnapshot(table: FileStoreTable): Unit = {
     Assertions
       .assertThat(table.latestSnapshot().get().commitKind().toString)

0 commit comments