66import com .clickhouse .data .ClickHouseFormat ;
77
88import java .io .IOException ;
9+ import java .io .InputStream ;
910import java .io .OutputStream ;
11+ import java .io .Reader ;
12+ import java .math .BigDecimal ;
13+ import java .math .BigInteger ;
1014import java .time .LocalDate ;
1115import java .time .LocalDateTime ;
1216import java .time .ZonedDateTime ;
17+ import java .util .Arrays ;
1318import java .util .List ;
1419
1520
2126 * <p>
2227 * Experimental API
2328 */
24- public class RowBinaryFormatWriter {
29+ public class RowBinaryFormatWriter implements ClickHouseBinaryFormatWriter {
2530
2631 private final OutputStream out ;
2732
@@ -31,6 +36,10 @@ public class RowBinaryFormatWriter {
3136
3237 private final boolean defaultSupport ;
3338
39+ private int rowCount = 0 ;
40+
41+ private boolean rowStarted = false ; // indicates if at least one value was written to a row
42+
3443 public RowBinaryFormatWriter (OutputStream out , TableSchema tableSchema , ClickHouseFormat format ) {
3544 if (format != ClickHouseFormat .RowBinary && format != ClickHouseFormat .RowBinaryWithDefaults ) {
3645 throw new IllegalArgumentException ("Only RowBinary and RowBinaryWithDefaults are supported" );
@@ -42,96 +51,233 @@ public RowBinaryFormatWriter(OutputStream out, TableSchema tableSchema, ClickHou
4251 this .defaultSupport = format == ClickHouseFormat .RowBinaryWithDefaults ;
4352 }
4453
54+ @ Override
55+ public OutputStream getOutputStream () {
56+ return out ;
57+ }
58+
59+ @ Override
60+ public int getRowCount () {
61+ return rowCount ;
62+ }
63+
64+ @ Override
65+ public ClickHouseFormat getFormat () {
66+ return defaultSupport ? ClickHouseFormat .RowBinaryWithDefaults : ClickHouseFormat .RowBinary ;
67+ }
68+
69+ @ Override
70+ public void clearRow () {
71+ Arrays .fill (row , null );
72+ rowStarted = false ;
73+ }
74+
75+ @ Override
4576 public void setValue (String column , Object value ) {
4677 setValue (tableSchema .nameToColumnIndex (column ), value );
4778 }
4879
80+ @ Override
4981 public void setValue (int colIndex , Object value ) {
5082 row [colIndex - 1 ] = value ;
83+ if (!rowStarted ) {
84+ rowStarted = true ;
85+ }
5186 }
5287
88+ @ Override
5389 public void commitRow () throws IOException {
54- List <ClickHouseColumn > columnList = tableSchema .getColumns ();
55- for (int i = 0 ; i < row .length ; i ++) {
56- ClickHouseColumn column = columnList .get (i );
57- // here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
58- if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT )
59- continue ;
60- if (RowBinaryFormatSerializer .writeValuePreamble (out , defaultSupport , column , row [i ])) {
61- SerializerUtils .serializeData (out , row [i ], column );
90+ if (rowStarted ) {
91+ List <ClickHouseColumn > columnList = tableSchema .getColumns ();
92+ for (int i = 0 ; i < row .length ; i ++) {
93+ ClickHouseColumn column = columnList .get (i );
94+ // here we skip if we have a default value that is MATERIALIZED or ALIAS or ...
95+ if (column .hasDefault () && column .getDefaultValue () != ClickHouseColumn .DefaultValue .DEFAULT )
96+ continue ;
97+ if (RowBinaryFormatSerializer .writeValuePreamble (out , defaultSupport , column , row [i ])) {
98+ SerializerUtils .serializeData (out , row [i ], column );
99+ }
62100 }
101+ clearRow ();
102+ rowCount ++;
63103 }
64104 }
65105
106+ @ Override
66107 public void setByte (String column , byte value ) {
67108 setValue (column , value );
68109 }
69110
111+ @ Override
70112 public void setByte (int colIndex , byte value ) {
71113 setValue (colIndex , value );
72114 }
73115
116+ @ Override
74117 public void setShort (String column , short value ) {
75118 setValue (column , value );
76119 }
77120
121+ @ Override
78122 public void setShort (int colIndex , short value ) {
79123 setValue (colIndex , value );
80124 }
81125
126+ @ Override
82127 public void setInteger (String column , int value ) {
83128 setValue (column , value );
84129 }
85130
131+ @ Override
86132 public void setInteger (int colIndex , int value ) {
87133 setValue (colIndex , value );
88134 }
89135
136+ @ Override
90137 public void setLong (String column , long value ) {
91138 setValue (column , value );
92139 }
93140
141+ @ Override
94142 public void setLong (int colIndex , long value ) {
95143 setValue (colIndex , value );
96144 }
97145
146+ @ Override
147+ public void setBigInteger (int colIndex , BigInteger value ) {
148+ setValue (colIndex , value );
149+ }
150+
151+ @ Override
152+ public void setBigInteger (String column , BigInteger value ) {
153+ setValue (column , value );
154+ }
155+
156+ @ Override
157+ public void setFloat (int colIndex , float value ) {
158+ setValue (colIndex , value );
159+ }
160+
161+ @ Override
162+ public void setFloat (String column , float value ) {
163+ setValue (column , value );
164+ }
165+
166+ @ Override
167+ public void setDouble (int colIndex , double value ) {
168+ setValue (colIndex , value );
169+ }
170+
171+ @ Override
172+ public void setDouble (String column , double value ) {
173+ setValue (column , value );
174+ }
175+
176+ @ Override
177+ public void setBigDecimal (int colIndex , BigDecimal value ) {
178+ setValue (colIndex , value );
179+ }
180+
181+ @ Override
182+ public void setBigDecimal (String column , BigDecimal value ) {
183+ setValue (column , value );
184+ }
185+
186+ @ Override
187+ public void setBoolean (int colIndex , boolean value ) {
188+ setValue (colIndex , value );
189+ }
190+
191+ @ Override
192+ public void setBoolean (String column , boolean value ) {
193+ setValue (column , value );
194+ }
195+
196+ @ Override
98197 public void setString (String column , String value ) {
99198 setValue (column , value );
100199 }
101200
201+ @ Override
102202 public void setString (int colIndex , String value ) {
103203 setValue (colIndex , value );
104204 }
105205
206+ @ Override
106207 public void setDate (String column , LocalDate value ) {
107208 setValue (column , value );
108209 }
109210
211+ @ Override
110212 public void setDate (int colIndex , LocalDate value ) {
111213 setValue (colIndex , value );
112214 }
113215
216+ @ Override
114217 public void setDateTime (String column , LocalDateTime value ) {
115218 setValue (column , value );
116219 }
117220
221+ @ Override
118222 public void setDateTime (int colIndex , LocalDateTime value ) {
119223 setValue (colIndex , value );
120224 }
121225
226+ @ Override
122227 public void setDateTime (String column , ZonedDateTime value ) {
123228 setValue (column , value );
124229 }
125230
231+ @ Override
126232 public void setDateTime (int colIndex , ZonedDateTime value ) {
127233 setValue (colIndex , value );
128234 }
129235
236+ @ Override
130237 public void setList (String column , List <?> value ) {
131238 setValue (column , value );
132239 }
133240
241+ @ Override
134242 public void setList (int colIndex , List <?> value ) {
135243 setValue (colIndex , value );
136244 }
245+
246+ @ Override
247+ public void setInputStream (int colIndex , InputStream in , long len ) {
248+ setValue (colIndex , new InputStreamHolder (in , len ));
249+ }
250+
251+ @ Override
252+ public void setInputStream (String column , InputStream in , long len ) {
253+ setValue (column , new InputStreamHolder (in , len ));
254+ }
255+
256+ @ Override
257+ public void setReader (int colIndex , Reader reader , long len ) {
258+ setValue (colIndex , new ReaderHolder (reader , len ));
259+ }
260+
261+ @ Override
262+ public void setReader (String column , Reader reader , long len ) {
263+ setValue (column , new ReaderHolder (reader , len ));
264+ }
265+
266+ private static class InputStreamHolder {
267+ final InputStream stream ;
268+ final long length ;
269+ InputStreamHolder (InputStream stream , long length ) {
270+ this .stream = stream ;
271+ this .length = length ;
272+ }
273+ }
274+
275+ private static class ReaderHolder {
276+ final Reader read ;
277+ final long length ;
278+ ReaderHolder (Reader reader , long length ) {
279+ this .read = reader ;
280+ this .length = length ;
281+ }
282+ }
137283}
0 commit comments