3939import java .util .TimeZone ;
4040import java .util .UUID ;
4141import java .util .concurrent .ConcurrentHashMap ;
42+ import java .util .concurrent .atomic .AtomicBoolean ;
4243import java .util .function .Function ;
4344
4445public abstract class AbstractBinaryFormatReader implements ClickHouseBinaryFormatReader {
@@ -73,6 +74,7 @@ protected AbstractBinaryFormatReader(InputStream inputStream, QuerySettings quer
7374 protected Map <String , Object > currentRecord = new ConcurrentHashMap <>();
7475 protected Map <String , Object > nextRecord = new ConcurrentHashMap <>();
7576
77+ protected AtomicBoolean nextRecordEmpty = new AtomicBoolean (true );
7678
7779 /**
7880 * It is still internal method and should be used with care.
@@ -91,7 +93,9 @@ public boolean readRecord(Map<String, Object> record) throws IOException {
9193 try {
9294 Object val = binaryStreamReader .readValue (column );
9395 if (val != null ) {
94- record .put (column .getColumnName (),val );
96+ record .put (column .getColumnName (), val );
97+ } else {
98+ record .remove (column .getColumnName ());
9599 }
96100 firstColumn = false ;
97101 } catch (EOFException e ) {
@@ -121,15 +125,17 @@ public <T> T readValue(String colName) {
121125
122126 @ Override
123127 public boolean hasNext () {
124- return hasNext ;
128+ return hasNext ;
125129 }
126130
127131
128132 protected void readNextRecord () {
129133 try {
130- nextRecord . clear ( );
134+ nextRecordEmpty . set ( true );
131135 if (!readRecord (nextRecord )) {
132136 hasNext = false ;
137+ } else {
138+ nextRecordEmpty .compareAndSet (true , false );
133139 }
134140 } catch (IOException e ) {
135141 hasNext = false ;
@@ -143,15 +149,14 @@ public Map<String, Object> next() {
143149 return null ;
144150 }
145151
146- if (!nextRecord . isEmpty ()) {
152+ if (!nextRecordEmpty . get ()) {
147153 Map <String , Object > tmp = currentRecord ;
148154 currentRecord = nextRecord ;
149155 nextRecord = tmp ;
150156 readNextRecord ();
151157 return currentRecord ;
152158 } else {
153159 try {
154- currentRecord .clear ();
155160 if (readRecord (currentRecord )) {
156161 readNextRecord ();
157162 return currentRecord ;
0 commit comments