@@ -30,38 +30,29 @@ import java.util.concurrent.ConcurrentMap
30
30
* when they overlap or are adjacent. When merging offsets, last processing time is taken as the
31
31
* maximum of all the merged processing times.
32
32
*/
33
- class OffsetRangeSet {
34
- private val factory: (OffsetIntervals ) -> FunctionalValue <OffsetIntervals >
35
- private val ranges: ConcurrentMap <TopicPartition , FunctionalValue <OffsetIntervals >>
33
+ class OffsetRangeSet (
34
+ private val ranges : ConcurrentMap <TopicPartition , FunctionalValue <OffsetIntervals >>,
35
+ private val factory : (OffsetIntervals ) -> FunctionalValue <OffsetIntervals >,
36
+ ) {
36
37
37
38
/* * Whether the stored offsets is empty. */
38
39
val isEmpty: Boolean
39
40
get() = ranges.isEmpty()
40
41
41
- @JvmOverloads
42
42
constructor (
43
43
factory: (OffsetIntervals ) -> FunctionalValue <OffsetIntervals > = { LockedFunctionalValue (it) },
44
- ) {
45
- this .ranges = ConcurrentHashMap ()
46
- this .factory = factory
47
- }
48
-
49
- private constructor (
50
- ranges: ConcurrentMap <TopicPartition , FunctionalValue <OffsetIntervals >>,
51
- factory: (OffsetIntervals ) -> FunctionalValue <OffsetIntervals >,
52
- ) {
53
- this .ranges = ranges
54
- this .factory = factory
55
- }
44
+ ) : this (ConcurrentHashMap (), factory)
56
45
57
46
/* * Add given offset range to seen offsets. */
58
47
fun add (range : TopicPartitionOffsetRange ) {
59
48
range.topicPartition.modifyIntervals { it.add(range.range) }
60
49
}
61
50
62
51
/* * Add given single offset to seen offsets. */
63
- fun add (topicPartition : TopicPartition , offset : Long , lastModified : Instant ) {
64
- topicPartition.modifyIntervals { it.add(offset, lastModified) }
52
+ fun add (transaction : Accountant .Transaction ) {
53
+ transaction.run {
54
+ topicPartition.modifyIntervals { it.add(offset, lastModified) }
55
+ }
65
56
}
66
57
67
58
/* * Add all offset stream of given set to the current set. */
@@ -81,23 +72,18 @@ class OffsetRangeSet {
81
72
}
82
73
83
74
/* * Whether this range value completely contains the given range. */
84
- operator fun contains (range : TopicPartitionOffsetRange ): Boolean {
85
- return range.topicPartition.readIntervals { it.contains(range.range) }
86
- }
75
+ operator fun contains (range : TopicPartitionOffsetRange ): Boolean =
76
+ range.topicPartition.readIntervals { it.contains(range.range) }
87
77
88
78
/* * Whether this range value completely contains the given range. */
89
- fun contains (partition : TopicPartition , offset : Long , lastModified : Instant ): Boolean {
90
- return partition.readIntervals { it.contains(offset, lastModified) }
91
- }
79
+ fun contains (partition : TopicPartition , offset : Long , lastModified : Instant ): Boolean =
80
+ partition.readIntervals { it.contains(offset, lastModified) }
92
81
93
82
/* * Number of distinct offsets in given topic/partition. */
94
- fun size (topicPartition : TopicPartition ): Int {
95
- return topicPartition.readIntervals { it.size() }
96
- }
83
+ fun size (topicPartition : TopicPartition ): Int = topicPartition.readIntervals { it.size() }
97
84
98
- fun remove (range : TopicPartitionOffsetRange ) {
99
- return range.topicPartition.modifyIntervals { it.remove(range.range) }
100
- }
85
+ fun remove (range : TopicPartitionOffsetRange ) =
86
+ range.topicPartition.modifyIntervals { it.remove(range.range) }
101
87
102
88
fun withFactory (
103
89
factory : (OffsetIntervals ) -> FunctionalValue <OffsetIntervals >,
@@ -168,7 +154,11 @@ class OffsetRangeSet {
168
154
factory,
169
155
)
170
156
171
- data class Range (val from : Long , val to : Long? , val lastProcessed : Instant = Instant .now()) {
157
+ data class Range (
158
+ val from : Long ,
159
+ val to : Long? ,
160
+ val lastProcessed : Instant = Instant .now(),
161
+ ) {
172
162
@JsonIgnore
173
163
val size: Long? = to?.let { it - from + 1 }
174
164
fun ensureToOffset (): Range = if (to == null ) copy(to = from) else this
0 commit comments