|
13 | 13 | import lombok.Data; |
14 | 14 | import org.apache.kafka.clients.admin.ConfigEntry; |
15 | 15 | import org.apache.kafka.clients.admin.TopicDescription; |
| 16 | +import org.apache.kafka.common.TopicPartition; |
16 | 17 | import org.apache.kafka.common.config.TopicConfig; |
17 | 18 |
|
18 | 19 | @Data |
@@ -148,91 +149,25 @@ public static InternalTopic from(TopicDescription topicDescription, |
148 | 149 |
|
149 | 150 | public static InternalTopic from(ScrapedClusterState.TopicState topicState, |
150 | 151 | @Nullable String internalTopicPrefix) { |
151 | | - var topic = InternalTopic.builder(); |
152 | | - TopicDescription topicDescription = topicState.description(); |
153 | | - |
154 | | - internalTopicPrefix = internalTopicPrefix == null || internalTopicPrefix.isEmpty() |
155 | | - ? "_" |
156 | | - : internalTopicPrefix; |
157 | | - |
158 | | - topic.internal( |
159 | | - topicDescription.isInternal() || topicDescription.name().startsWith(internalTopicPrefix) |
| 152 | + Map<TopicPartition, InternalPartitionsOffsets.Offsets> offsets = |
| 153 | + topicState.description().partitions().stream().map(p -> Map.entry( |
| 154 | + new TopicPartition(topicState.name(), p.partition()), |
| 155 | + new InternalPartitionsOffsets.Offsets( |
| 156 | + topicState.startOffsets().get(p.partition()), |
| 157 | + topicState.endOffsets().get(p.partition()) |
| 158 | + ) |
| 159 | + ) |
| 160 | + ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
| 161 | + |
| 162 | + return from( |
| 163 | + topicState.description(), |
| 164 | + topicState.configs(), |
| 165 | + new InternalPartitionsOffsets(offsets), |
| 166 | + null, |
| 167 | + topicState.segmentStats(), |
| 168 | + topicState.partitionsSegmentStats(), |
| 169 | + internalTopicPrefix |
160 | 170 | ); |
161 | | - topic.name(topicDescription.name()); |
162 | | - |
163 | | - List<InternalPartition> partitions = topicDescription.partitions().stream() |
164 | | - .map(partition -> { |
165 | | - var partitionDto = InternalPartition.builder(); |
166 | | - |
167 | | - partitionDto.leader(partition.leader() != null ? partition.leader().id() : null); |
168 | | - partitionDto.partition(partition.partition()); |
169 | | - partitionDto.inSyncReplicasCount(partition.isr().size()); |
170 | | - partitionDto.replicasCount(partition.replicas().size()); |
171 | | - List<InternalReplica> replicas = partition.replicas().stream() |
172 | | - .map(r -> |
173 | | - InternalReplica.builder() |
174 | | - .broker(r.id()) |
175 | | - .inSync(partition.isr().contains(r)) |
176 | | - .leader(partition.leader() != null && partition.leader().id() == r.id()) |
177 | | - .build()) |
178 | | - .collect(Collectors.toList()); |
179 | | - partitionDto.replicas(replicas); |
180 | | - |
181 | | - Optional.ofNullable( |
182 | | - topicState.startOffsets().get(partition.partition()) |
183 | | - ).ifPresent(partitionDto::offsetMin); |
184 | | - |
185 | | - Optional.ofNullable( |
186 | | - topicState.endOffsets().get(partition.partition()) |
187 | | - ).ifPresent(partitionDto::offsetMax); |
188 | | - |
189 | | - Optional.ofNullable(topicState.partitionsSegmentStats()) |
190 | | - .flatMap(s -> Optional.ofNullable(s.get(partition.partition()))) |
191 | | - .ifPresent(stats -> { |
192 | | - partitionDto.segmentCount(stats.getSegmentsCount()); |
193 | | - partitionDto.segmentSize(stats.getSegmentSize()); |
194 | | - }); |
195 | | - |
196 | | - |
197 | | - return partitionDto.build(); |
198 | | - }) |
199 | | - .toList(); |
200 | | - |
201 | | - topic.partitions(partitions.stream().collect( |
202 | | - Collectors.toMap(InternalPartition::getPartition, t -> t))); |
203 | | - |
204 | | - var partitionsStats = new PartitionsStats(topicDescription); |
205 | | - topic.replicas(partitionsStats.getReplicasCount()); |
206 | | - topic.partitionCount(partitionsStats.getPartitionsCount()); |
207 | | - topic.inSyncReplicas(partitionsStats.getInSyncReplicasCount()); |
208 | | - topic.underReplicatedPartitions(partitionsStats.getUnderReplicatedPartitionCount()); |
209 | | - |
210 | | - topic.replicationFactor( |
211 | | - topicDescription.partitions().isEmpty() |
212 | | - ? 0 |
213 | | - : topicDescription.partitions().get(0).replicas().size() |
214 | | - ); |
215 | | - |
216 | | - Optional.ofNullable(topicState.segmentStats()) |
217 | | - .ifPresent(stats -> { |
218 | | - topic.segmentCount(stats.getSegmentsCount()); |
219 | | - topic.segmentSize(stats.getSegmentSize()); |
220 | | - }); |
221 | | - |
222 | | - topic.topicConfigs( |
223 | | - topicState.configs().stream().map(InternalTopicConfig::from).collect(Collectors.toList()) |
224 | | - ); |
225 | | - |
226 | | - topic.cleanUpPolicy( |
227 | | - topicState.configs().stream() |
228 | | - .filter(config -> config.name().equals(CLEANUP_POLICY_CONFIG)) |
229 | | - .findFirst() |
230 | | - .map(ConfigEntry::value) |
231 | | - .map(CleanupPolicy::fromString) |
232 | | - .orElse(CleanupPolicy.UNKNOWN) |
233 | | - ); |
234 | | - |
235 | | - return topic.build(); |
236 | 171 | } |
237 | 172 |
|
238 | 173 | public @Nullable Long getMessagesCount() { |
|
0 commit comments