|
8 | 8 | import java.util.List; |
9 | 9 | import java.util.Map; |
10 | 10 | import java.util.Optional; |
| 11 | +import java.util.function.Function; |
11 | 12 | import java.util.stream.Collectors; |
12 | 13 |
|
13 | 14 | import org.apache.commons.lang3.StringUtils; |
14 | 15 | import org.apache.kafka.clients.consumer.ConsumerRecord; |
15 | 16 | import org.apache.kafka.clients.consumer.ConsumerRecords; |
16 | 17 | import org.apache.kafka.clients.consumer.KafkaConsumer; |
| 18 | +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; |
17 | 19 | import org.apache.kafka.common.TopicPartition; |
18 | 20 | import org.apache.kafka.common.utils.Bytes; |
19 | 21 | import org.springframework.stereotype.Service; |
@@ -101,8 +103,9 @@ private static class RecordEmitter { |
101 | 103 |
|
102 | 104 | public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) { |
103 | 105 | try (KafkaConsumer<Bytes, Bytes> consumer = kafkaService.createConsumer(cluster)) { |
104 | | - assignPartitions(consumer); |
105 | | - seekOffsets(consumer); |
| 106 | +// assignPartitions(consumer); |
| 107 | +// seekOffsets(consumer); |
| 108 | + assignAndSeek(consumer); |
106 | 109 | int pollsCount = 0; |
107 | 110 | while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) { |
108 | 111 | ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS); |
@@ -131,39 +134,55 @@ private List<TopicPartition> getRequestedPartitions() { |
131 | 134 | .collect(Collectors.toList()); |
132 | 135 | } |
133 | 136 |
|
134 | | - private void assignPartitions(KafkaConsumer<Bytes, Bytes> consumer) { |
135 | | - List<TopicPartition> partitions = getRequestedPartitions(); |
136 | | - |
137 | | - consumer.assign(partitions); |
138 | | - } |
139 | | - |
140 | | - private void seekOffsets(KafkaConsumer<Bytes, Bytes> consumer) { |
| 137 | + private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) { |
141 | 138 | SeekType seekType = consumerPosition.getSeekType(); |
142 | 139 | switch (seekType) { |
143 | 140 | case OFFSET: |
144 | | - consumerPosition.getSeekTo().forEach((partition, offset) -> { |
145 | | - TopicPartition topicPartition = new TopicPartition(topic, partition); |
146 | | - consumer.seek(topicPartition, offset); |
147 | | - }); |
| 141 | + assignAndSeekForOffset(consumer); |
148 | 142 | break; |
149 | 143 | case TIMESTAMP: |
150 | | - Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream() |
151 | | - .collect(Collectors.toMap( |
152 | | - partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), |
153 | | - Map.Entry::getValue |
154 | | - )); |
155 | | - consumer.offsetsForTimes(timestampsToSearch) |
156 | | - .forEach((topicPartition, offsetAndTimestamp) -> |
157 | | - consumer.seek(topicPartition, offsetAndTimestamp.offset()) |
158 | | - ); |
| 144 | + assignAndSeekForTimestamp(consumer); |
159 | 145 | break; |
160 | 146 | case BEGINNING: |
161 | | - List<TopicPartition> partitions = getRequestedPartitions(); |
162 | | - consumer.seekToBeginning(partitions); |
| 147 | + assignAndSeekFromBeginning(consumer); |
163 | 148 | break; |
164 | 149 | default: |
165 | 150 | throw new IllegalArgumentException("Unknown seekType: " + seekType); |
166 | 151 | } |
167 | 152 | } |
| 153 | + |
| 154 | + private void assignAndSeekForOffset(KafkaConsumer<Bytes, Bytes> consumer) { |
| 155 | + List<TopicPartition> partitions = getRequestedPartitions(); |
| 156 | + consumer.assign(partitions); |
| 157 | + consumerPosition.getSeekTo().forEach((partition, offset) -> { |
| 158 | + TopicPartition topicPartition = new TopicPartition(topic, partition); |
| 159 | + consumer.seek(topicPartition, offset); |
| 160 | + }); |
| 161 | + } |
| 162 | + |
| 163 | + private void assignAndSeekForTimestamp(KafkaConsumer<Bytes, Bytes> consumer) { |
| 164 | + Map<TopicPartition, Long> timestampsToSearch = consumerPosition.getSeekTo().entrySet().stream() |
| 165 | + .collect(Collectors.toMap( |
| 166 | + partitionPosition -> new TopicPartition(topic, partitionPosition.getKey()), |
| 167 | + Map.Entry::getValue |
| 168 | + )); |
| 169 | + Map<TopicPartition, Long> offsetsForTimestamps = consumer.offsetsForTimes(timestampsToSearch) |
| 170 | + .entrySet().stream() |
| 171 | + .filter(e -> e.getValue() != null) |
| 172 | + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); |
| 173 | + |
| 174 | + if (offsetsForTimestamps.isEmpty()) { |
| 175 | + throw new IllegalArgumentException("No offsets were found for requested timestamps"); |
| 176 | + } |
| 177 | + |
| 178 | + consumer.assign(offsetsForTimestamps.keySet()); |
| 179 | + offsetsForTimestamps.forEach(consumer::seek); |
| 180 | + } |
| 181 | + |
| 182 | + private void assignAndSeekFromBeginning(KafkaConsumer<Bytes, Bytes> consumer) { |
| 183 | + List<TopicPartition> partitions = getRequestedPartitions(); |
| 184 | + consumer.assign(partitions); |
| 185 | + consumer.seekToBeginning(partitions); |
| 186 | + } |
168 | 187 | } |
169 | 188 | } |
0 commit comments