Skip to content

Commit 62f3608

Browse files
superhxmanmohak07
andauthored
fix(metadata): improvise search (#3215) (#3234)
Co-authored-by: Mohak Agrawal <112922671+manmohak07@users.noreply.github.com>
1 parent 8c6588e commit 62f3608

File tree

2 files changed

+169
-16
lines changed

2 files changed

+169
-16
lines changed

metadata/src/main/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsList.java

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,33 @@
1717

1818
package org.apache.kafka.metadata.stream;
1919

20+
import java.util.ArrayList;
21+
import java.util.Collections;
2022
import java.util.Iterator;
21-
import java.util.LinkedList;
2223
import java.util.List;
2324
import java.util.Objects;
2425
import java.util.stream.Collectors;
2526

2627
public class SortedStreamSetObjectsList implements SortedStreamSetObjects {
2728

29+
// ArrayList instead of LinkedList for faster random access needed by binary search
2830
private final List<S3StreamSetObject> list;
2931

3032
public SortedStreamSetObjectsList(SortedStreamSetObjects source) {
31-
this.list = new LinkedList<>(source.list());
33+
Objects.requireNonNull(source, "source must not be null");
34+
this.list = new ArrayList<>(source.list());
3235
}
3336

3437
public SortedStreamSetObjectsList() {
35-
this.list = new LinkedList<>();
38+
this.list = new ArrayList<>();
3639
}
3740

3841
/**
3942
* Construct a SortedStreamSetObjectsList from a list of S3StreamSetObjects.
4043
* @param list the list of S3StreamSetObjects, must guarantee that the list is sorted
4144
*/
4245
public SortedStreamSetObjectsList(List<S3StreamSetObject> list) {
43-
this.list = list;
46+
this.list = new ArrayList<>(Objects.requireNonNull(list, "list must not be null"));
4447
}
4548

4649
@Override
@@ -65,27 +68,39 @@ public List<S3StreamSetObject> list() {
6568

6669
@Override
6770
public boolean contains(Object o) {
68-
return this.list.contains(o);
71+
if (!(o instanceof S3StreamSetObject)) {
72+
return false;
73+
}
74+
// Use binary search for faster lookup
75+
return findExactMatch((S3StreamSetObject) o) >= 0;
6976
}
7077

7178
@Override
7279
public boolean add(S3StreamSetObject s3StreamSetObject) {
73-
// TODO: optimize by binary search
74-
for (int index = 0; index < this.list.size(); index++) {
75-
S3StreamSetObject current = this.list.get(index);
76-
if (s3StreamSetObject.compareTo(current) <= 0) {
77-
this.list.add(index, s3StreamSetObject);
78-
return true;
79-
}
80-
}
81-
this.list.add(s3StreamSetObject);
80+
Objects.requireNonNull(s3StreamSetObject, "s3StreamSetObject must not be null");
81+
82+
// Use binary search to find where to insert (replaces linear scan)
83+
int insertionIndex = findInsertionIndex(s3StreamSetObject);
84+
this.list.add(insertionIndex, s3StreamSetObject);
8285
return true;
8386
}
8487

8588
@Override
8689
public boolean remove(Object o) {
87-
// TODO: optimize by binary search
88-
return this.list.remove(o);
90+
if (!(o instanceof S3StreamSetObject)) {
91+
return false;
92+
}
93+
94+
S3StreamSetObject target = (S3StreamSetObject) o;
95+
96+
// Use binary search to find the object (replaces linear scan)
97+
int index = findExactMatch(target);
98+
99+
if (index >= 0) {
100+
this.list.remove(index);
101+
return true;
102+
}
103+
return false;
89104
}
90105

91106

@@ -100,6 +115,73 @@ public void clear() {
100115
this.list.clear();
101116
}
102117

118+
/**
119+
* Finds the exact object in the list using binary search.
120+
* Note: compareTo() uses orderId, but equals() uses objectId.
121+
* So we binary search by orderId, then scan duplicates to find matching objectId.
122+
*/
123+
private int findExactMatch(S3StreamSetObject target) {
124+
int index = Collections.binarySearch(this.list, target);
125+
126+
if (index < 0) {
127+
// No element with matching orderId found
128+
return -1;
129+
}
130+
131+
// Found an element with same orderId, check if it's the exact match
132+
if (this.list.get(index).equals(target)) {
133+
return index;
134+
}
135+
136+
// Multiple objects can have the same orderId but different objectId
137+
// Search forward through duplicates
138+
int size = this.list.size();
139+
for (int i = index + 1; i < size; i++) {
140+
S3StreamSetObject element = this.list.get(i);
141+
if (target.compareTo(element) != 0) {
142+
break; // No more duplicates
143+
}
144+
if (element.equals(target)) {
145+
return i;
146+
}
147+
}
148+
149+
// Search backward through duplicates
150+
for (int i = index - 1; i >= 0; i--) {
151+
S3StreamSetObject element = this.list.get(i);
152+
if (target.compareTo(element) != 0) {
153+
break; // No more duplicates
154+
}
155+
if (element.equals(target)) {
156+
return i;
157+
}
158+
}
159+
160+
return -1;
161+
}
162+
163+
/**
164+
* Finds where to insert a new object using binary search.
165+
* Inserts at the leftmost position among objects with the same orderId.
166+
* This keeps the list sorted and maintains stable insertion order.
167+
*/
168+
private int findInsertionIndex(S3StreamSetObject target) {
169+
int index = Collections.binarySearch(this.list, target);
170+
171+
if (index < 0) {
172+
// No matching orderId found, convert to insertion point
173+
// binarySearch returns (-(insertion point) - 1) when not found
174+
return -(index + 1);
175+
}
176+
177+
// Found objects with same orderId, move to leftmost position
178+
while (index > 0 && target.compareTo(this.list.get(index - 1)) == 0) {
179+
index--;
180+
}
181+
182+
return index;
183+
}
184+
103185
@Override
104186
public boolean equals(Object o) {
105187
if (this == o) {

metadata/src/test/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsListTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,5 +70,76 @@ public void testSorted() {
7070
.collect(Collectors.toList()));
7171
}
7272

73+
@Test
74+
public void testDuplicateOrderIds() {
75+
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
76+
// Add multiple objects with the same orderId but different objectIds
77+
objects.add(new S3StreamSetObject(100, -1, Collections.emptyList(), 5));
78+
objects.add(new S3StreamSetObject(101, -1, Collections.emptyList(), 5));
79+
objects.add(new S3StreamSetObject(102, -1, Collections.emptyList(), 5));
80+
objects.add(new S3StreamSetObject(200, -1, Collections.emptyList(), 3));
81+
objects.add(new S3StreamSetObject(300, -1, Collections.emptyList(), 7));
82+
83+
assertEquals(5, objects.size());
84+
85+
// Verify all objects with orderId=5 are grouped together
86+
List<Long> orderIds = objects.list().stream()
87+
.map(S3StreamSetObject::orderId)
88+
.collect(Collectors.toList());
89+
assertEquals(List.of(3L, 5L, 5L, 5L, 7L), orderIds);
90+
91+
// Test contains with duplicate orderIds
92+
assertEquals(true, objects.contains(new S3StreamSetObject(101, -1, Collections.emptyList(), 5)));
93+
assertEquals(false, objects.contains(new S3StreamSetObject(999, -1, Collections.emptyList(), 5)));
94+
95+
// Test remove with duplicate orderIds - should only remove the exact match
96+
boolean removed = objects.remove(new S3StreamSetObject(101, -1, Collections.emptyList(), 5));
97+
assertEquals(true, removed);
98+
assertEquals(4, objects.size());
99+
100+
// Verify the correct object was removed (objectId 101 should be gone)
101+
List<Long> objectIds = objects.list().stream()
102+
.map(S3StreamSetObject::objectId)
103+
.collect(Collectors.toList());
104+
assertEquals(false, objectIds.contains(101L));
105+
assertEquals(true, objectIds.contains(100L));
106+
assertEquals(true, objectIds.contains(102L));
107+
assertEquals(List.of(200L, 300L), List.of(objectIds.get(0), objectIds.get(3)));
108+
}
109+
110+
@Test
111+
public void testRemoveNonExistent() {
112+
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
113+
objects.add(new S3StreamSetObject(1, -1, Collections.emptyList(), 10));
114+
objects.add(new S3StreamSetObject(2, -1, Collections.emptyList(), 20));
115+
116+
// Try to remove object that doesn't exist
117+
boolean removed = objects.remove(new S3StreamSetObject(999, -1, Collections.emptyList(), 15));
118+
assertEquals(false, removed);
119+
assertEquals(2, objects.size());
120+
}
121+
122+
@Test
123+
public void testEmptyList() {
124+
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
125+
assertEquals(0, objects.size());
126+
assertEquals(true, objects.isEmpty());
127+
assertEquals(false, objects.contains(new S3StreamSetObject(1, -1, Collections.emptyList(), 1)));
128+
}
129+
130+
@Test
131+
public void testSingleElement() {
132+
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
133+
S3StreamSetObject obj = new S3StreamSetObject(42, -1, Collections.emptyList(), 100);
134+
135+
objects.add(obj);
136+
assertEquals(1, objects.size());
137+
assertEquals(true, objects.contains(obj));
138+
139+
objects.remove(obj);
140+
assertEquals(0, objects.size());
141+
assertEquals(true, objects.isEmpty());
142+
}
143+
73144

74145
}

0 commit comments

Comments
 (0)