55import com .clickhouse .client .api .data_formats .ClickHouseBinaryFormatReader ;
66import com .clickhouse .client .api .internal .MapUtils ;
77import com .clickhouse .client .api .internal .ServerSettings ;
8+ import com .clickhouse .client .api .metadata .NoSuchColumnException ;
89import com .clickhouse .client .api .metadata .TableSchema ;
910import com .clickhouse .client .api .query .NullValueException ;
1011import com .clickhouse .client .api .query .POJOSetter ;
2223import java .io .EOFException ;
2324import java .io .IOException ;
2425import java .io .InputStream ;
26+ import java .lang .ref .WeakReference ;
2527import java .math .BigDecimal ;
2628import java .math .BigInteger ;
2729import java .net .Inet4Address ;
3436import java .time .ZoneOffset ;
3537import java .time .ZonedDateTime ;
3638import java .time .temporal .TemporalAmount ;
39+ import java .util .AbstractMap ;
40+ import java .util .Arrays ;
41+ import java .util .Collection ;
3742import java .util .Collections ;
3843import java .util .HashMap ;
44+ import java .util .HashSet ;
3945import java .util .List ;
4046import java .util .Map ;
47+ import java .util .Set ;
4148import java .util .TimeZone ;
4249import java .util .UUID ;
4350import java .util .function .Function ;
51+ import java .util .stream .Collectors ;
4452
4553public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
4654
@@ -76,8 +84,8 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
7684 }
7785 }
7886
79- protected Map < String , Object > currentRecord = new HashMap <>() ;
80- protected Map < String , Object > nextRecord = new HashMap <>() ;
87+ protected Object [] currentRecord ;
88+ protected Object [] nextRecord ;
8189
8290 protected boolean nextRecordEmpty = true ;
8391
@@ -150,17 +158,39 @@ public boolean readRecord(Map<String, Object> record) throws IOException {
150158 return true ;
151159 }
152160
161+ protected boolean readRecord (Object [] record ) throws IOException {
162+ boolean firstColumn = true ;
163+ for (int i = 0 ; i < columns .length ; i ++) {
164+ try {
165+ Object val = binaryStreamReader .readValue (columns [i ]);
166+ if (val != null ) {
167+ record [i ] = val ;
168+ } else {
169+ record [i ] = null ;
170+ }
171+ firstColumn = false ;
172+ } catch (EOFException e ) {
173+ if (firstColumn ) {
174+ endReached ();
175+ return false ;
176+ }
177+ throw e ;
178+ }
179+ }
180+ return true ;
181+ }
182+
153183 @ Override
154184 public <T > T readValue (int colIndex ) {
155185 if (colIndex < 1 || colIndex > getSchema ().getColumns ().size ()) {
156186 throw new ClientException ("Column index out of bounds: " + colIndex );
157187 }
158- return (T ) currentRecord . get ( getSchema (). columnIndexToName ( colIndex )) ;
188+ return (T ) currentRecord [ colIndex - 1 ] ;
159189 }
160190
161191 @ Override
162192 public <T > T readValue (String colName ) {
163- return (T ) currentRecord . get (colName );
193+ return (T ) currentRecord [ getSchema (). nameToIndex (colName )] ;
164194 }
165195
166196 @ Override
@@ -195,16 +225,16 @@ public Map<String, Object> next() {
195225 }
196226
197227 if (!nextRecordEmpty ) {
198- Map < String , Object > tmp = currentRecord ;
228+ Object [] tmp = currentRecord ;
199229 currentRecord = nextRecord ;
200230 nextRecord = tmp ;
201231 readNextRecord ();
202- return currentRecord ;
232+ return new RecordWrapper ( currentRecord , schema ) ;
203233 } else {
204234 try {
205235 if (readRecord (currentRecord )) {
206236 readNextRecord ();
207- return currentRecord ;
237+ return new RecordWrapper ( currentRecord , schema ) ;
208238 } else {
209239 currentRecord = null ;
210240 return null ;
@@ -226,6 +256,9 @@ protected void setSchema(TableSchema schema) {
226256 this .columns = schema .getColumns ().toArray (ClickHouseColumn .EMPTY_ARRAY );
227257 this .convertions = new Map [columns .length ];
228258
259+ this .currentRecord = new Object [columns .length ];
260+ this .nextRecord = new Object [columns .length ];
261+
229262 for (int i = 0 ; i < columns .length ; i ++) {
230263 ClickHouseColumn column = columns [i ];
231264 ClickHouseDataType columnDataType = column .getDataType ();
@@ -533,13 +566,12 @@ public boolean[] getBooleanArray(String colName) {
533566
534567 @ Override
535568 public boolean hasValue (int colIndex ) {
536- return currentRecord . containsKey ( getSchema (). columnIndexToName ( colIndex )) ;
569+ return currentRecord [ colIndex - 1 ] != null ;
537570 }
538571
539572 @ Override
540573 public boolean hasValue (String colName ) {
541- getSchema ().getColumnByName (colName );
542- return currentRecord .containsKey (colName );
574+ return currentRecord [getSchema ().nameToIndex (colName )] != null ;
543575 }
544576
545577 @ Override
@@ -778,4 +810,108 @@ public ClickHouseBitmap getClickHouseBitmap(int index) {
778810 public void close () throws Exception {
779811 input .close ();
780812 }
813+
814+ private static class RecordWrapper implements Map <String , Object > {
815+
816+ private final WeakReference <Object []> recordRef ;
817+
818+ private final WeakReference <TableSchema > schemaRef ;
819+
820+ int size ;
821+ public RecordWrapper (Object [] record , TableSchema schema ) {
822+ this .recordRef = new WeakReference <>(record );
823+ this .schemaRef = new WeakReference <>(schema );
824+ this .size = record .length ;
825+ }
826+
827+ @ Override
828+ public int size () {
829+ return size ;
830+ }
831+
832+ @ Override
833+ public boolean isEmpty () {
834+ return size == 0 ;
835+ }
836+
837+ @ Override
838+ @ SuppressWarnings ("ConstantConditions" )
839+ public boolean containsKey (Object key ) {
840+ if (key instanceof String ) {
841+ return recordRef .get ()[schemaRef .get ().nameToIndex ((String )key )] != null ;
842+ }
843+ return false ;
844+ }
845+
846+ @ Override
847+ public boolean containsValue (Object value ) {
848+ for (Object obj : recordRef .get ()) {
849+ if (obj == value ) {
850+ return true ;
851+ }
852+ }
853+ return false ;
854+ }
855+
856+ @ Override
857+ @ SuppressWarnings ("ConstantConditions" )
858+ public Object get (Object key ) {
859+ if (key instanceof String ) {
860+ try {
861+ int index = schemaRef .get ().nameToIndex ((String ) key );
862+ if (index < size ) {
863+ return recordRef .get ()[index ];
864+ }
865+ } catch (NoSuchColumnException e ) {
866+ return null ;
867+ }
868+ }
869+
870+ return null ;
871+ }
872+
873+ @ Override
874+ public Object put (String key , Object value ) {
875+ throw new UnsupportedOperationException ("Record is read-only" );
876+ }
877+
878+ @ Override
879+ public Object remove (Object key ) {
880+ throw new UnsupportedOperationException ("Record is read-only" );
881+ }
882+
883+ @ Override
884+ public void putAll (Map <? extends String , ?> m ) {
885+ throw new UnsupportedOperationException ("Record is read-only" );
886+ }
887+
888+ @ Override
889+ public void clear () {
890+ throw new UnsupportedOperationException ("Record is read-only" );
891+ }
892+
893+ @ Override
894+ @ SuppressWarnings ("ConstantConditions" )
895+ public Set <String > keySet () {
896+ // TODO: create a view in Schema
897+ return schemaRef .get ().getColumns ().stream ().map (ClickHouseColumn ::getColumnName ).collect (Collectors .toSet ());
898+ }
899+
900+ @ Override
901+ @ SuppressWarnings ("ConstantConditions" )
902+ public Collection <Object > values () {
903+ return Arrays .asList (recordRef .get ());
904+ }
905+
906+ @ Override
907+ @ SuppressWarnings ("ConstantConditions" )
908+ public Set <Entry <String , Object >> entrySet () {
909+ int i = 0 ;
910+ Set <Entry <String , Object >> entrySet = new HashSet <>();
911+ for (ClickHouseColumn column : schemaRef .get ().getColumns ()) {
912+ entrySet .add ( new AbstractMap .SimpleImmutableEntry (column .getColumnName (), recordRef .get ()[i ++]));
913+ }
914+ return entrySet ;
915+ }
916+ }
781917}
0 commit comments