-
Notifications
You must be signed in to change notification settings - Fork 1.5k
ByteBufBsonDocument & ByteBufBsonArray refactorings #1874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import org.bson.ByteBuf; | ||
| import org.bson.io.ByteBufferBsonInput; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.util.ArrayList; | ||
| import java.util.Collection; | ||
| import java.util.Iterator; | ||
|
|
@@ -33,20 +34,31 @@ | |
|
|
||
| import static com.mongodb.internal.connection.ByteBufBsonHelper.readBsonValue; | ||
|
|
||
| final class ByteBufBsonArray extends BsonArray { | ||
| final class ByteBufBsonArray extends BsonArray implements Closeable { | ||
| private final ByteBuf byteBuf; | ||
|
|
||
| /** | ||
| * List of resources that need to be closed when this array is closed. | ||
| * Tracks the main ByteBuf and iterator duplicates. Iterator buffers are automatically | ||
| * removed and released when iteration completes normally to prevent memory accumulation. | ||
| */ | ||
| private final List<Closeable> trackedResources = new ArrayList<>(); | ||
|
strogiyotec marked this conversation as resolved.
|
||
| private boolean closed; | ||
|
|
||
| ByteBufBsonArray(final ByteBuf byteBuf) { | ||
| this.byteBuf = byteBuf; | ||
| trackedResources.add(byteBuf::release); | ||
| } | ||
|
|
||
| @Override | ||
| public Iterator<BsonValue> iterator() { | ||
| ensureOpen(); | ||
| return new ByteBufBsonArrayIterator(); | ||
| } | ||
|
|
||
| @Override | ||
| public List<BsonValue> getValues() { | ||
| ensureOpen(); | ||
| List<BsonValue> values = new ArrayList<>(); | ||
| for (BsonValue cur: this) { | ||
| //noinspection UseBulkOperation | ||
|
|
@@ -59,6 +71,7 @@ public List<BsonValue> getValues() { | |
|
|
||
| @Override | ||
| public int size() { | ||
| ensureOpen(); | ||
| int size = 0; | ||
| for (BsonValue ignored : this) { | ||
| size++; | ||
|
|
@@ -68,11 +81,13 @@ public int size() { | |
|
|
||
| @Override | ||
| public boolean isEmpty() { | ||
| ensureOpen(); | ||
| return !iterator().hasNext(); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(final Object o) { | ||
| ensureOpen(); | ||
| if (o == this) { | ||
| return true; | ||
| } | ||
|
|
@@ -91,6 +106,7 @@ public boolean equals(final Object o) { | |
|
|
||
| @Override | ||
| public int hashCode() { | ||
| ensureOpen(); | ||
| int hashCode = 1; | ||
| for (BsonValue cur : this) { | ||
| hashCode = 31 * hashCode + (cur == null ? 0 : cur.hashCode()); | ||
|
|
@@ -100,6 +116,7 @@ public int hashCode() { | |
|
|
||
| @Override | ||
| public boolean contains(final Object o) { | ||
| ensureOpen(); | ||
| for (BsonValue cur : this) { | ||
| if (Objects.equals(o, cur)) { | ||
| return true; | ||
|
|
@@ -111,6 +128,7 @@ public boolean contains(final Object o) { | |
|
|
||
| @Override | ||
| public Object[] toArray() { | ||
| ensureOpen(); | ||
| Object[] retVal = new Object[size()]; | ||
| Iterator<BsonValue> it = iterator(); | ||
| for (int i = 0; i < retVal.length; i++) { | ||
|
|
@@ -122,6 +140,7 @@ public Object[] toArray() { | |
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| public <T> T[] toArray(final T[] a) { | ||
| ensureOpen(); | ||
| int size = size(); | ||
| T[] retVal = a.length >= size ? a : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size); | ||
| Iterator<BsonValue> it = iterator(); | ||
|
|
@@ -133,6 +152,7 @@ public <T> T[] toArray(final T[] a) { | |
|
|
||
| @Override | ||
| public boolean containsAll(final Collection<?> c) { | ||
| ensureOpen(); | ||
| for (Object e : c) { | ||
| if (!contains(e)) { | ||
| return false; | ||
|
|
@@ -143,6 +163,7 @@ public boolean containsAll(final Collection<?> c) { | |
|
|
||
| @Override | ||
| public BsonValue get(final int index) { | ||
| ensureOpen(); | ||
| if (index < 0) { | ||
| throw new IndexOutOfBoundsException("Index out of range: " + index); | ||
| } | ||
|
|
@@ -159,6 +180,7 @@ public BsonValue get(final int index) { | |
|
|
||
| @Override | ||
| public int indexOf(final Object o) { | ||
| ensureOpen(); | ||
| int i = 0; | ||
| for (BsonValue cur : this) { | ||
| if (Objects.equals(o, cur)) { | ||
|
|
@@ -172,6 +194,7 @@ public int indexOf(final Object o) { | |
|
|
||
| @Override | ||
| public int lastIndexOf(final Object o) { | ||
| ensureOpen(); | ||
| ListIterator<BsonValue> listIterator = listIterator(size()); | ||
| while (listIterator.hasPrevious()) { | ||
| if (Objects.equals(o, listIterator.previous())) { | ||
|
|
@@ -183,17 +206,20 @@ public int lastIndexOf(final Object o) { | |
|
|
||
| @Override | ||
| public ListIterator<BsonValue> listIterator() { | ||
| ensureOpen(); | ||
| return listIterator(0); | ||
| } | ||
|
|
||
| @Override | ||
| public ListIterator<BsonValue> listIterator(final int index) { | ||
| ensureOpen(); | ||
| // Not the most efficient way to do this, but unlikely anyone will notice in practice | ||
| return new ArrayList<>(this).listIterator(index); | ||
| } | ||
|
|
||
| @Override | ||
| public List<BsonValue> subList(final int fromIndex, final int toIndex) { | ||
| ensureOpen(); | ||
| if (fromIndex < 0) { | ||
| throw new IndexOutOfBoundsException("fromIndex = " + fromIndex); | ||
| } | ||
|
|
@@ -234,6 +260,7 @@ public boolean addAll(final Collection<? extends BsonValue> c) { | |
|
|
||
| @Override | ||
| public boolean addAll(final int index, final Collection<? extends BsonValue> c) { | ||
| ensureOpen(); | ||
| throw new UnsupportedOperationException(READ_ONLY_MESSAGE); | ||
| } | ||
|
|
||
|
|
@@ -267,11 +294,43 @@ public BsonValue remove(final int index) { | |
| throw new UnsupportedOperationException(READ_ONLY_MESSAGE); | ||
| } | ||
|
|
||
| @Override | ||
| public void close(){ | ||
| if (!closed) { | ||
| for (Closeable closeable : trackedResources) { | ||
| try { | ||
| closeable.close(); | ||
| } catch (Exception e) { | ||
| // Log and continue closing other resources | ||
|
strogiyotec marked this conversation as resolved.
|
||
| } | ||
| } | ||
| trackedResources.clear(); | ||
| closed = true; | ||
| } | ||
| } | ||
|
|
||
| private void ensureOpen() { | ||
| if (closed) { | ||
| throw new IllegalStateException("The BsonArray resources have been released."); | ||
| } | ||
| } | ||
|
|
||
| private class ByteBufBsonArrayIterator implements Iterator<BsonValue> { | ||
| private final ByteBuf duplicatedByteBuf = byteBuf.duplicate(); | ||
| private final BsonBinaryReader bsonReader; | ||
| private ByteBuf duplicatedByteBuf; | ||
| private BsonBinaryReader bsonReader; | ||
| private Closeable resourceHandle; | ||
| private boolean finished; | ||
|
|
||
| { | ||
| ensureOpen(); | ||
| duplicatedByteBuf = byteBuf.duplicate(); | ||
| resourceHandle = () -> { | ||
| if (duplicatedByteBuf != null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this PR Should we close the reader here as well ? Looks like none calls
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated to match the same naming as ByteBufBsonDocument |
||
| duplicatedByteBuf.release(); | ||
| duplicatedByteBuf = null; | ||
| } | ||
| }; | ||
| trackedResources.add(resourceHandle); | ||
| bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf)); | ||
| // While one might expect that this would be a call to BsonReader#readStartArray that doesn't work because BsonBinaryReader | ||
| // expects to be positioned at the start at the beginning of a document, not an array. Fortunately, a BSON array has exactly | ||
|
|
@@ -283,7 +342,11 @@ private class ByteBufBsonArrayIterator implements Iterator<BsonValue> { | |
|
|
||
| @Override | ||
| public boolean hasNext() { | ||
| return bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; | ||
| boolean hasNext = bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT; | ||
| if (!hasNext) { | ||
| cleanup(); | ||
| } | ||
| return hasNext; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -292,9 +355,22 @@ public BsonValue next() { | |
| throw new NoSuchElementException(); | ||
| } | ||
| bsonReader.skipName(); | ||
| BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader); | ||
| BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader, trackedResources); | ||
| bsonReader.readBsonType(); | ||
| return value; | ||
| } | ||
|
|
||
| private void cleanup() { | ||
| if (!finished) { | ||
| finished = true; | ||
| // Remove from tracked resources since we're cleaning up immediately | ||
| trackedResources.remove(resourceHandle); | ||
| try { | ||
| resourceHandle.close(); | ||
| } catch (Exception e) { | ||
| // Ignore | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add NotThreadSafe annotation here ? Not mandatory