Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,33 @@

package org.apache.kafka.metadata.stream;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class SortedStreamSetObjectsList implements SortedStreamSetObjects {

// ArrayList instead of LinkedList for faster random access needed by binary search
private final List<S3StreamSetObject> list;

public SortedStreamSetObjectsList(SortedStreamSetObjects source) {
this.list = new LinkedList<>(source.list());
Objects.requireNonNull(source, "source must not be null");
this.list = new ArrayList<>(source.list());
}

public SortedStreamSetObjectsList() {
this.list = new LinkedList<>();
this.list = new ArrayList<>();
}

/**
* Construct a SortedStreamSetObjectsList from a list of S3StreamSetObjects.
* @param list the list of S3StreamSetObjects, must guarantee that the list is sorted
*/
public SortedStreamSetObjectsList(List<S3StreamSetObject> list) {
this.list = list;
this.list = new ArrayList<>(Objects.requireNonNull(list, "list must not be null"));
}

@Override
Expand All @@ -65,27 +68,39 @@ public List<S3StreamSetObject> list() {

@Override
public boolean contains(Object o) {
return this.list.contains(o);
if (!(o instanceof S3StreamSetObject)) {
return false;
}
// Use binary search for faster lookup
return findExactMatch((S3StreamSetObject) o) >= 0;
}

@Override
public boolean add(S3StreamSetObject s3StreamSetObject) {
// TODO: optimize by binary search
for (int index = 0; index < this.list.size(); index++) {
S3StreamSetObject current = this.list.get(index);
if (s3StreamSetObject.compareTo(current) <= 0) {
this.list.add(index, s3StreamSetObject);
return true;
}
}
this.list.add(s3StreamSetObject);
Objects.requireNonNull(s3StreamSetObject, "s3StreamSetObject must not be null");

// Use binary search to find where to insert (replaces linear scan)
int insertionIndex = findInsertionIndex(s3StreamSetObject);
this.list.add(insertionIndex, s3StreamSetObject);
return true;
}

@Override
public boolean remove(Object o) {
// TODO: optimize by binary search
return this.list.remove(o);
if (!(o instanceof S3StreamSetObject)) {
return false;
}

S3StreamSetObject target = (S3StreamSetObject) o;

// Use binary search to find the object (replaces linear scan)
int index = findExactMatch(target);

if (index >= 0) {
this.list.remove(index);
return true;
}
return false;
}


Expand All @@ -100,6 +115,73 @@ public void clear() {
this.list.clear();
}

/**
* Finds the exact object in the list using binary search.
* Note: compareTo() uses orderId, but equals() uses objectId.
* So we binary search by orderId, then scan duplicates to find matching objectId.
*/
private int findExactMatch(S3StreamSetObject target) {
int index = Collections.binarySearch(this.list, target);

if (index < 0) {
// No element with matching orderId found
return -1;
}

// Found an element with same orderId, check if it's the exact match
if (this.list.get(index).equals(target)) {
return index;
}

// Multiple objects can have the same orderId but different objectId
// Search forward through duplicates
int size = this.list.size();
for (int i = index + 1; i < size; i++) {
S3StreamSetObject element = this.list.get(i);
if (target.compareTo(element) != 0) {
break; // No more duplicates
}
if (element.equals(target)) {
return i;
}
}

// Search backward through duplicates
for (int i = index - 1; i >= 0; i--) {
S3StreamSetObject element = this.list.get(i);
if (target.compareTo(element) != 0) {
break; // No more duplicates
}
if (element.equals(target)) {
return i;
}
}

return -1;
}

/**
* Finds where to insert a new object using binary search.
* Inserts at the leftmost position among objects with the same orderId.
* This keeps the list sorted and maintains stable insertion order.
*/
private int findInsertionIndex(S3StreamSetObject target) {
int index = Collections.binarySearch(this.list, target);

if (index < 0) {
// No matching orderId found, convert to insertion point
// binarySearch returns (-(insertion point) - 1) when not found
return -(index + 1);
}

// Found objects with same orderId, move to leftmost position
while (index > 0 && target.compareTo(this.list.get(index - 1)) == 0) {
index--;
}

return index;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,76 @@ public void testSorted() {
.collect(Collectors.toList()));
}

@Test
public void testDuplicateOrderIds() {
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
// Add multiple objects with the same orderId but different objectIds
objects.add(new S3StreamSetObject(100, -1, Collections.emptyList(), 5));
objects.add(new S3StreamSetObject(101, -1, Collections.emptyList(), 5));
objects.add(new S3StreamSetObject(102, -1, Collections.emptyList(), 5));
objects.add(new S3StreamSetObject(200, -1, Collections.emptyList(), 3));
objects.add(new S3StreamSetObject(300, -1, Collections.emptyList(), 7));

assertEquals(5, objects.size());

// Verify all objects with orderId=5 are grouped together
List<Long> orderIds = objects.list().stream()
.map(S3StreamSetObject::orderId)
.collect(Collectors.toList());
assertEquals(List.of(3L, 5L, 5L, 5L, 7L), orderIds);

// Test contains with duplicate orderIds
assertEquals(true, objects.contains(new S3StreamSetObject(101, -1, Collections.emptyList(), 5)));
assertEquals(false, objects.contains(new S3StreamSetObject(999, -1, Collections.emptyList(), 5)));

// Test remove with duplicate orderIds - should only remove the exact match
boolean removed = objects.remove(new S3StreamSetObject(101, -1, Collections.emptyList(), 5));
assertEquals(true, removed);
assertEquals(4, objects.size());

// Verify the correct object was removed (objectId 101 should be gone)
List<Long> objectIds = objects.list().stream()
.map(S3StreamSetObject::objectId)
.collect(Collectors.toList());
assertEquals(false, objectIds.contains(101L));
assertEquals(true, objectIds.contains(100L));
assertEquals(true, objectIds.contains(102L));
assertEquals(List.of(200L, 300L), List.of(objectIds.get(0), objectIds.get(3)));
}

@Test
public void testRemoveNonExistent() {
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
objects.add(new S3StreamSetObject(1, -1, Collections.emptyList(), 10));
objects.add(new S3StreamSetObject(2, -1, Collections.emptyList(), 20));

// Try to remove object that doesn't exist
boolean removed = objects.remove(new S3StreamSetObject(999, -1, Collections.emptyList(), 15));
assertEquals(false, removed);
assertEquals(2, objects.size());
}

@Test
public void testEmptyList() {
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
assertEquals(0, objects.size());
assertEquals(true, objects.isEmpty());
assertEquals(false, objects.contains(new S3StreamSetObject(1, -1, Collections.emptyList(), 1)));
}

@Test
public void testSingleElement() {
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
S3StreamSetObject obj = new S3StreamSetObject(42, -1, Collections.emptyList(), 100);

objects.add(obj);
assertEquals(1, objects.size());
assertEquals(true, objects.contains(obj));

objects.remove(obj);
assertEquals(0, objects.size());
assertEquals(true, objects.isEmpty());
}


}
Loading