Skip to content

Commit da336a1

Browse files
committed
remove explicit reference to the 32767 limit value and fix unit test
Signed-off-by: yeya24 <benye@amazon.com>
1 parent fd43830 commit da336a1

File tree

2 files changed

+18
-7
lines changed

2 files changed

+18
-7
lines changed

convert/convert.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var DefaultConvertOpts = convertOpts{
5555
readConcurrency: runtime.GOMAXPROCS(0),
5656
writeConcurrency: 1,
5757
maxSamplesPerChunk: tsdb.DefaultSamplesPerChunk,
58-
maxNumColumns: parquet.MaxColumnIndex, // 32767 - max column index supported by parquet-go
58+
maxNumColumns: parquet.MaxColumnIndex, // max column index supported by parquet-go
5959
}
6060

6161
type Convertible interface {
@@ -283,7 +283,7 @@ func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption {
283283
}
284284

285285
// WithMaxNumColumns sets the maximum number of columns allowed in a Parquet file.
286-
// Parquet has a limit of approximately 32767 columns (MaxInt16). When this limit is exceeded,
286+
// The parquet-go library limits the maximum supported column index. When this limit is exceeded,
287287
// the conversion will automatically shard the data into multiple files. This option allows
288288
// users to control the number of columns in the converted parquet file.
289289
//
@@ -292,7 +292,7 @@ func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption {
292292
// 998 unique label names can be included in a single shard.
293293
//
294294
// Parameters:
295-
// - maxColumns: Maximum number of columns per Parquet file, including system columns (default: 32767)
295+
// - maxColumns: Maximum number of columns per Parquet file, including system columns
296296
//
297297
// Example:
298298
//
@@ -492,7 +492,8 @@ func singleTSDBRowReader(
492492
}
493493

494494
// If total unique label names exceed the limit, we need to shard based only on column limits.
495-
if len(allLabelNames)+systemColumns >= opts.maxNumColumns {
495+
// Equality is allowed (exactly maxNumColumns columns is fine).
496+
if len(allLabelNames)+systemColumns > opts.maxNumColumns {
496497
indexReaders := make([]blockIndexReader, len(blocks))
497498
defer func() {
498499
for _, indexReader := range indexReaders {
@@ -808,12 +809,12 @@ func shardSeries(
808809

809810
// Create a new shard if:
810811
// 1. Row-based sharding is enabled AND the row limit is reached, OR
811-
// 2. Adding this series would exceed the column limit
812+
// 2. Adding this series would exceed the column limit (equality is allowed)
812813
shouldCreateNewShard := false
813814
if opts.numRowGroups != math.MaxInt32 && uniqueCount >= rowsPerShard {
814815
shouldCreateNewShard = true
815816
}
816-
if len(labelColumns)+newLabelCount+systemColumns >= opts.maxNumColumns {
817+
if len(labelColumns)+newLabelCount+systemColumns > opts.maxNumColumns {
817818
shouldCreateNewShard = true
818819
}
819820

convert/convert_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1038,8 +1038,18 @@ func rowToSeries(t *testing.T, s *parquet.Schema, dec *schema.PrometheusParquetC
10381038
col := cols[colIdx][0]
10391039
label, ok := schema.ExtractLabelFromColumn(col)
10401040
if ok {
1041+
// Only include label columns that have actual values (not null/empty)
1042+
// This matches what's stored in s_col_indexes - only labels present in the series
1043+
if colVal.IsNull() {
1044+
continue
1045+
}
10411046
b.Add(label, colVal.String())
1042-
foundLblsIdxs = append(foundLblsIdxs, colIdx)
1047+
// Look up the ColumnIndex from the schema (same as when writing)
1048+
lc, ok := s.Lookup(col)
1049+
if !ok {
1050+
return nil, nil, fmt.Errorf("column %s not found in schema", col)
1051+
}
1052+
foundLblsIdxs = append(foundLblsIdxs, lc.ColumnIndex)
10431053
}
10441054

10451055
if schema.IsDataColumn(col) && dec != nil {

0 commit comments

Comments
 (0)