Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,30 @@ querier:
# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]

# [Experimental] If true, querier will try to query the parquet files if
# available.
# CLI flag: -querier.enable-parquet-queryable
[enable_parquet_queryable: <boolean> | default = false]

# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to
# disable.
# CLI flag: -querier.parquet-queryable-shard-cache-size
[parquet_queryable_shard_cache_size: <int> | default = 512]

# [Experimental] Parquet queryable's default block store to query. Valid
# options are tsdb and parquet. If it is set to tsdb, parquet queryable always
# fallback to store gateway.
# CLI flag: -querier.parquet-queryable-default-block-store
[parquet_queryable_default_block_store: <string> | default = "parquet"]

# [Experimental] Disable Parquet queryable to fallback queries to Store
# Gateway if the block is not available as Parquet files but available in
# TSDB. Setting this to true will disable the fallback and users can remove
# Store Gateway. But need to make sure Parquet files are created before it is
# queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]
```
### `blocks_storage_config`
Expand Down
129 changes: 129 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,110 @@ api:
# The compactor_config configures the compactor for the blocks storage.
[compactor: <compactor_config>]

parquet_converter:
# Maximum concurrent goroutines for downloading block metadata from object
# storage.
# CLI flag: -parquet-converter.meta-sync-concurrency
[meta_sync_concurrency: <int> | default = 20]

# How often to check for new TSDB blocks to convert to parquet format.
# CLI flag: -parquet-converter.conversion-interval
[conversion_interval: <duration> | default = 1m]

# Maximum number of time series per parquet row group. Larger values improve
# compression but may reduce performance during reads.
# CLI flag: -parquet-converter.max-rows-per-row-group
[max_rows_per_row_group: <int> | default = 1000000]

# Enable disk-based write buffering to reduce memory consumption during
# parquet file generation.
# CLI flag: -parquet-converter.file-buffer-enabled
[file_buffer_enabled: <boolean> | default = true]

# Local directory path for caching TSDB blocks during parquet conversion.
# CLI flag: -parquet-converter.data-dir
[data_dir: <string> | default = "./data"]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
# inmemory, memberlist, multi.
# CLI flag: -parquet-converter.ring.store
[store: <string> | default = "consul"]

# The prefix for the keys in the store. Should end with a /.
# CLI flag: -parquet-converter.ring.prefix
[prefix: <string> | default = "collectors/"]

dynamodb:
# Region to access dynamodb.
# CLI flag: -parquet-converter.ring.dynamodb.region
[region: <string> | default = ""]

# Table name to use on dynamodb.
# CLI flag: -parquet-converter.ring.dynamodb.table-name
[table_name: <string> | default = ""]

# Time to expire items on dynamodb.
# CLI flag: -parquet-converter.ring.dynamodb.ttl-time
[ttl: <duration> | default = 0s]

# Time to refresh local ring with information on dynamodb.
# CLI flag: -parquet-converter.ring.dynamodb.puller-sync-time
[puller_sync_time: <duration> | default = 1m]

# Maximum number of retries for DDB KV CAS.
# CLI flag: -parquet-converter.ring.dynamodb.max-cas-retries
[max_cas_retries: <int> | default = 10]

# Timeout of dynamoDbClient requests. Default is 2m.
# CLI flag: -parquet-converter.ring.dynamodb.timeout
[timeout: <duration> | default = 2m]

# The consul_config configures the consul client.
# The CLI flags prefix for this block config is: parquet-converter.ring
[consul: <consul_config>]

# The etcd_config configures the etcd client.
# The CLI flags prefix for this block config is: parquet-converter.ring
[etcd: <etcd_config>]

multi:
# Primary backend storage used by multi-client.
# CLI flag: -parquet-converter.ring.multi.primary
[primary: <string> | default = ""]

# Secondary backend storage used by multi-client.
# CLI flag: -parquet-converter.ring.multi.secondary
[secondary: <string> | default = ""]

# Mirror writes to secondary store.
# CLI flag: -parquet-converter.ring.multi.mirror-enabled
[mirror_enabled: <boolean> | default = false]

# Timeout for storing value to secondary store.
# CLI flag: -parquet-converter.ring.multi.mirror-timeout
[mirror_timeout: <duration> | default = 2s]

# Period at which to heartbeat to the ring. 0 = disabled.
# CLI flag: -parquet-converter.ring.heartbeat-period
[heartbeat_period: <duration> | default = 5s]

# The heartbeat timeout after which parquet-converter are considered
# unhealthy within the ring. 0 = never (timeout disabled).
# CLI flag: -parquet-converter.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Time since last heartbeat before parquet-converter will be removed from
# ring. 0 to disable
# CLI flag: -parquet-converter.auto-forget-delay
[auto_forget_delay: <duration> | default = 2m]

# File path where tokens are stored. If empty, tokens are not stored at
# shutdown and restored at startup.
# CLI flag: -parquet-converter.ring.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The store_gateway_config configures the store-gateway service used by the
# blocks storage.
[store_gateway: <store_gateway_config>]
Expand Down Expand Up @@ -2573,6 +2677,7 @@ The `consul_config` configures the consul client. The supported CLI flags `<pref
- `compactor.ring`
- `distributor.ha-tracker`
- `distributor.ring`
- `parquet-converter.ring`
- `ruler.ring`
- `store-gateway.sharding-ring`

Expand Down Expand Up @@ -2894,6 +2999,7 @@ The `etcd_config` configures the etcd client. The supported CLI flags `<prefix>`
- `compactor.ring`
- `distributor.ha-tracker`
- `distributor.ring`
- `parquet-converter.ring`
- `ruler.ring`
- `store-gateway.sharding-ring`

Expand Down Expand Up @@ -4328,6 +4434,29 @@ thanos_engine:
# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]

# [Experimental] If true, querier will try to query the parquet files if
# available.
# CLI flag: -querier.enable-parquet-queryable
[enable_parquet_queryable: <boolean> | default = false]

# [Experimental] Maximum size of the Parquet queryable shard cache. 0 to
# disable.
# CLI flag: -querier.parquet-queryable-shard-cache-size
[parquet_queryable_shard_cache_size: <int> | default = 512]

# [Experimental] Parquet queryable's default block store to query. Valid options
# are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback
# to store gateway.
# CLI flag: -querier.parquet-queryable-default-block-store
[parquet_queryable_default_block_store: <string> | default = "parquet"]

# [Experimental] Disable Parquet queryable to fallback queries to Store Gateway
# if the block is not available as Parquet files but available in TSDB. Setting
# this to true will disable the fallback and users can remove Store Gateway. But
# need to make sure Parquet files are created before it is queryable.
# CLI flag: -querier.parquet-queryable-fallback-disabled
[parquet_queryable_fallback_disabled: <boolean> | default = false]
```
### `query_frontend_config`
Expand Down
17 changes: 14 additions & 3 deletions docs/guides/parquet-mode.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ Traditional TSDB format and Store Gateway architecture face significant challeng

### TSDB Format Limitations
- **Random Read Intensive**: TSDB index relies heavily on random reads, where each read becomes a separate request to object storage
- **Overfetching**: To reduce object storage requests, data needs to be merged, leading to higher bandwidth usage and overfetching
- **Overfetching**: To reduce object storage requests, data that are close together are merged into a single request, leading to higher bandwidth usage and overfetching
- **High Cardinality Bottlenecks**: Index postings can become a major bottleneck for high cardinality data

### Store Gateway Operational Challenges
- **Resource Intensive**: Requires significant local disk space for index headers and high memory utilization
- **Complex State Management**: Needs complex data sharding when scaling, often causing consistency and availability issues
- **Resource Intensive**: Requires significant local disk space for index headers and high memory usage
- **Complex State Management**: Requires complex data sharding when scaling, which often leads to consistency and availability issues, as well as long startup times
- **Query Inefficiencies**: Single-threaded block processing leads to high latency for large blocks

### Parquet Advantages
[Apache Parquet](https://parquet.apache.org/) addresses these challenges through:
- **Columnar Storage**: Data organized by columns reduces object storage requests as only specific columns need to be fetched
- **Data Locality**: Series that are likely to be queried together are co-located to minimize I/O operations
- **Stateless Design**: Rich file metadata eliminates the need for local state like index headers
- **Advanced Compression**: Reduces storage costs and improves query performance
- **Parallel Processing**: Row groups enable parallel processing for better scalability
Expand Down Expand Up @@ -132,6 +133,9 @@ querier:

# Default block store: "tsdb" or "parquet"
parquet_queryable_default_block_store: "parquet"

# Disable fallback to TSDB blocks when parquet files are not available
parquet_queryable_fallback_disabled: false
```
### Query Limits for Parquet
Expand Down Expand Up @@ -227,6 +231,7 @@ When parquet queryable is enabled:
* The bucket index now contains metadata indicating whether parquet files are available for querying
1. **Query Execution**: Queries prioritize parquet files when available, falling back to TSDB blocks when parquet conversion is incomplete
1. **Hybrid Queries**: Supports querying both parquet and TSDB blocks within the same query operation
1. **Fallback Control**: When `parquet_queryable_fallback_disabled` is set to `true`, queries will fail with a consistency check error if any required blocks are not available as parquet files, ensuring strict parquet-only querying

## Monitoring

Expand Down Expand Up @@ -276,6 +281,12 @@ cortex_parquet_queryable_cache_misses_total
2. **Cache Size**: Tune `parquet_queryable_shard_cache_size` based on available memory
3. **Concurrency**: Adjust `meta_sync_concurrency` based on object storage performance

### Fallback Configuration

1. **Gradual Migration**: Keep `parquet_queryable_fallback_disabled: false` (default) during initial deployment to allow queries to succeed even when parquet conversion is incomplete
2. **Strict Parquet Mode**: Set `parquet_queryable_fallback_disabled: true` only after ensuring all required blocks have been converted to parquet format
3. **Monitoring**: Monitor conversion progress and query failures before enabling strict parquet mode

## Limitations

1. **Experimental Feature**: Parquet mode is experimental and may have stability issues
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type Config struct {
QueryRange queryrange.Config `yaml:"query_range"`
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
Compactor compactor.Config `yaml:"compactor"`
ParquetConverter parquetconverter.Config `yaml:"parquet_converter" doc:"hidden"`
ParquetConverter parquetconverter.Config `yaml:"parquet_converter"`
StoreGateway storegateway.Config `yaml:"store_gateway"`
TenantFederation tenantfederation.Config `yaml:"tenant_federation"`

Expand Down
10 changes: 5 additions & 5 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ type Converter struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlags(f)

f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Local directory path for caching TSDB blocks during parquet conversion.")
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type Config struct {
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`

// Query Parquet files if available
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled" doc:"hidden"`
EnableParquetQueryable bool `yaml:"enable_parquet_queryable"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled"`
}

var (
Expand Down Expand Up @@ -144,8 +144,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "[Experimental] Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")
}

Expand Down
Loading