1919-define (TS_COLUMN , <<" greptime_timestamp" >>).
2020-define (DEFAULT_DBNAME , " greptime-public" ).
2121
%% @doc Builds a GreptimeDB gRPC batch-insert request from tables and points.
%%
%% Each `{Metric, Points}' entry of the input is converted from plain Erlang
%% terms into the protobuf-shaped map used by GreptimeDB's gRPC API for
%% batch inserts.
%%
%% @param Client A map of client settings; the key used here is:
%%   - `cli_opts': list of options such as auth, dbname, timeunit
%%
%% @param TableAndPoints A list of `{Metric, Points}' tuples, where:
%%   - Metric is one of:
%%       * a binary/string/atom table name
%%       * a `{DbName, TableName}' tuple (kept for backward compatibility)
%%       * a map with keys: table (required), dbname, timeunit
%%   - Points is a list of point maps, each containing:
%%       * fields: map of field names to values (stored as metrics)
%%       * tags: map of tag names to values (stored as metadata)
%%       * timestamp: integer timestamp or a map with a typed value
%%
%% All entries have to resolve to the same database name: insert_requests/4
%% only matches a database name equal to the one seen so far, so a mixed
%% batch fails with a `case_clause' error.
%%
%% @returns A map containing:
%%   - `header': map with dbname and optional authorization
%%   - `request': tuple `{inserts, #{inserts => [...]}}' with column data
%%
%% Example:
%% ```
%% Client = #{cli_opts => [{auth, {basic, #{username => "user", password => "pass"}}},
%%                         {timeunit, ms}]},
%% Points = [#{fields => #{<<"temperature">> => 25.5},
%%             tags => #{<<"location">> => <<"room1">>},
%%             timestamp => 1619775142098}],
%% Request = insert_requests(Client, [{<<"sensors">>, Points}]).
%% '''
insert_requests(Client, TableAndPoints) ->
    %% No database name has been seen yet and no insert has been built.
    InitialDbName = unknown,
    NoInserts = [],
    insert_requests(Client, TableAndPoints, InitialDbName, NoInserts).
2455
@@ -32,19 +63,27 @@ insert_requests(#{cli_opts := Options} = _Client, [], DbName, Inserts) ->
3263 #{dbname => DbName , authorization => #{auth_scheme => Scheme }}
3364 end ,
3465 #{header => Header , request => {inserts , #{inserts => Inserts }}};
35- insert_requests (#{cli_opts := Options } = Client , [{Metric , Points } | T ], PrevDbName , Inserts ) ->
36- {DbName , Insert } = insert_request (Options , metric (Options , Metric ), Points ),
66+ insert_requests (#{cli_opts := Options } = Client ,
67+ [{Metric , Points } | T ],
68+ PrevDbName ,
69+ Inserts ) ->
70+ {DbName , Insert } = make_insert_request (Options , metric (Options , Metric ), Points ),
3771 case PrevDbName of
3872 unknown ->
3973 insert_requests (Client , T , DbName , [Insert | Inserts ]);
4074 Name when Name == DbName ->
4175 insert_requests (Client , T , Name , [Insert | Inserts ])
4276 end .
4377
44- insert_request (_Options , #{dbname := DbName , table := Table , timeunit := Timeunit }, Points ) ->
78+ make_insert_request (_Options ,
79+ #{dbname := DbName ,
80+ table := Table ,
81+ timeunit := Timeunit },
82+ Points ) ->
4583 RowCount = length (Points ),
46- Columns = lists :map (fun (Column ) -> pad_null_mask (Column , RowCount ) end ,
47- collect_columns (Timeunit , Points )),
84+ Columns =
85+ lists :map (fun (Column ) -> pad_null_mask (Column , RowCount ) end ,
86+ collect_columns (Timeunit , Points )),
4887 {DbName ,
4988 #{table_name => Table ,
5089 columns => Columns ,
@@ -61,18 +100,28 @@ default_metric(Options) ->
61100 #{dbname => proplists :get_value (dbname , Options , ? DEFAULT_DBNAME ),
62101 timeunit => proplists :get_value (timeunit , Options , ms )}.
63102
%% Completes a caller-supplied metric with the default settings.
%%
%% Map form: `table' is required; every key present in the metric map
%% takes precedence over the corresponding default.
metric_with_default(Default, #{table := _} = Overrides) ->
    maps:merge(Default, Overrides);
%% `{DbName, Table}' tuple form, kept for backward compatibility.
metric_with_default(Default, {Db, TableName}) ->
    maps:merge(Default, #{dbname => Db, table => TableName});
%% Bare table name given as an atom, a string (list) or a binary.
metric_with_default(Default, TableName)
  when is_atom(TableName) orelse is_list(TableName) orelse is_binary(TableName) ->
    maps:merge(Default, #{table => TableName}).
72110
%% @private
%% @doc Gathers the columns of every data point into one merged column list.
%%
%% Entry point for collect_columns/3, started with an empty accumulator.
%% Once all points are consumed, that helper passes the accumulated columns
%% to merge_columns/1 to build the unified column structure.
%%
%% @param Timeunit Time unit for timestamp columns (ns, us, ms, s)
%% @param Points List of data points
%% @returns List of merged column structures
collect_columns(Timeunit, Points) ->
    NoColumnsYet = [],
    collect_columns(Timeunit, Points, NoColumnsYet).
75122
123+ % % @private
124+ % % @doc Recursive helper for collect_columns/2.
76125collect_columns (_Timeunit , [], Columns ) ->
77126 merge_columns (Columns );
78127collect_columns (Timeunit , [Point | T ], Columns ) ->
@@ -145,15 +194,28 @@ merge_values(#{date_values := V1} = L, #{date_values := V2}) ->
145194 L #{date_values := [V2 | V1 ]};
146195merge_values (#{timestamp_second_values := V1 } = L , #{timestamp_second_values := V2 }) ->
147196 L #{timestamp_second_values := [V2 | V1 ]};
148- merge_values (#{timestamp_millisecond_values := V1 } = L , #{timestamp_millisecond_values := V2 }) ->
197+ merge_values (#{timestamp_millisecond_values := V1 } = L ,
198+ #{timestamp_millisecond_values := V2 }) ->
149199 L #{timestamp_millisecond_values := [V2 | V1 ]};
150- merge_values (#{timestamp_microsecond_values := V1 } = L , #{timestamp_microsecond_values := V2 }) ->
200+ merge_values (#{timestamp_microsecond_values := V1 } = L ,
201+ #{timestamp_microsecond_values := V2 }) ->
151202 L #{timestamp_microsecond_values := [V2 | V1 ]};
152- merge_values (#{timestamp_nanosecond_values := V1 } = L , #{timestamp_nanosecond_values := V2 }) ->
203+ merge_values (#{timestamp_nanosecond_values := V1 } = L ,
204+ #{timestamp_nanosecond_values := V2 }) ->
153205 L #{timestamp_nanosecond_values := [V2 | V1 ]};
154206merge_values (V1 , V2 ) when map_size (V1 ) == 0 ->
155207 V2 .
156208
209+ % % @private
210+ % % @doc Pads null mask to byte boundary or removes it if not needed.
211+ % %
212+ % % If a column has values for all rows, the null mask is removed.
213+ % % Otherwise, the null mask is padded with zeros to align to byte boundaries
214+ % % as required by the GreptimeDB protocol.
215+ % %
216+ % % @param Column Column map with values and null_mask
217+ % % @param RowCount Total number of rows in the batch
218+ % % @returns Updated column with adjusted null mask
157219pad_null_mask (#{values := Values , null_mask := NullMask } = Column , RowCount ) ->
158220 case values_size (Values ) of
159221 RowCount ->
@@ -164,6 +226,15 @@ pad_null_mask(#{values := Values, null_mask := NullMask} = Column, RowCount) ->
164226 Column #{null_mask := <<0 :PadBits , NullMask /bits >>}
165227 end .
166228
229+ % % @private
230+ % % @doc Converts a single point into column structures.
231+ % %
232+ % % Transforms fields, tags, and timestamp from a point into column maps
233+ % % keyed by column name. Each column includes semantic type and values.
234+ % %
235+ % % @param Timeunit Time unit for timestamp conversion
236+ % % @param Point Map with fields, tags, and timestamp
237+ % % @returns Map of column_name -> column structure
167238convert_columns (Timeunit ,
168239 #{fields := Fields ,
169240 tags := Tags ,
@@ -174,6 +245,17 @@ convert_columns(Timeunit,
174245 maps :put (
175246 maps :get (column_name , TsColumn ), TsColumn , maps :merge (FieldColumns , TagColumns )).
176247
248+ % % @private
249+ % % @doc Merges a column with data from the next row.
250+ % %
251+ % % Updates the null mask to track whether the column has a value in this row:
252+ % % - 1 bit: column has a value in this row
253+ % % - 0 bit: column is null/missing in this row
254+ % %
255+ % % @param Column Existing column accumulator with null mask
256+ % % @param Name Column name to look for in NextColumns
257+ % % @param NextColumns Map of columns from the current row being processed
258+ % % @returns Updated column with merged values and extended null mask
177259merge_column (#{null_mask := NullMask } = Column , Name , NextColumns ) ->
178260 case NextColumns of
179261 #{Name := NewColumn } ->
@@ -190,10 +272,28 @@ merge_column(#{null_mask := NullMask} = Column, Name, NextColumns) ->
190272 Column #{null_mask := <<NullMask /bits , 0 :1 /integer >>}
191273 end .
192274
%% @private
%% @doc Folds the columns of one row into every accumulated column.
%%
%% Each `{Name, Column}' entry of the accumulator is rebuilt by calling
%% merge_column/3 with the row's columns; the accumulator keeps its order
%% and its set of names.
%%
%% @param NextColumns Columns from the current row
%% @param Columns Accumulator of `{Name, Column}' tuples
%% @returns Updated accumulator with merged columns
merge_columns(NextColumns, Columns) ->
    %% lists:map/2 is kept on purpose: an element that is not a 2-tuple
    %% still raises function_clause instead of being skipped silently.
    MergeEntry =
        fun({Name, Column}) ->
            {Name, merge_column(Column, Name, NextColumns)}
        end,
    lists:map(MergeEntry, Columns).
196287
288+ % % @private
289+ % % @doc Flattens nested lists of values.
290+ % %
291+ % % Used to flatten the accumulated value lists after merging.
292+ % % Values are accumulated as nested lists during merging and need
293+ % % to be flattened into a single list for the final column structure.
294+ % %
295+ % % @param L Nested list structure
296+ % % @returns Flattened list
197297flatten ([H ]) ->
198298 [H ];
199299flatten ([[H ] | T ]) ->
@@ -206,6 +306,34 @@ flatten([H], Acc) ->
206306flatten ([[H ] | T ], Acc ) ->
207307 flatten (T , [H | Acc ]).
208308
309+ % % @private
310+ % % @doc Merges columns from all rows into a unified structure.
311+ % %
312+ % % Creates a column for every unique column name across all rows,
313+ % % tracking null values where columns are missing in specific rows.
314+ % %
315+ % % Example with 3 rows having different columns:
316+ % % ```
317+ % % Row 1: #{"temp" => 20, "humidity" => 60}
318+ % % Row 2: #{"temp" => 22} % missing humidity
319+ % % Row 3: #{"temp" => 21, "pressure" => 1013} % missing humidity, has pressure
320+ % %
321+ % % Result after merging:
322+ % % - temp: values=[20,22,21], null_mask=<<1:1,1:1,1:1>> (all rows have it)
323+ % % - humidity: values=[60], null_mask=<<1:1,0:1,0:1>> (only row 1 has it)
324+ % % - pressure: values=[1013], null_mask=<<0:1,0:1,1:1>> (only row 3 has it)
325+ % % '''
326+ % %
327+ % % The null mask bitfield: 1=has value, 0=null/missing, read left-to-right for rows.
328+ % %
329+ % % The process:
330+ % % 1. Collects all unique column names (temp, humidity, pressure)
331+ % % 2. Creates empty columns with null masks
332+ % % 3. Merges data from each row using merge_columns/2 and merge_column/3
333+ % % 4. Flattens the accumulated value lists
334+ % %
335+ % % @param Columns List of column maps from all rows
336+ % % @returns List of merged columns with complete data and null masks
209337merge_columns (Columns ) ->
210338 Names =
211339 sets :to_list (
@@ -225,10 +353,20 @@ ts_column(_Timeunit, Ts) when is_map(Ts) ->
225353 maps :merge (#{column_name => ? TS_COLUMN , semantic_type => 'TIMESTAMP' }, Ts );
226354ts_column (Timeunit , Ts ) ->
227355 TsValue = ts_value (Timeunit , Ts ),
228- TsValue #{
229- column_name => ? TS_COLUMN ,
230- semantic_type => 'TIMESTAMP' }.
356+ TsValue #{column_name => ? TS_COLUMN , semantic_type => 'TIMESTAMP' }.
231357
358+ % % @private
359+ % % @doc Converts timestamp to appropriate typed value based on time unit.
360+ % %
361+ % % Supports both short and long time unit names:
362+ % % - ns/nanosecond
363+ % % - us/microsecond
364+ % % - ms/millisecond (default)
365+ % % - s/second
366+ % %
367+ % % @param Timeunit Time unit specification
368+ % % @param Ts Timestamp value
369+ % % @returns Map with typed timestamp value and datatype
232370ts_value (ns , Ts ) ->
233371 greptimedb_values :timestamp_nanosecond_value (Ts );
234372ts_value (nanosecond , Ts ) ->
@@ -246,7 +384,6 @@ ts_value(s, Ts) ->
246384ts_value (second , Ts ) ->
247385 greptimedb_values :timestamp_second_value (Ts ).
248386
249-
250387field_column (Name , V ) when is_map (V ) ->
251388 maps :merge (#{column_name => Name , semantic_type => 'FIELD' }, V );
252389field_column (Name , V ) ->
0 commit comments