Skip to content

Commit b54f0c1

Browse files
authored
KAFKA-17476 Delete kafka.common.OffsetAndMetadata (apache#17553)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 57abfc2 commit b54f0c1

File tree

11 files changed

+190
-239
lines changed

11 files changed

+190
-239
lines changed

core/src/main/scala/kafka/common/OffsetAndMetadata.scala

Lines changed: 0 additions & 48 deletions
This file was deleted.

core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package kafka.coordinator.group
1818

1919
import java.util.{OptionalInt, Properties}
2020
import java.util.concurrent.atomic.AtomicBoolean
21-
import kafka.common.OffsetAndMetadata
2221
import kafka.server._
2322
import kafka.utils.Logging
2423
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
@@ -32,7 +31,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
3231
import org.apache.kafka.common.record.RecordBatch
3332
import org.apache.kafka.common.requests._
3433
import org.apache.kafka.common.utils.Time
35-
import org.apache.kafka.coordinator.group.{Group, OffsetConfig}
34+
import org.apache.kafka.coordinator.group.{Group, OffsetAndMetadata, OffsetConfig}
3635
import org.apache.kafka.server.common.RequestLocal
3736
import org.apache.kafka.server.record.BrokerCompressionType
3837
import org.apache.kafka.storage.internals.log.VerificationGuard

core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package kafka.coordinator.group
1818

19-
import kafka.common.OffsetAndMetadata
2019
import kafka.server.{KafkaConfig, ReplicaManager}
2120
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
2221
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupDescribeResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
@@ -26,13 +25,14 @@ import org.apache.kafka.common.record.RecordBatch
2625
import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext, TransactionResult}
2726
import org.apache.kafka.common.utils.{BufferSupplier, Time}
2827
import org.apache.kafka.coordinator.group
28+
import org.apache.kafka.coordinator.group.OffsetAndMetadata
2929
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
3030
import org.apache.kafka.server.common.RequestLocal
3131
import org.apache.kafka.server.util.FutureUtils
3232

3333
import java.time.Duration
3434
import java.util
35-
import java.util.{Optional, OptionalInt, Properties}
35+
import java.util.{Optional, OptionalInt, OptionalLong, Properties}
3636
import java.util.concurrent.CompletableFuture
3737
import java.util.function.IntSupplier
3838
import scala.collection.{immutable, mutable}
@@ -504,20 +504,23 @@ private[group] class GroupCoordinatorAdapter(
504504
expireTimestamp: Option[Long]
505505
): OffsetAndMetadata = {
506506
new OffsetAndMetadata(
507-
offset = offset,
508-
leaderEpoch = leaderEpoch match {
509-
case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer]
510-
case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch)
507+
offset,
508+
leaderEpoch match {
509+
case RecordBatch.NO_PARTITION_LEADER_EPOCH => OptionalInt.empty
510+
case committedLeaderEpoch => OptionalInt.of(committedLeaderEpoch)
511511
},
512-
metadata = metadata match {
513-
case null => OffsetAndMetadata.NoMetadata
512+
metadata match {
513+
case null => OffsetAndMetadata.NO_METADATA
514514
case metadata => metadata
515515
},
516-
commitTimestamp = commitTimestamp match {
516+
commitTimestamp match {
517517
case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
518518
case customTimestamp => customTimestamp
519519
},
520-
expireTimestamp = expireTimestamp
520+
expireTimestamp match {
521+
case Some(timestamp) => OptionalLong.of(timestamp)
522+
case None => OptionalLong.empty()
523+
}
521524
)
522525
}
523526

core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,14 @@ package kafka.coordinator.group
1919
import java.nio.ByteBuffer
2020
import java.util.UUID
2121
import java.util.concurrent.locks.ReentrantLock
22-
23-
import kafka.common.OffsetAndMetadata
2422
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
2523
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
2624
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
2725
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
2826
import org.apache.kafka.common.protocol.Errors
2927
import org.apache.kafka.common.protocol.types.SchemaException
3028
import org.apache.kafka.common.utils.Time
31-
import org.apache.kafka.coordinator.group.Group
29+
import org.apache.kafka.coordinator.group.{Group, OffsetAndMetadata}
3230

3331
import scala.collection.{Seq, immutable, mutable}
3432
import scala.jdk.CollectionConverters._
@@ -761,13 +759,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
761759
case (topicPartition, commitRecordMetadataAndOffset) =>
762760
!subscribedTopics.contains(topicPartition.topic()) &&
763761
!pendingOffsetCommits.contains(topicPartition) && {
764-
commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
765-
case None =>
766-
// current version with no per partition retention
767-
currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
768-
case Some(expireTimestamp) =>
769-
// older versions with explicit expire_timestamp field => old expiration semantics is used
770-
currentTimestamp >= expireTimestamp
762+
if (commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestampMs.isEmpty) {
763+
// current version with no per partition retention
764+
currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
765+
} else {
766+
// older versions with explicit expire_timestamp field => old expiration semantics is used
767+
currentTimestamp >= commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestampMs.getAsLong
771768
}
772769
}
773770
}.map {
@@ -785,7 +782,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
785782
// since the last commit timestamp, expire the offset
786783
getExpiredOffsets(
787784
commitRecordMetadataAndOffset => currentStateTimestamp
788-
.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp)
785+
.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestampMs)
789786
)
790787

791788
case Some(ConsumerProtocol.PROTOCOL_TYPE) if subscribedTopics.isDefined && is(Stable) =>
@@ -794,14 +791,14 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
794791
// the last commit timestamp, expire the offset. offset with pending offset commit are not
795792
// expired
796793
getExpiredOffsets(
797-
_.offsetAndMetadata.commitTimestamp,
794+
_.offsetAndMetadata.commitTimestampMs,
798795
subscribedTopics.get
799796
)
800797

801798
case None =>
802799
// protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only
803800
// expire offsets with no pending offset commit that retention period has passed since their last commit
804-
getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)
801+
getExpiredOffsets(_.offsetAndMetadata.commitTimestampMs)
805802

806803
case _ =>
807804
Map()

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818
package kafka.coordinator.group
1919

2020
import java.nio.ByteBuffer
21-
import java.util.{Optional, OptionalInt}
21+
import java.util.{Optional, OptionalInt, OptionalLong}
2222
import java.util.concurrent.atomic.AtomicBoolean
2323
import java.util.concurrent.locks.ReentrantLock
2424
import java.util.concurrent.ConcurrentHashMap
2525
import java.util.function.Supplier
2626
import com.yammer.metrics.core.Gauge
27-
import kafka.common.OffsetAndMetadata
2827
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
2928
import kafka.server.ReplicaManager
3029
import kafka.utils.CoreUtils.inLock
@@ -40,7 +39,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
4039
import org.apache.kafka.common.requests.{OffsetCommitRequest, OffsetFetchResponse}
4140
import org.apache.kafka.common.utils.{Time, Utils}
4241
import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
43-
import org.apache.kafka.coordinator.group.OffsetConfig
42+
import org.apache.kafka.coordinator.group.{OffsetAndMetadata, OffsetConfig}
4443
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
4544
import org.apache.kafka.server.common.{MetadataVersion, RequestLocal}
4645
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
@@ -523,8 +522,12 @@ class GroupMetadataManager(brokerId: Int,
523522
new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
524523
Optional.empty(), "", Errors.NONE)
525524
case Some(offsetAndMetadata) =>
526-
new PartitionData(offsetAndMetadata.offset,
527-
offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
525+
val leaderEpoch: Optional[Integer] = if (offsetAndMetadata.leaderEpoch.isPresent) {
526+
Optional.of(offsetAndMetadata.leaderEpoch.getAsInt)
527+
} else {
528+
Optional.empty()
529+
}
530+
new PartitionData(offsetAndMetadata.committedOffset, leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
528531
}
529532
topicPartition -> partitionData
530533
}
@@ -1087,18 +1090,18 @@ object GroupMetadataManager {
10871090
def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
10881091
metadataVersion: MetadataVersion): Array[Byte] = {
10891092
val version =
1090-
if (metadataVersion.isLessThan(IBP_2_1_IV0) || offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
1093+
if (metadataVersion.isLessThan(IBP_2_1_IV0) || offsetAndMetadata.expireTimestampMs.isPresent) 1.toShort
10911094
else if (metadataVersion.isLessThan(IBP_2_1_IV1)) 2.toShort
10921095
// Serialize with the highest supported non-flexible version
10931096
// until a tagged field is introduced or the version is bumped.
10941097
else 3.toShort
10951098
MessageUtil.toVersionPrefixedBytes(version, new OffsetCommitValue()
1096-
.setOffset(offsetAndMetadata.offset)
1099+
.setOffset(offsetAndMetadata.committedOffset)
10971100
.setMetadata(offsetAndMetadata.metadata)
1098-
.setCommitTimestamp(offsetAndMetadata.commitTimestamp)
1101+
.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
10991102
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
11001103
// version 1 has a non empty expireTimestamp field
1101-
.setExpireTimestamp(offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
1104+
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
11021105
)
11031106
}
11041107

@@ -1179,12 +1182,12 @@ object GroupMetadataManager {
11791182
val version = buffer.getShort
11801183
if (version >= OffsetCommitValue.LOWEST_SUPPORTED_VERSION && version <= OffsetCommitValue.HIGHEST_SUPPORTED_VERSION) {
11811184
val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), version)
1182-
OffsetAndMetadata(
1183-
offset = value.offset,
1184-
leaderEpoch = if (value.leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) Optional.empty() else Optional.of(value.leaderEpoch),
1185-
metadata = value.metadata,
1186-
commitTimestamp = value.commitTimestamp,
1187-
expireTimestamp = if (value.expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP) None else Some(value.expireTimestamp))
1185+
new OffsetAndMetadata(
1186+
value.offset,
1187+
if (value.leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH) OptionalInt.empty() else OptionalInt.of(value.leaderEpoch),
1188+
value.metadata,
1189+
value.commitTimestamp,
1190+
if (value.expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP) OptionalLong.empty() else OptionalLong.of(value.expireTimestamp))
11881191
} else throw new IllegalStateException(s"Unknown offset message version: $version")
11891192
}
11901193
}

core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package kafka.coordinator.group
1818

19-
import kafka.common.OffsetAndMetadata
2019
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback}
2120
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
2221
import org.apache.kafka.common.errors.{InvalidGroupIdException, UnsupportedVersionException}
@@ -31,6 +30,7 @@ import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, Re
3130
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
3231
import org.apache.kafka.common.utils.{BufferSupplier, Time}
3332
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
33+
import org.apache.kafka.coordinator.group.OffsetAndMetadata
3434
import org.apache.kafka.server.common.RequestLocal
3535
import org.apache.kafka.server.util.MockTime
3636
import org.apache.kafka.test.TestUtils.assertFutureThrows
@@ -41,7 +41,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
4141
import org.mockito.Mockito.{mock, verify, when}
4242

4343
import java.net.InetAddress
44-
import java.util.Optional
44+
import java.util.{Optional, OptionalInt, OptionalLong}
4545
import scala.jdk.CollectionConverters._
4646

4747
class GroupCoordinatorAdapterTest {
@@ -696,11 +696,11 @@ class GroupCoordinatorAdapterTest {
696696
ArgumentMatchers.eq(data.generationIdOrMemberEpoch),
697697
ArgumentMatchers.eq(Map(
698698
new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new OffsetAndMetadata(
699-
offset = 100,
700-
leaderEpoch = Optional.of[Integer](1),
701-
metadata = "",
702-
commitTimestamp = now,
703-
expireTimestamp = Some(now + 1000L)
699+
100,
700+
OptionalInt.of(1),
701+
"",
702+
now,
703+
OptionalLong.of(now + 1000L)
704704
)
705705
)),
706706
capturedCallback.capture(),
@@ -769,11 +769,11 @@ class GroupCoordinatorAdapterTest {
769769
ArgumentMatchers.eq(data.generationId),
770770
ArgumentMatchers.eq(Map(
771771
new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new OffsetAndMetadata(
772-
offset = 100,
773-
leaderEpoch = Optional.of[Integer](1),
774-
metadata = "",
775-
commitTimestamp = now,
776-
expireTimestamp = None
772+
100,
773+
OptionalInt.of(1),
774+
"",
775+
now,
776+
OptionalLong.empty()
777777
)
778778
)),
779779
capturedCallback.capture(),

0 commit comments

Comments (0)