Skip to content

Commit 0d469a3

Browse files
committed
remove explicit reference to the 32767 limit value and fix unit test
Signed-off-by: yeya24 <benye@amazon.com>
1 parent fd43830 commit 0d469a3

File tree

2 files changed

+30
-19
lines changed

2 files changed

+30
-19
lines changed

convert/convert.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var DefaultConvertOpts = convertOpts{
5555
readConcurrency: runtime.GOMAXPROCS(0),
5656
writeConcurrency: 1,
5757
maxSamplesPerChunk: tsdb.DefaultSamplesPerChunk,
58-
maxNumColumns: parquet.MaxColumnIndex, // 32767 - max column index supported by parquet-go
58+
maxNumColumns: parquet.MaxColumnIndex, // max column index supported by parquet-go
5959
}
6060

6161
type Convertible interface {
@@ -283,7 +283,7 @@ func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption {
283283
}
284284

285285
// WithMaxNumColumns sets the maximum number of columns allowed in a Parquet file.
286-
// Parquet has a limit of approximately 32767 columns (MaxInt16). When this limit is exceeded,
286+
// The parquet-go library has a limit on the maximum column index it supports. When this limit is exceeded,
287287
// the conversion will automatically shard the data into multiple files. This option allows
288288
// users to control the number of columns in the converted parquet file.
289289
//
@@ -292,7 +292,7 @@ func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption {
292292
// 998 unique label names can be included in a single shard.
293293
//
294294
// Parameters:
295-
// - maxColumns: Maximum number of columns per Parquet file, including system columns (default: 32767)
295+
// - maxColumns: Maximum number of columns per Parquet file, including system columns
296296
//
297297
// Example:
298298
//
@@ -492,7 +492,8 @@ func singleTSDBRowReader(
492492
}
493493

494494
// If total unique label names exceed the limit, we need to shard based only on column limits.
495-
if len(allLabelNames)+systemColumns >= opts.maxNumColumns {
495+
// Equality is allowed (exactly maxNumColumns columns is fine).
496+
if len(allLabelNames)+systemColumns > opts.maxNumColumns {
496497
indexReaders := make([]blockIndexReader, len(blocks))
497498
defer func() {
498499
for _, indexReader := range indexReaders {
@@ -808,12 +809,12 @@ func shardSeries(
808809

809810
// Create a new shard if:
810811
// 1. Row-based sharding is enabled AND the row limit is reached, OR
811-
// 2. Adding this series would exceed the column limit
812+
// 2. Adding this series would exceed the column limit (equality is allowed)
812813
shouldCreateNewShard := false
813814
if opts.numRowGroups != math.MaxInt32 && uniqueCount >= rowsPerShard {
814815
shouldCreateNewShard = true
815816
}
816-
if len(labelColumns)+newLabelCount+systemColumns >= opts.maxNumColumns {
817+
if len(labelColumns)+newLabelCount+systemColumns > opts.maxNumColumns {
817818
shouldCreateNewShard = true
818819
}
819820

convert/convert_test.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -904,36 +904,36 @@ func Test_TooManyColumns(t *testing.T) {
904904
name: "with_numRowGroups_2_shards",
905905
withNumRowGroups: true,
906906
description: "Uses shardedTSDBRowReaders path when numRowGroups is set, creates 2 shards",
907-
maxNumColumns: 1000, // 998 label columns + 2 system columns
908-
uniqueLabelNames: 1200, // Exceeds maxNumColumns to trigger sharding
909-
labelsPerSeries: 200, // Each series will have 200 unique labels (plus __name__)
907+
maxNumColumns: 100, // 98 label columns + 2 system columns
908+
uniqueLabelNames: 150, // Exceeds maxNumColumns to trigger sharding
909+
labelsPerSeries: 50, // Each series will have 50 unique labels (plus __name__)
910910
minShards: 2,
911911
},
912912
{
913913
name: "without_numRowGroups_2_shards",
914914
withNumRowGroups: false,
915915
description: "Uses singleTSDBRowReader path when numRowGroups is not set, creates 2 shards",
916-
maxNumColumns: 1000, // 998 label columns + 2 system columns
917-
uniqueLabelNames: 1200, // Exceeds maxNumColumns to trigger sharding
918-
labelsPerSeries: 200, // Each series will have 200 unique labels (plus __name__)
916+
maxNumColumns: 100, // 98 label columns + 2 system columns
917+
uniqueLabelNames: 150, // Exceeds maxNumColumns to trigger sharding
918+
labelsPerSeries: 50, // Each series will have 50 unique labels (plus __name__)
919919
minShards: 2,
920920
},
921921
{
922922
name: "with_numRowGroups_3_shards",
923923
withNumRowGroups: true,
924924
description: "Uses shardedTSDBRowReaders path when numRowGroups is set, creates 3+ shards",
925-
maxNumColumns: 1000, // 998 label columns + 2 system columns
926-
uniqueLabelNames: 2500, // Will require at least 3 shards (2500 / 998 ≈ 2.5)
927-
labelsPerSeries: 300, // Each series will have 300 unique labels (plus __name__)
925+
maxNumColumns: 100, // 98 label columns + 2 system columns
926+
uniqueLabelNames: 250, // Will require at least 3 shards (250 / 98 ≈ 2.55)
927+
labelsPerSeries: 80, // Each series will have 80 unique labels (plus __name__)
928928
minShards: 3,
929929
},
930930
{
931931
name: "without_numRowGroups_3_shards",
932932
withNumRowGroups: false,
933933
description: "Uses singleTSDBRowReader path when numRowGroups is not set, creates 3+ shards",
934-
maxNumColumns: 1000, // 998 label columns + 2 system columns
935-
uniqueLabelNames: 2500, // Will require at least 3 shards (2500 / 998 ≈ 2.5)
936-
labelsPerSeries: 300, // Each series will have 300 unique labels (plus __name__)
934+
maxNumColumns: 100, // 98 label columns + 2 system columns
935+
uniqueLabelNames: 250, // Will require at least 3 shards (250 / 98 ≈ 2.55)
936+
labelsPerSeries: 80, // Each series will have 80 unique labels (plus __name__)
937937
minShards: 3,
938938
},
939939
}
@@ -1038,8 +1038,18 @@ func rowToSeries(t *testing.T, s *parquet.Schema, dec *schema.PrometheusParquetC
10381038
col := cols[colIdx][0]
10391039
label, ok := schema.ExtractLabelFromColumn(col)
10401040
if ok {
1041+
// Only include label columns that have actual values (not null/empty)
1042+
// This matches what's stored in s_col_indexes - only labels present in the series
1043+
if colVal.IsNull() {
1044+
continue
1045+
}
10411046
b.Add(label, colVal.String())
1042-
foundLblsIdxs = append(foundLblsIdxs, colIdx)
1047+
// Look up the ColumnIndex from the schema (same as when writing)
1048+
lc, ok := s.Lookup(col)
1049+
if !ok {
1050+
return nil, nil, fmt.Errorf("column %s not found in schema", col)
1051+
}
1052+
foundLblsIdxs = append(foundLblsIdxs, lc.ColumnIndex)
10431053
}
10441054

10451055
if schema.IsDataColumn(col) && dec != nil {

0 commit comments

Comments (0)