2121import java .nio .ByteBuffer ;
2222import java .nio .ByteOrder ;
2323import java .util .Arrays ;
24+ import java .util .Collections ;
25+ import java .util .IdentityHashMap ;
2426import java .util .Objects ;
27+ import java .util .Set ;
28+ import java .util .stream .Stream ;
2529
2630final class CompositeBuf extends RcSupport <Buf , CompositeBuf > implements Buf {
2731 /**
@@ -62,8 +66,25 @@ private static Buf[] filterExternalBufs(Buf[] bufs) {
6266 // This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
6367 // will never overflow their component counts.
6468 // Allocating a new array unconditionally also prevents external modification of the array.
65- // TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers
66- return Arrays .stream (bufs ).filter (b -> b .capacity () > 0 ).toArray (Buf []::new );
69+ bufs = Arrays .stream (bufs )
70+ .filter (b -> b .capacity () > 0 )
71+ .flatMap (CompositeBuf ::flattenBuffer )
72+ .toArray (Buf []::new );
73+ // Make sure there are no duplicates among the buffers.
74+ Set <Buf > duplicatesCheck = Collections .newSetFromMap (new IdentityHashMap <>());
75+ duplicatesCheck .addAll (Arrays .asList (bufs ));
76+ if (duplicatesCheck .size () < bufs .length ) {
77+ throw new IllegalArgumentException (
78+ "Cannot create composite buffer with duplicate constituent buffer components." );
79+ }
80+ return bufs ;
81+ }
82+
83+ private static Stream <Buf > flattenBuffer (Buf buf ) {
84+ if (buf instanceof CompositeBuf ) {
85+ return Stream .of (((CompositeBuf ) buf ).bufs );
86+ }
87+ return Stream .of (buf );
6788 }
6889
6990 private CompositeBuf (Allocator allocator , boolean isSendable , Buf [] bufs , Drop <CompositeBuf > drop ) {
@@ -616,9 +637,6 @@ void extendWith(Buf extension) {
616637 if (!isOwned ()) {
617638 throw new IllegalStateException ("This buffer cannot be extended because it is not in an owned state." );
618639 }
619- if (extension == this ) {
620- throw new IllegalArgumentException ("This buffer cannot be extended with itself." );
621- }
622640 if (bufs .length > 0 && extension .order () != order ()) {
623641 throw new IllegalArgumentException (
624642 "This buffer uses " + order () + " byte order, and cannot be extended with " +
@@ -639,14 +657,38 @@ void extendWith(Buf extension) {
639657 // overflow in their component counters.
640658 return ;
641659 }
642- // TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers
643660
644661 long newSize = capacity () + extensionCapacity ;
645662 Allocator .checkSize (newSize );
646663
647664 Buf [] restoreTemp = bufs ; // We need this to restore our buffer array, in case offset computations fail.
648665 try {
649- unsafeExtendWith (extension .acquire ());
666+ if (extension instanceof CompositeBuf ) {
667+ // If the extension is itself a composite buffer, then extend this one by all of the constituent
668+ // component buffers.
669+ CompositeBuf compositeExtension = (CompositeBuf ) extension ;
670+ Buf [] addedBuffers = compositeExtension .bufs ;
671+ Set <Buf > duplicatesCheck = Collections .newSetFromMap (new IdentityHashMap <>());
672+ duplicatesCheck .addAll (Arrays .asList (bufs ));
673+ duplicatesCheck .addAll (Arrays .asList (addedBuffers ));
674+ if (duplicatesCheck .size () < bufs .length + addedBuffers .length ) {
675+ throw extensionDuplicatesException ();
676+ }
677+ for (Buf addedBuffer : addedBuffers ) {
678+ addedBuffer .acquire ();
679+ }
680+ int extendAtIndex = bufs .length ;
681+ bufs = Arrays .copyOf (bufs , extendAtIndex + addedBuffers .length );
682+ System .arraycopy (addedBuffers , 0 , bufs , extendAtIndex , addedBuffers .length );
683+ computeBufferOffsets ();
684+ } else {
685+ for (Buf buf : restoreTemp ) {
686+ if (buf == extension ) {
687+ throw extensionDuplicatesException ();
688+ }
689+ }
690+ unsafeExtendWith (extension .acquire ());
691+ }
650692 if (restoreTemp .length == 0 ) {
651693 order = extension .order ();
652694 readOnly = extension .readOnly ();
@@ -657,6 +699,12 @@ void extendWith(Buf extension) {
657699 }
658700 }
659701
702+ private static IllegalArgumentException extensionDuplicatesException () {
703+ return new IllegalArgumentException (
704+ "The composite buffer cannot be extended with the given extension," +
705+ " as it would cause the buffer to have duplicate constituent buffers." );
706+ }
707+
660708 private void unsafeExtendWith (Buf extension ) {
661709 bufs = Arrays .copyOf (bufs , bufs .length + 1 );
662710 bufs [bufs .length - 1 ] = extension ;
0 commit comments