|
8 | 8 | #include <IO/ReadBufferFromString.h> |
9 | 9 | #include <Common/Arena.h> |
10 | 10 | #include <Common/SipHash.h> |
| 11 | +#include <Common/logger_useful.h> |
11 | 12 |
|
12 | 13 | namespace DB |
13 | 14 | { |
@@ -728,10 +729,8 @@ void ColumnObject::insertFromSharedDataAndFillRemainingDynamicPaths(const DB::Co |
728 | 729 | { |
729 | 730 | /// Deserialize binary value into dynamic column from shared data. |
730 | 731 | if (it->second->size() != current_size) |
731 | | - { |
732 | | - src_object_column.validateDynamicPathsAndSharedData(); |
733 | 732 | throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of dynamic path {}: {} != {}", path, it->second->size(), current_size); |
734 | | - } |
| 733 | + |
735 | 734 | deserializeValueFromSharedData(src_shared_data_values, i, *it->second); |
736 | 735 | } |
737 | 736 | else if (auto * dynamic_path_column = tryToAddNewDynamicPath(path)) |
@@ -2070,28 +2069,121 @@ int ColumnObject::doCompareAt(size_t n, size_t m, const IColumn & rhs, int nan_d |
2070 | 2069 | return 1; |
2071 | 2070 | } |
2072 | 2071 |
|
2073 | | -void ColumnObject::validateDynamicPathsAndSharedData(size_t shared_data_offset) const |
| 2072 | +void ColumnObject::repairDuplicatesInDynamicPathsAndSharedData(size_t offset) |
2074 | 2073 | { |
2075 | 2074 | if (dynamic_paths.empty()) |
2076 | 2075 | return; |
2077 | 2076 |
|
| 2077 | + /// First, check if all dynamic paths have correct sizes, just in case. |
2078 | 2078 | size_t expected_size = shared_data->size(); |
2079 | 2079 | for (const auto & [path, column] : dynamic_paths) |
2080 | 2080 | { |
2081 | 2081 | if (column->size() != expected_size) |
2082 | 2082 | throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of dynamic path {}: {} != {}", path, column->size(), expected_size); |
2083 | 2083 | } |
2084 | 2084 |
|
| 2085 | + /// Second, iterate over paths in shared data and check if we have any path that is also present in dynamic paths. |
2085 | 2086 | const auto & shared_data_offsets = getSharedDataOffsets(); |
2086 | | - const auto [shared_data_paths, _] = getSharedDataPathsAndValues(); |
2087 | | - size_t shared_data_paths_start = shared_data_offsets[ssize_t(shared_data_offset) - 1]; |
2088 | | - size_t shared_data_paths_end = shared_data_offsets.back(); |
2089 | | - for (size_t i = shared_data_paths_start; i != shared_data_paths_end; ++i) |
| 2087 | + const auto [shared_data_paths, shared_data_values] = getSharedDataPathsAndValues(); |
| 2088 | + /// Remember the first row with duplicates if any. We will start repair from this row. |
| 2089 | + std::optional<size_t> first_row_with_duplicates = std::nullopt; |
| 2090 | + size_t size = shared_data_offsets.size(); |
| 2091 | + for (size_t i = offset; i < size; ++i) |
| 2092 | + { |
| 2093 | + size_t shared_data_start = shared_data_offsets[i - 1]; |
| 2094 | + size_t shared_data_end = shared_data_offsets[i]; |
| 2095 | + for (size_t j = shared_data_start; j < shared_data_end; ++j) |
| 2096 | + { |
| 2097 | + if (dynamic_paths.contains(shared_data_paths->getDataAt(j))) |
| 2098 | + { |
| 2099 | + /// Duplicate is found, no need to iterate further, we need to start repair. |
| 2100 | + first_row_with_duplicates = i; |
| 2101 | + break; |
| 2102 | + } |
| 2103 | + } |
| 2104 | + |
| 2105 | + if (first_row_with_duplicates) |
| 2106 | + break; |
| 2107 | + } |
| 2108 | + |
| 2109 | + if (!first_row_with_duplicates) |
| 2110 | + return; |
| 2111 | + |
| 2112 | + LOG_TRACE(getLogger("ColumnObject"), "Repair duplicates in Object column starting from row {}", *first_row_with_duplicates); |
| 2113 | + |
| 2114 | + /// During repair we create new shared data without duplicated dynamic paths |
| 2115 | + /// update corresponding dynamic paths with values from shared data. |
| 2116 | + auto new_shared_data = shared_data->cloneResized(*first_row_with_duplicates); |
| 2117 | + const auto [new_shared_data_paths, new_shared_data_values, new_shared_data_offsets] = getSharedDataPathsValuesAndOffsets(*new_shared_data); |
| 2118 | + new_shared_data_offsets->reserve(size); |
| 2119 | + PathToColumnMap new_dynamic_paths; |
| 2120 | + for (size_t i = *first_row_with_duplicates; i < size; ++i) |
2090 | 2121 | { |
2091 | | - auto path = shared_data_paths->getDataAt(i); |
2092 | | - if (dynamic_paths.contains(path)) |
2093 | | - throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is present both in dynamic paths and in shared data", path.toString()); |
| 2122 | + size_t shared_data_start = shared_data_offsets[i - 1]; |
| 2123 | + size_t shared_data_end = shared_data_offsets[i]; |
| 2124 | + for (size_t j = shared_data_start; j < shared_data_end; ++j) |
| 2125 | + { |
| 2126 | + auto path = shared_data_paths->getDataAt(j).toView(); |
| 2127 | + auto it = dynamic_paths.find(path); |
| 2128 | + if (it == dynamic_paths.end()) |
| 2129 | + { |
| 2130 | + new_shared_data_paths->insertFrom(*shared_data_paths, j); |
| 2131 | + new_shared_data_values->insertFrom(*shared_data_values, j); |
| 2132 | + } |
| 2133 | + /// We update dynamic path with value from shared data only if dynamic path has NULL at this row. |
| 2134 | + else if (it->second->isNullAt(i)) |
| 2135 | + { |
| 2136 | + auto new_it = new_dynamic_paths.find(path); |
| 2137 | + if (new_it == new_dynamic_paths.end()) |
| 2138 | + { |
| 2139 | + new_it = new_dynamic_paths.emplace(path, it->second->cloneResized(i)).first; |
| 2140 | + new_it->second->reserve(size); |
| 2141 | + } |
| 2142 | + |
| 2143 | + deserializeValueFromSharedData(shared_data_values, j, *new_it->second); |
| 2144 | + } |
| 2145 | + /// Situation when both values in dynamic path and shared data are non-NULL should not be possible |
| 2146 | + /// and we cannot repair it anyhow. Throw logical error exception in this case. |
| 2147 | + else |
| 2148 | + { |
| 2149 | + auto value = shared_data_values->getDataAt(j).toView(); |
| 2150 | + ReadBufferFromMemory buf(value.data(), value.size()); |
| 2151 | + auto type_from_shared_data = decodeDataType(buf); |
| 2152 | + if (!isNothing(type_from_shared_data)) |
| 2153 | + { |
| 2154 | + auto type_from_dynamic_path = dynamic_paths_ptrs.find(path)->second->getTypeAt(i); |
| 2155 | + throw Exception( |
| 2156 | + ErrorCodes::LOGICAL_ERROR, |
| 2157 | + "Path {} is present both in dynamic paths and shared data and has two non-null values at the row {}." |
| 2158 | + "Value type in dynamic paths: {}. Value type in shared data: {}", |
| 2159 | + path, |
| 2160 | + i, |
| 2161 | + type_from_dynamic_path->getName(), |
| 2162 | + type_from_shared_data->getName()); |
| 2163 | + } |
| 2164 | + } |
| 2165 | + } |
| 2166 | + |
| 2167 | + new_shared_data_offsets->push_back(new_shared_data_paths->size()); |
| 2168 | + /// Update new dynamic paths that were not updated with value from shared data in this row. |
| 2169 | + for (auto & [path, column] : new_dynamic_paths) |
| 2170 | + { |
| 2171 | + if (column->size() == i) |
| 2172 | + column->insertFrom(*dynamic_paths.at(path), i); |
| 2173 | + } |
2094 | 2174 | } |
| 2175 | + |
| 2176 | + for (auto & [path, column] : new_dynamic_paths) |
| 2177 | + { |
| 2178 | + if (column->size() != size) |
| 2179 | + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of new dynamic path {}: {} != {}", path, column->size(), size); |
| 2180 | + dynamic_paths_ptrs[path] = assert_cast<ColumnDynamic *>(column.get()); |
| 2181 | + dynamic_paths[path] = std::move(column); |
| 2182 | + } |
| 2183 | + |
| 2184 | + if (new_shared_data->size() != size) |
| 2185 | + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected size of new shared data: {} != {}", new_shared_data->size(), size); |
| 2186 | + shared_data = std::move(new_shared_data); |
2095 | 2187 | } |
2096 | 2188 |
|
2097 | 2189 | } |
0 commit comments