1010package org .elasticsearch .ingest ;
1111
1212import org .apache .lucene .util .BytesRef ;
13+ import org .elasticsearch .common .bytes .BytesArray ;
1314import org .elasticsearch .common .bytes .BytesReference ;
15+ import org .elasticsearch .common .io .stream .BytesStreamOutput ;
1416import org .elasticsearch .common .io .stream .RecyclerBytesStreamOutput ;
1517import org .elasticsearch .common .io .stream .StreamInput ;
1618import org .elasticsearch .common .io .stream .StreamOutput ;
1719import org .elasticsearch .common .recycler .Recycler ;
1820
21+ import java .io .ByteArrayOutputStream ;
1922import java .io .IOException ;
2023import java .io .UncheckedIOException ;
2124import java .nio .charset .StandardCharsets ;
@@ -70,34 +73,8 @@ public BytesReference getSerializedKeyBytes() {
7073 // String key = entry.key();
7174 // estimate += key == null ? 0 : key.length() + 5;
7275 // }
73- try (RecyclerBytesStreamOutput streamOutput = new RecyclerBytesStreamOutput (new Recycler <>() {
74-
75- // TODO: Better estimate
76- final int estimate = 512 ;
77-
78- @ Override
79- public V <BytesRef > obtain () {
80- return new V <>() {
81- @ Override
82- public BytesRef v () {
83- return new BytesRef (new byte [estimate ]);
84- }
85-
86- @ Override
87- public boolean isRecycled () {
88- return false ;
89- }
90-
91- @ Override
92- public void close () {}
93- };
94- }
95-
96- @ Override
97- public int pageSize () {
98- return estimate ;
99- }
100- })) {
76+ // try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
77+ try (RecyclerBytesStreamOutput streamOutput = new RecyclerBytesStreamOutput (getBytesRefRecycler ())) {
10178 streamOutput .writeVInt (keys .size ());
10279 for (ESONEntry entry : keys ) {
10380 String key = entry .key ();
@@ -112,11 +89,54 @@ public int pageSize() {
11289 streamOutput .writeInt (entry .offsetOrCount ());
11390 }
11491 }
115- serializedKeyBytes .set (streamOutput .bytes ());
92+ BytesReference bytes = streamOutput .bytes ();
93+ ByteArrayOutputStream os = new ByteArrayOutputStream (bytes .length ());
94+ bytes .writeTo (os );
95+ serializedKeyBytes .set (bytes );
11696 } catch (IOException e ) {
11797 throw new UncheckedIOException (e );
11898 }
11999 }
120100 return serializedKeyBytes .get ();
121101 }
102+
103+ private static final ThreadLocal <BytesRef > BYTES_REF = ThreadLocal .withInitial (() -> new BytesRef (new byte [16384 ]));
104+
105+ private static Recycler <BytesRef > getBytesRefRecycler () {
106+ return new Recycler <>() {
107+
108+ private boolean first = true ;
109+
110+ @ Override
111+ public V <BytesRef > obtain () {
112+ final BytesRef bytesRef ;
113+ if (first ) {
114+ first = false ;
115+ bytesRef = BYTES_REF .get ();
116+ } else {
117+ bytesRef = new BytesRef (new byte [16384 ]);
118+ }
119+ return new V <>() {
120+
121+ @ Override
122+ public BytesRef v () {
123+ return bytesRef ;
124+ }
125+
126+ @ Override
127+ public boolean isRecycled () {
128+ return false ;
129+ }
130+
131+ @ Override
132+ public void close () {}
133+ };
134+ }
135+
136+ @ Override
137+ public int pageSize () {
138+ return 16384 ;
139+ }
140+ };
141+ }
122142}
0 commit comments