// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression test: cumulative compaction on a VARIANT column whose sparse-column
// statistics contain an empty path ("" JSON key) must not fail.
//
// Bug scenario being pinned: with "variant_max_subcolumns_count" = "3", any
// subcolumn beyond the 3 most frequent ones is stored as a sparse column.
// During compaction, _create_sparse_merge_reader iterates the sparse paths; if
// the root node (empty path) is not skipped, it calls
// VariantColumnReader::new_iterator with 3 params, which returns
// NOT_IMPLEMENTED_ERROR and fails the compaction.
suite("test_variant_compaction_empty_path_bug", "nonConcurrent") {
    def tableName = "test_variant_empty_path_compaction"

    try {
        sql "DROP TABLE IF EXISTS ${tableName}"

        // Create table with a variant column.
        // variant_max_subcolumns_count = 3 means only the top 3 most-used
        // subcolumns are materialized; the rest become sparse columns, which
        // triggers the sparse-column merge logic during compaction.
        // Auto compaction is disabled so the test controls when it runs.
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                k bigint,
                v variant< properties("variant_max_subcolumns_count" = "3")>
            )
            DUPLICATE KEY(`k`)
            DISTRIBUTED BY HASH(k) BUCKETS 1
            properties("replication_num" = "1", "disable_auto_compaction" = "true");
        """

        logger.info("Testing variant compaction with empty path in sparse columns")

        // Insert data with more than 3 distinct subcolumns (a..h) so that the
        // extras exceed the limit and are stored as sparse columns. Each batch
        // is a separate INSERT to produce multiple rowsets for compaction.

        // First batch: establish column usage patterns (a, b, c dominate).
        sql """INSERT INTO ${tableName} VALUES
            (1, '{"a": 1, "b": 2, "c": 3}'),
            (2, '{"a": 10, "b": 20, "c": 30}'),
            (3, '{"a": 100, "b": 200, "c": 300}')
        """

        // Second batch: introduce additional columns (d, e, f) that will become sparse.
        sql """INSERT INTO ${tableName} VALUES
            (4, '{"a": 1, "d": 4, "e": 5, "f": 6}'),
            (5, '{"b": 2, "d": 40, "e": 50, "f": 60}'),
            (6, '{"c": 3, "d": 400, "e": 500, "f": 600}')
        """

        // Third batch: more sparse columns (g, h).
        sql """INSERT INTO ${tableName} VALUES
            (7, '{"a": 7, "g": 70, "h": 700}'),
            (8, '{"b": 8, "g": 80, "h": 800}'),
            (9, '{"c": 9, "g": 90, "h": 900}')
        """

        // Fourth batch: the edge case under test — JSON objects with an empty
        // string key, which can put an empty path into the sparse statistics.
        sql """INSERT INTO ${tableName} VALUES
            (10, '{"": "empty_key_value", "a": 1000}'),
            (11, '{"": "empty_key_value2", "b": 2000}'),
            (12, '{"": "empty_key_value3", "c": 3000}')
        """

        // Additional inserts to create more rowsets for compaction.
        sql """INSERT INTO ${tableName} VALUES
            (13, '{"a": 13, "d": 130}'),
            (14, '{"b": 14, "e": 140}'),
            (15, '{"c": 15, "f": 150}')
        """

        sql """INSERT INTO ${tableName} VALUES
            (16, '{"d": 16, "g": 160}'),
            (17, '{"e": 17, "h": 170}'),
            (18, '{"f": 18, "a": 180}')
        """

        // Sanity check: all 18 rows are visible before compaction.
        def count_before = sql "SELECT COUNT(*) FROM ${tableName}"
        logger.info("Row count before compaction: ${count_before[0][0]}")
        assertEquals(18, count_before[0][0])

        // Golden-file checks of data integrity before compaction.
        qt_before_compaction "SELECT k, cast(v as string) FROM ${tableName} ORDER BY k"

        // Spot-check access to a materialized (a) and a sparse (d) subcolumn.
        qt_col_a_before "SELECT k, v['a'] FROM ${tableName} WHERE v['a'] IS NOT NULL ORDER BY k"
        qt_col_d_before "SELECT k, v['d'] FROM ${tableName} WHERE v['d'] IS NOT NULL ORDER BY k"

        logger.info("Data inserted, now triggering compaction...")
        logger.info("Expected behavior: columns a,b,c materialized, d,e,f,g,h as sparse")
        logger.info("Bug scenario: if root node (empty path) is not skipped in _create_sparse_merge_reader")
        logger.info("  it will call VariantColumnReader::new_iterator with 3 params")
        logger.info("  which returns NOT_IMPLEMENTED_ERROR")

        // Trigger compaction — with the bug present this reproduces the
        // NOT_IMPLEMENTED_ERROR failure.
        try {
            trigger_and_wait_compaction(tableName, "cumulative")
            logger.info("Compaction completed successfully")

            // Verify no rows were lost by compaction.
            def count_after = sql "SELECT COUNT(*) FROM ${tableName}"
            logger.info("Row count after compaction: ${count_after[0][0]}")
            assertEquals(18, count_after[0][0])

            // Golden-file checks of data integrity after compaction.
            qt_after_compaction "SELECT k, cast(v as string) FROM ${tableName} ORDER BY k"

            // Subcolumn access must still work after the sparse merge.
            qt_col_a_after "SELECT k, v['a'] FROM ${tableName} WHERE v['a'] IS NOT NULL ORDER BY k"
            qt_col_d_after "SELECT k, v['d'] FROM ${tableName} WHERE v['d'] IS NOT NULL ORDER BY k"

            // Empty-key access, if supported, must also survive compaction.
            qt_empty_key "SELECT k, v[''] FROM ${tableName} WHERE v[''] IS NOT NULL ORDER BY k"

        } catch (Exception e) {
            logger.error("Compaction failed with error: ${e.getMessage()}", e)

            // Flag the known failure signature so reproductions are easy to spot
            // in the logs, then rethrow either way so the framework records it.
            if (e.getMessage().contains("NOT_IMPLEMENTED_ERROR") ||
                e.getMessage().contains("Not implemented")) {
                logger.error("BUG REPRODUCED: Compaction failed with NOT_IMPLEMENTED_ERROR")
            }
            throw e
        }

    } finally {
        // Always clean up the test table, even on failure.
        sql "DROP TABLE IF EXISTS ${tableName}"
    }
}