diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 99c1fe2e521..855ff5c9028 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1418,6 +1418,255 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl [partitioned_groups_list_ttl: | default = 0s] + parquet_labels_cache: + # The parquet labels cache backend type. Single or Multiple cache backend + # can be provided. Supported values in single cache: memcached, redis, + # inmemory, and '' (disable). Supported values in multi level cache: a + # comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-labels cache used (shared + # between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # Size of each subrange that bucket object is split into for better + # caching. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-size + [subrange_size: | default = 16000] + + # Maximum number of sub-GetRange requests that a single GetRange request + # can be split into when fetching parquet labels file. Zero or negative + # value = unlimited number of sub-requests. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.max-get-range-requests + [max_get_range_requests: | default = 3] + + # TTL for caching object attributes for parquet labels file. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.attributes-ttl + [attributes_ttl: | default = 168h] + + # TTL for caching individual subranges. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl + [subrange_ttl: | default = 24h] + # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 8081e4fc822..f3ec87e1b53 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1510,6 +1510,255 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl [partitioned_groups_list_ttl: | default = 0s] + parquet_labels_cache: + # The parquet labels cache backend type. Single or Multiple cache backend + # can be provided. Supported values in single cache: memcached, redis, + # inmemory, and '' (disable). Supported values in multi level cache: a + # comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-labels cache used (shared + # between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV + # query, dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup + # made after that). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. + # If set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should + # run. If more keys are specified, internally keys are split into + # multiple batches and fetched concurrently, honoring the max + # concurrency. If set to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching + # is when data is stored in memory instead of fetching data each time. + # See https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit + # breaker becomes half-open. If set to 0, by default open duration is + # 60 seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should + # open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur + # when backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # Size of each subrange that bucket object is split into for better + # caching. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-size + [subrange_size: | default = 16000] + + # Maximum number of sub-GetRange requests that a single GetRange request + # can be split into when fetching parquet labels file. Zero or negative + # value = unlimited number of sub-requests. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.max-get-range-requests + [max_get_range_requests: | default = 3] + + # TTL for caching object attributes for parquet labels file. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.attributes-ttl + [attributes_ttl: | default = 168h] + + # TTL for caching individual subranges. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl + [subrange_ttl: | default = 24h] + # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index acd03b3de78..6d45f8256df 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2088,6 +2088,252 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.metadata-cache.partitioned-groups-list-ttl [partitioned_groups_list_ttl: | default = 0s] + parquet_labels_cache: + # The parquet labels cache backend type. Single or Multiple cache backend + # can be provided. Supported values in single cache: memcached, redis, + # inmemory, and '' (disable). Supported values in multi level cache: a + # comma-separated list of (inmemory, memcached, redis) + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.backend + [backend: | default = ""] + + inmemory: + # Maximum size in bytes of in-memory parquet-labels cache used (shared + # between all tenants). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.inmemory.max-size-bytes + [max_size_bytes: | default = 1073741824] + + memcached: + # Comma separated list of memcached addresses. Supported prefixes are: + # dns+ (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses + [addresses: | default = ""] + + # The socket read/write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.timeout + [timeout: | default = 100ms] + + # The maximum number of idle connections that will be maintained per + # address. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-idle-connections + [max_idle_connections: | default = 16] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of concurrent connections running get operations. If + # set to 0, concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum number of keys a single underlying get operation should run. + # If more keys are specified, internally keys are split into multiple + # batches and fetched concurrently, honoring the max concurrency. If set + # to 0, the max batch size is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-get-multi-batch-size + [max_get_multi_batch_size: | default = 0] + + # The maximum size of an item stored in memcached. Bigger items are not + # stored. If set to 0, no maximum size is enforced. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.max-item-size + [max_item_size: | default = 1048576] + + # Use memcached auto-discovery mechanism provided by some cloud provider + # like GCP and AWS + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.auto-discovery + [auto_discovery: | default = false] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit breaker + # becomes half-open. If set to 0, by default open duration is 60 + # seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.memcached.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + redis: + # Comma separated list of redis addresses. Supported prefixes are: dns+ + # (looked up as an A/AAAA query), dnssrv+ (looked up as a SRV query, + # dnssrvnoa+ (looked up as a SRV query, with no A/AAAA lookup made after + # that). + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.addresses + [addresses: | default = ""] + + # Redis username. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.username + [username: | default = ""] + + # Redis password. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.password + [password: | default = ""] + + # Database to be selected after connecting to the server. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.db + [db: | default = 0] + + # Specifies the master's name. Must be not empty for Redis Sentinel. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.master-name + [master_name: | default = ""] + + # The maximum number of concurrent GetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-get-multi-concurrency + [max_get_multi_concurrency: | default = 100] + + # The maximum size per batch for mget. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.get-multi-batch-size + [get_multi_batch_size: | default = 100] + + # The maximum number of concurrent SetMulti() operations. If set to 0, + # concurrency is unlimited. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-set-multi-concurrency + [max_set_multi_concurrency: | default = 100] + + # The maximum size per batch for pipeline set. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-multi-batch-size + [set_multi_batch_size: | default = 100] + + # The maximum number of concurrent asynchronous operations can occur. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # Client dial timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.dial-timeout + [dial_timeout: | default = 5s] + + # Client read timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.read-timeout + [read_timeout: | default = 3s] + + # Client write timeout. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.write-timeout + [write_timeout: | default = 3s] + + # Whether to enable tls for redis connection. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-enabled + [tls_enabled: | default = false] + + # Path to the client certificate file, which will be used for + # authenticating with the server. Also requires the key path to be + # configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-cert-path + [tls_cert_path: | default = ""] + + # Path to the key file for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-key-path + [tls_key_path: | default = ""] + + # Path to the CA certificates file to validate server certificate against. + # If not set, the host's root CA certificates are used. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-ca-path + [tls_ca_path: | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-server-name + [tls_server_name: | default = ""] + + # Skip validating server certificate. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.tls-insecure-skip-verify + [tls_insecure_skip_verify: | default = false] + + # If not zero then client-side caching is enabled. Client-side caching is + # when data is stored in memory instead of fetching data each time. See + # https://redis.io/docs/manual/client-side-caching/ for more info. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.cache-size + [cache_size: | default = 0] + + set_async_circuit_breaker_config: + # If true, enable circuit breaker. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.enabled + [enabled: | default = false] + + # Maximum number of requests allowed to pass through when the circuit + # breaker is half-open. If set to 0, by default it allows 1 request. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.half-open-max-requests + [half_open_max_requests: | default = 10] + + # Period of the open state after which the state of the circuit breaker + # becomes half-open. If set to 0, by default open duration is 60 + # seconds. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.open-duration + [open_duration: | default = 5s] + + # Minimal requests to trigger the circuit breaker. 
+ # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.min-requests + [min_requests: | default = 50] + + # Consecutive failures to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.consecutive-failures + [consecutive_failures: | default = 5] + + # Failure percentage to determine if the circuit breaker should open. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.redis.set-async.circuit-breaker.failure-percent + [failure_percent: | default = 0.05] + + multilevel: + # The maximum number of concurrent asynchronous operations can occur when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-concurrency + [max_async_concurrency: | default = 3] + + # The maximum number of enqueued asynchronous operations allowed when + # backfilling cache items. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-async-buffer-size + [max_async_buffer_size: | default = 10000] + + # The maximum number of items to backfill per asynchronous operation. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.multilevel.max-backfill-items + [max_backfill_items: | default = 10000] + + # Size of each subrange that bucket object is split into for better caching. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-size + [subrange_size: | default = 16000] + + # Maximum number of sub-GetRange requests that a single GetRange request can + # be split into when fetching parquet labels file. Zero or negative value = + # unlimited number of sub-requests. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.max-get-range-requests + [max_get_range_requests: | default = 3] + + # TTL for caching object attributes for parquet labels file. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.attributes-ttl + [attributes_ttl: | default = 168h] + + # TTL for caching individual subranges. + # CLI flag: -blocks-storage.bucket-store.parquet-labels-cache.subrange-ttl + [subrange_ttl: | default = 24h] + # Maximum number of entries in the regex matchers cache. 0 to disable. 
# CLI flag: -blocks-storage.bucket-store.matchers-cache-max-items [matchers_cache_max_items: | default = 0] diff --git a/go.mod b/go.mod index 043a61da101..2bcb8301dd6 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c + github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 461f204f903..5dbb9bfaa55 100644 --- a/go.sum +++ b/go.sum @@ -822,8 +822,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c h1:yDtT3c2klcWJj6A0osq72qM8rd1ohtl/J3rHD3FHuNw= -github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= +github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4 h1:xNWjbJzXJ+/YhTFyFIh6qgZj8sV2DufhcR1CSW+oswE= +github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4/go.mod h1:MbAv/yCv9GORLj0XvXgRF913R9Jc04+BvVq4VJpPCi0= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 7b92fe0d887..cc1be08b13b 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -285,7 +285,7 @@ type BucketStoreConfig struct { IndexCache IndexCacheConfig `yaml:"index_cache"` ChunksCache ChunksCacheConfig `yaml:"chunks_cache"` MetadataCache MetadataCacheConfig `yaml:"metadata_cache"` - ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache" doc:"hidden"` + ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache"` MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"` IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"` diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index ed3c06ac778..778f29d5c6b 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -58,17 +58,19 @@ type Convertible interface { } type convertOpts struct { - numRowGroups int - rowGroupSize int - colDuration time.Duration - name string - sortedLabels []string - bloomfilterLabels []string - pageBufferSize int - writeBufferSize int - columnPageBuffers parquet.BufferPool - concurrency int - maxSamplesPerChunk int + numRowGroups int + rowGroupSize int + colDuration time.Duration + name string + sortedLabels []string + bloomfilterLabels []string + pageBufferSize int + 
writeBufferSize int + columnPageBuffers parquet.BufferPool + concurrency int + maxSamplesPerChunk int + labelsCompressionOpts []schema.CompressionOpts + chunksCompressionOpts []schema.CompressionOpts } func (cfg convertOpts) buildBloomfilterColumns() []parquet.BloomFilterColumn { @@ -92,54 +94,145 @@ func (cfg convertOpts) buildSortingColumns() []parquet.SortingColumn { type ConvertOption func(*convertOpts) +// WithSortBy configures the labels used for sorting time series data in the output Parquet files. +// The specified labels determine the sort order of rows within row groups, which can significantly +// improve query performance for filters on these labels. By default, data is sorted by __name__. +// +// Parameters: +// - labels: Label names to sort by, in order of precedence +// +// Example: +// +// WithSortBy("__name__", "job", "instance") func WithSortBy(labels ...string) ConvertOption { return func(opts *convertOpts) { opts.sortedLabels = labels } } +// WithColDuration sets the time duration for each column in the Parquet schema. +// This determines how time series data is partitioned across columns, affecting +// both storage efficiency and query performance. Shorter durations create more +// columns but allow for more precise time-based filtering. +// +// Parameters: +// - d: Duration for each time column (default: 8 hours) +// +// Example: +// +// WithColDuration(4 * time.Hour) // 4-hour columns func WithColDuration(d time.Duration) ConvertOption { return func(opts *convertOpts) { opts.colDuration = d } } +// WithWriteBufferSize configures the buffer size used for writing Parquet data. +// Larger buffers can improve write performance by reducing I/O operations, +// but consume more memory during the conversion process. +// +// Parameters: +// - s: Buffer size in bytes (default: parquet.DefaultWriteBufferSize) +// +// Example: +// +// WithWriteBufferSize(64 * 1024) // 64KB buffer func WithWriteBufferSize(s int) ConvertOption { return func(opts *convertOpts) { opts.writeBufferSize = s } } +// WithPageBufferSize sets the buffer size for Parquet page operations. +// This affects how data is buffered when reading and writing individual pages +// within the Parquet file format. Larger page buffers can improve performance +// for large datasets but increase memory usage. +// +// Parameters: +// - s: Page buffer size in bytes (default: parquet.DefaultPageBufferSize) +// +// Example: +// +// WithPageBufferSize(128 * 1024) // 128KB page buffer func WithPageBufferSize(s int) ConvertOption { return func(opts *convertOpts) { opts.pageBufferSize = s } } +// WithName sets the base name used for generated Parquet files. +// This name is used as a prefix for the output files in the object store bucket. +// +// Parameters: +// - name: Base name for output files (default: "block") +// +// Example: +// +// WithName("prometheus-data") // Files will be named prometheus-data-* func WithName(name string) ConvertOption { return func(opts *convertOpts) { opts.name = name } } +// WithNumRowGroups limits the maximum number of row groups to create during conversion. +// Row groups are the primary unit of parallelization in Parquet files. More row groups +// allow for better parallelization but may increase metadata overhead. 
+// +// Parameters: +// - n: Maximum number of row groups (default: math.MaxInt32, effectively unlimited) +// +// Example: +// +// WithNumRowGroups(100) // Limit to 100 row groups func WithNumRowGroups(n int) ConvertOption { return func(opts *convertOpts) { opts.numRowGroups = n } } +// WithRowGroupSize sets the target number of rows per row group in the output Parquet files. +// Larger row groups improve compression and reduce metadata overhead, but require more +// memory during processing and may reduce parallelization opportunities. +// +// Parameters: +// - size: Target number of rows per row group (default: 1,000,000) +// +// Example: +// +// WithRowGroupSize(500000) // 500K rows per row group func WithRowGroupSize(size int) ConvertOption { return func(opts *convertOpts) { opts.rowGroupSize = size } } +// WithConcurrency sets the number of concurrent goroutines used during conversion. +// Higher concurrency can improve performance on multi-core systems but increases +// memory usage. The optimal value depends on available CPU cores and memory. +// +// Parameters: +// - concurrency: Number of concurrent workers (default: runtime.GOMAXPROCS(0)) +// +// Example: +// +// WithConcurrency(8) // Use 8 concurrent workers func WithConcurrency(concurrency int) ConvertOption { return func(opts *convertOpts) { opts.concurrency = concurrency } } +// WithMaxSamplesPerChunk sets the maximum number of samples to include in each chunk +// during the encoding process. This affects how time series data is chunked and can +// impact both compression efficiency and query performance. +// +// Parameters: +// - samplesPerChunk: Maximum samples per chunk (default: tsdb.DefaultSamplesPerChunk) +// +// Example: +// +// WithMaxSamplesPerChunk(240) // Limit chunks to 240 samples each func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption { return func(opts *convertOpts) { opts.maxSamplesPerChunk = samplesPerChunk @@ -152,6 +245,66 @@ func WithColumnPageBuffers(buffers parquet.BufferPool) ConvertOption { } } +// WithCompression adds compression options to the conversion process. +// These options will be applied to both labels and chunks projections. +// For separate configuration, use WithLabelsCompression and WithChunksCompression. +func WithCompression(compressionOpts ...schema.CompressionOpts) ConvertOption { + return func(opts *convertOpts) { + opts.labelsCompressionOpts = compressionOpts + opts.chunksCompressionOpts = compressionOpts + } +} + +// WithLabelsCompression configures compression options specifically for the labels projection. +// This allows fine-grained control over how label data is compressed in the output Parquet files, +// which can be optimized differently from chunk data due to different access patterns and data characteristics. +// +// Parameters: +// - compressionOpts: optional compression options to apply to the projection schema +// +// Example: +// +// WithLabelsCompression(schema.WithSnappyCompression()) +func WithLabelsCompression(compressionOpts ...schema.CompressionOpts) ConvertOption { + return func(opts *convertOpts) { + opts.labelsCompressionOpts = compressionOpts + } +} + +// WithChunksCompression configures compression options specifically for the chunks projection. +// This allows optimization of compression settings for time series chunk data, which typically +// has different compression characteristics compared to label metadata. 
+// +// Parameters: +// - compressionOpts: optional compression options to apply to the projection schema +// +// Example: +// +// WithChunksCompression(schema.WithGzipCompression()) +func WithChunksCompression(compressionOpts ...schema.CompressionOpts) ConvertOption { + return func(opts *convertOpts) { + opts.chunksCompressionOpts = compressionOpts + } +} + +// ConvertTSDBBlock converts one or more TSDB blocks to Parquet format and writes them to an object store bucket. +// It processes time series data within the specified time range and outputs sharded Parquet files. +// +// Parameters: +// - ctx: Context for cancellation and timeout control +// - bkt: Object store bucket where the converted Parquet files will be written +// - mint: Minimum timestamp (inclusive) for the time range to convert +// - maxt: Maximum timestamp (exclusive) for the time range to convert +// - blks: Slice of Convertible blocks (typically TSDB blocks) to be converted +// - opts: Optional configuration options to customize the conversion process +// +// Returns: +// - int: The current shard number after conversion +// - error: Any error that occurred during the conversion process +// +// The function creates a row reader from the TSDB blocks, generates both labels and chunks +// projections with optional compression, and writes the data to the bucket using a sharded +// writer approach for better performance and parallelization. func ConvertTSDBBlock( ctx context.Context, bkt objstore.Bucket, @@ -171,11 +324,11 @@ func ConvertTSDBBlock( } defer func() { _ = rr.Close() }() - labelsProjection, err := rr.Schema().LabelsProjection() + labelsProjection, err := rr.Schema().LabelsProjection(cfg.labelsCompressionOpts...) if err != nil { return 0, errors.Wrap(err, "error getting labels projection from tsdb schema") } - chunksProjection, err := rr.Schema().ChunksProjection() + chunksProjection, err := rr.Schema().ChunksProjection(cfg.chunksCompressionOpts...) 
if err != nil { return 0, errors.Wrap(err, "error getting chunks projection from tsdb schema") } diff --git a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go index a674c96efc4..e5518eaf6c3 100644 --- a/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go +++ b/vendor/github.com/prometheus-community/parquet-common/queryable/parquet_queryable.go @@ -164,6 +164,9 @@ func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *pro if err != nil { return nil, nil, err } + span.SetAttributes( + attribute.Int("shards_count", len(shards)), + ) limit := int64(0) @@ -212,6 +215,9 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe if err != nil { return nil, nil, err } + span.SetAttributes( + attribute.Int("shards_count", len(shards)), + ) limit := int64(0) @@ -281,6 +287,11 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag minT, maxT = sp.Start, sp.End } skipChunks := sp != nil && sp.Func == "series" + span.SetAttributes( + attribute.Int("shards_count", len(shards)), + attribute.Bool("skip_chunks", skipChunks), + ) + errGroup, ctx := errgroup.WithContext(ctx) errGroup.SetLimit(p.opts.concurrency) @@ -296,11 +307,6 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag return prom_storage.ErrSeriesSet(err) } - span.SetAttributes( - attribute.Int("shards_count", len(shards)), - attribute.Bool("skip_chunks", skipChunks), - ) - ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) return convert.NewSeriesSetFromChunkSeriesSet(ss, skipChunks) @@ -357,7 +363,7 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, sp *prom_storage for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } @@ -411,7 +417,7 @@ func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers [] for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } @@ -451,7 +457,7 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, limit int6 for rgi := range b.shard.LabelsFile().RowGroups() { errGroup.Go(func() error { - cs, err := search.MatchersToConstraint(matchers...) + cs, err := search.MatchersToConstraints(matchers...) if err != nil { return err } diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go index 5c98f724146..27a58b9a1ec 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go @@ -39,6 +39,19 @@ func NewPrometheusParquetChunksEncoder(schema *TSDBSchema, samplesPerChunk int) } } +// Encode re-encodes Prometheus chunks from the given iterator into a format optimized for Parquet storage. +// It takes chunks (typically for one day) and redistributes the samples across data columns based on the schema's +// time-based partitioning. The function handles three chunk encodings: XOR (float), Histogram, and FloatHistogram. 
+// +// The encoding process: +// 1. Sorts input chunks by minimum timestamp +// 2. Creates new chunks for each data column and encoding type +// 3. Redistributes samples from input chunks to appropriate data column chunks based on timestamp +// 4. Cuts new chunks when samplesPerChunk limit is reached +// 5. Serializes the re-encoded chunks into binary format with metadata +// +// Returns a slice of byte slices, where each element corresponds to a data column containing +// serialized chunk data with encoding type, min/max timestamps, size, and chunk bytes. func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, error) { // NOTE: usually 'it' should hold chunks for one day. Chunks are usually length 2h so we should get 12 of them. chks := make([]chunks.Meta, 0, 12) @@ -241,6 +254,23 @@ func NewPrometheusParquetChunksDecoder(pool chunkenc.Pool) *PrometheusParquetChu } } +// Decode deserializes chunk data that was previously encoded by PrometheusParquetChunksEncoder. +// It takes binary data containing serialized chunks and reconstructs them as chunks.Meta objects. +// The function filters chunks based on the provided time range [mint, maxt]. +// +// The binary format contains multiple chunks, each with: +// - Encoding type (varint) +// - Min timestamp (varint) +// - Max timestamp (varint) +// - Chunk size (varint) +// - Chunk bytes +// +// Parameters: +// - data: Binary data containing serialized chunks +// - mint: Minimum timestamp filter (inclusive) +// - maxt: Maximum timestamp filter (inclusive) +// +// Returns chunks that overlap with the time range [mint, maxt], or an error if deserialization fails. func (e *PrometheusParquetChunksDecoder) Decode(data []byte, mint, maxt int64) ([]chunks.Meta, error) { // We usually have only 1 chunk per column as the chunks got re-encoded. Lets create a slice with capacity of 5 // just in case of re-encoding. 
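For orientation, here is how the new per-projection compression options added in convert.go are expected to compose with the schema package options introduced in the next hunk. This is a minimal sketch, not part of the change itself: the bucket, block, and timestamp values are assumed placeholders, and the int64 millisecond arguments follow the ConvertTSDBBlock parameters described in its new doc comment.

package main

import (
	"context"
	"time"

	"github.com/parquet-go/parquet-go/compress/zstd"
	"github.com/prometheus-community/parquet-common/convert"
	"github.com/prometheus-community/parquet-common/schema"
	"github.com/thanos-io/objstore"
)

// convertWithCompression sketches the new option wiring. bkt and blk are
// assumed to be an existing bucket client and an open TSDB block satisfying
// convert.Convertible; mint/maxt are millisecond timestamps.
func convertWithCompression(ctx context.Context, bkt objstore.Bucket, blk convert.Convertible, mint, maxt int64) error {
	_, err := convert.ConvertTSDBBlock(
		ctx, bkt, mint, maxt,
		[]convert.Convertible{blk},
		convert.WithName("block"),
		convert.WithColDuration(8*time.Hour),
		// Labels projection: keep the default zstd codec, set the level explicitly.
		convert.WithLabelsCompression(
			schema.WithCompressionCodec(schema.CompressionZstd),
			schema.WithCompressionLevel(zstd.SpeedBetterCompression),
		),
		// Chunks projection: snappy trades compression ratio for lower CPU;
		// the level option does not apply to this codec.
		convert.WithChunksCompression(
			schema.WithCompressionCodec(schema.CompressionSnappy),
		),
	)
	return err
}

WithCompression(...) would apply the same options to both projections; omitting all compression options keeps the previous behaviour (zstd at SpeedBetterCompression), per DefaultCompressionOpts in the schema hunk below.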
diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go index 6c86484190d..c3a27ccc15f 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema.go @@ -18,6 +18,8 @@ import ( "strings" "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress" + "github.com/parquet-go/parquet-go/compress/snappy" "github.com/parquet-go/parquet-go/compress/zstd" "github.com/parquet-go/parquet-go/format" ) @@ -32,6 +34,45 @@ const ( MaxTMd = "maxT" ) +type CompressionCodec int + +const ( + CompressionZstd CompressionCodec = iota + CompressionSnappy +) + +type compressionOpts struct { + enabled bool + codec CompressionCodec + level zstd.Level +} + +var DefaultCompressionOpts = compressionOpts{ + enabled: true, + codec: CompressionZstd, + level: zstd.SpeedBetterCompression, +} + +type CompressionOpts func(*compressionOpts) + +func WithCompressionEnabled(enabled bool) CompressionOpts { + return func(opts *compressionOpts) { + opts.enabled = enabled + } +} + +func WithCompressionCodec(codec CompressionCodec) CompressionOpts { + return func(opts *compressionOpts) { + opts.codec = codec + } +} + +func WithCompressionLevel(level zstd.Level) CompressionOpts { + return func(opts *compressionOpts) { + opts.level = level + } +} + func LabelToColumn(lbl string) string { return fmt.Sprintf("%s%s", LabelColumnPrefix, lbl) } @@ -59,12 +100,38 @@ func ChunksPfileNameForShard(name string, shard int) string { return fmt.Sprintf("%s/%d.%s", name, shard, "chunks.parquet") } -func WithCompression(s *parquet.Schema) *parquet.Schema { +// WithCompression applies compression configuration to a parquet schema. +// +// This function takes a parquet schema and applies compression settings based on the provided options. +// By default, it enables zstd compression with SpeedBetterCompression level, maintaining backward compatibility. +// The compression can be disabled, or the codec and level can be customized using the configuration options. +func WithCompression(s *parquet.Schema, opts ...CompressionOpts) *parquet.Schema { + cfg := DefaultCompressionOpts + + for _, opt := range opts { + opt(&cfg) + } + + if !cfg.enabled { + return s + } + g := make(parquet.Group) for _, c := range s.Columns() { lc, _ := s.Lookup(c...) - g[lc.Path[0]] = parquet.Compressed(lc.Node, &zstd.Codec{Level: zstd.SpeedBetterCompression}) + + var codec compress.Codec + switch cfg.codec { + case CompressionZstd: + codec = &zstd.Codec{Level: cfg.level} + case CompressionSnappy: + codec = &snappy.Codec{} + default: + codec = &zstd.Codec{Level: zstd.SpeedBetterCompression} + } + + g[lc.Path[0]] = parquet.Compressed(lc.Node, codec) } return parquet.NewSchema("compressed", g) diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index 4b6b86fe77f..4d1df102eff 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -29,6 +29,15 @@ type Builder struct { mint, maxt int64 } +// NewBuilder creates a new Builder instance for constructing TSDB schemas. +// It initializes the builder with time range boundaries and data column duration. 
diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go
index 4b6b86fe77f..4d1df102eff 100644
--- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go
+++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go
@@ -29,6 +29,15 @@ type Builder struct {
mint, maxt int64
}
+// NewBuilder creates a new Builder instance for constructing TSDB schemas.
+// It initializes the builder with time range boundaries and data column duration.
+//
+// Parameters:
+// - mint: minimum timestamp (inclusive) for the time range
+// - maxt: maximum timestamp (inclusive) for the time range
+// - colDuration: duration in milliseconds for each data column
+//
+// Returns a pointer to a new Builder instance with initialized metadata.
func NewBuilder(mint, maxt, colDuration int64) *Builder {
b := &Builder{
g: make(parquet.Group),
@@ -45,6 +54,10 @@ func NewBuilder(mint, maxt, colDuration int64) *Builder {
return b
}
+// FromLabelsFile creates a TSDBSchema from an existing parquet labels file.
+// It extracts metadata (mint, maxt, dataColDurationMs) from the file's key-value metadata
+// and reconstructs the schema by examining the file's columns to identify label columns.
+// Returns an error if the metadata cannot be parsed or the schema cannot be built.
func FromLabelsFile(lf *parquet.File) (*TSDBSchema, error) {
md := MetadataToMap(lf.Metadata().KeyValueMetadata)
mint, err := strconv.ParseInt(md[MinTMd], 0, 64)
@@ -142,7 +155,17 @@ func (s *TSDBSchema) DataColumIdx(t int64) int {
return int((t - s.MinTs) / s.DataColDurationMs)
}
-func (s *TSDBSchema) LabelsProjection() (*TSDBProjection, error) {
+// LabelsProjection creates a TSDBProjection containing only label columns and column indexes.
+// This projection is used for creating parquet files that contain only the label metadata
+// without the actual time series data columns. The resulting projection includes:
+// - ColIndexes column for row indexing
+// - All label columns extracted from the original schema
+//
+// Parameters:
+// - opts: optional compression options to apply to the projection schema
+//
+// Returns a TSDBProjection configured for labels files, or an error if required columns are missing.
+func (s *TSDBSchema) LabelsProjection(opts ...CompressionOpts) (*TSDBProjection, error) {
g := make(parquet.Group)
lc, ok := s.Schema.Lookup(ColIndexes)
@@ -165,12 +188,22 @@ func (s *TSDBSchema) LabelsProjection() (*TSDBProjection, error) {
FilenameFunc: func(name string, shard int) string {
return LabelsPfileNameForShard(name, shard)
},
- Schema: WithCompression(parquet.NewSchema("labels-projection", g)),
+ Schema: WithCompression(parquet.NewSchema("labels-projection", g), opts...),
ExtraOptions: []parquet.WriterOption{parquet.SkipPageBounds(ColIndexes)},
}, nil
}
-func (s *TSDBSchema) ChunksProjection() (*TSDBProjection, error) {
+// ChunksProjection creates a TSDBProjection containing only data columns for time series chunks.
+// This projection is used for creating parquet files that contain the actual time series data
+// without the label metadata. The resulting projection includes:
+// - All data columns (columns that store time series chunk data)
+// - Page bounds are skipped for all data columns to optimize storage
+//
+// Parameters:
+// - opts: optional compression options to apply to the projection schema
+//
+// Returns a TSDBProjection configured for chunks files, or an error if required columns are missing.
+func (s *TSDBSchema) ChunksProjection(opts ...CompressionOpts) (*TSDBProjection, error) {
g := make(parquet.Group)
writeOptions := make([]parquet.WriterOption, 0, len(s.DataColsIndexes))
@@ -190,7 +223,7 @@ func (s *TSDBSchema) ChunksProjection() (*TSDBProjection, error) {
FilenameFunc: func(name string, shard int) string {
return ChunksPfileNameForShard(name, shard)
},
- Schema: WithCompression(parquet.NewSchema("chunk-projection", g)),
+ Schema: WithCompression(parquet.NewSchema("chunk-projection", g), opts...),
ExtraOptions: writeOptions,
}, nil
}
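A sketch of how the new variadic options flow from the projections into `WithCompression`; `s` is an assumed, already-built `*schema.TSDBSchema` (for example from `NewBuilder` or `FromLabelsFile`).

```go
package example

import "github.com/prometheus-community/parquet-common/schema"

// buildProjections derives the labels and chunks projections from an existing
// TSDB schema, forwarding the same compression options to both.
func buildProjections(s *schema.TSDBSchema) (*schema.TSDBProjection, *schema.TSDBProjection, error) {
	labelsProj, err := s.LabelsProjection(schema.WithCompressionCodec(schema.CompressionSnappy))
	if err != nil {
		return nil, nil, err
	}
	chunksProj, err := s.ChunksProjection(schema.WithCompressionCodec(schema.CompressionSnappy))
	if err != nil {
		return nil, nil, err
	}
	return labelsProj, chunksProj, nil
}
```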
diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go
index f77bcf4aa87..159c639c6f6 100644
--- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go
+++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go
@@ -35,12 +35,16 @@ type Constraint interface {
// filter returns a set of non-overlapping increasing row indexes that may satisfy the constraint.
filter(ctx context.Context, rgIdx int, primary bool, rr []RowRange) ([]RowRange, error)
// init initializes the constraint with respect to the file schema and projections.
- init(f *storage.ParquetFile) error
+ init(f storage.ParquetFileView) error
// path is the path for the column that is constrained
path() string
}
-func MatchersToConstraint(matchers ...*labels.Matcher) ([]Constraint, error) {
+// MatchersToConstraints converts Prometheus label matchers into parquet search constraints.
+// It supports MatchEqual, MatchNotEqual, MatchRegexp, and MatchNotRegexp matcher types.
+// Returns a slice of constraints that can be used to filter parquet data based on the
+// provided label matchers, or an error if an unsupported matcher type is encountered.
+func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) {
r := make([]Constraint, 0, len(matchers))
for _, matcher := range matchers {
switch matcher.Type {
@@ -67,7 +71,17 @@ func MatchersToConstraint(matchers ...*labels.Matcher) ([]Constraint, error) {
return r, nil
}
-func Initialize(f *storage.ParquetFile, cs ...Constraint) error {
+// Initialize prepares the given constraints for use with the specified parquet file.
+// It calls the init method on each constraint to validate compatibility with the
+// file schema and set up any necessary internal state.
+//
+// Parameters:
+// - f: The ParquetFile that the constraints will be applied to
+// - cs: Variable number of constraints to initialize
+//
+// Returns an error if any constraint fails to initialize, wrapping the original
+// error with context about which constraint failed.
+func Initialize(f storage.ParquetFileView, cs ...Constraint) error {
for i := range cs {
if err := cs[i].init(f); err != nil {
return fmt.Errorf("unable to initialize constraint %d: %w", i, err)
}
@@ -76,23 +90,56 @@ func Initialize(f *storage.ParquetFile, cs ...Constraint) error {
return nil
}
+// sortConstraintsBySortingColumns reorders constraints to prioritize those that match sorting columns.
+// Constraints matching sorting columns are moved to the front, ordered by the sorting column priority.
+// Other constraints maintain their original relative order.
+func sortConstraintsBySortingColumns(cs []Constraint, sc []parquet.SortingColumn) {
+ if len(sc) == 0 {
+ return // No sorting columns, nothing to do
+ }
+
+ sortingPaths := make(map[string]int, len(sc))
+ for i, col := range sc {
+ sortingPaths[col.Path()[0]] = i
+ }
+
+ // Sort constraints: sorting column constraints first (by their order in sc), then others
+ slices.SortStableFunc(cs, func(a, b Constraint) int {
+ aIdx, aIsSorting := sortingPaths[a.path()]
+ bIdx, bIsSorting := sortingPaths[b.path()]
+
+ if aIsSorting && bIsSorting {
+ return aIdx - bIdx // Sort by sorting column order
+ }
+ if aIsSorting {
+ return -1 // a comes first
+ }
+ if bIsSorting {
+ return 1 // b comes first
+ }
+ return 0 // preserve original order for non-sorting constraints
+ })
+}
+
+// Filter applies the given constraints to a parquet row group and returns the row ranges
+// that satisfy all constraints. It optimizes performance by prioritizing constraints on
+// sorting columns, which are cheaper to evaluate.
+//
+// Parameters:
+// - ctx: Context for cancellation and timeouts
+// - s: ParquetShard containing the parquet file to filter
+// - rgIdx: Index of the row group to filter within the parquet file
+// - cs: Variable number of constraints to apply for filtering
+//
+// Returns a slice of RowRange that represent the rows satisfying all constraints,
+// or an error if any constraint fails to filter.
func Filter(ctx context.Context, s storage.ParquetShard, rgIdx int, cs ...Constraint) ([]RowRange, error) {
rg := s.LabelsFile().RowGroups()[rgIdx]
// Constraints for sorting columns are cheaper to evaluate, so we sort them first.
sc := rg.SortingColumns()
- var n int
- for i := range sc {
- if n == len(cs) {
- break
- }
- for j := range cs {
- if cs[j].path() == sc[i].Path()[0] {
- cs[n], cs[j] = cs[j], cs[n]
- n++
- }
- }
- }
+ sortConstraintsBySortingColumns(cs, sc)
+
var err error
rr := []RowRange{{From: int64(0), Count: rg.NumRows()}}
for i := range cs {
@@ -168,7 +215,7 @@ type equalConstraint struct {
pth string
val parquet.Value
- f *storage.ParquetFile
+ f storage.ParquetFileView
comp func(l, r parquet.Value) int
}
@@ -272,7 +319,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool,
// If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize,
// we include the dic to be read in the single read
- if int(minOffset-(dictOff+dictSz)) < ec.f.Cfg.PagePartitioningMaxGapSize {
+ if int(minOffset-(dictOff+dictSz)) < ec.f.PagePartitioningMaxGapSize() {
minOffset = dictOff
}
@@ -338,7 +385,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool,
return intersectRowRanges(simplify(res), rr), nil
}
-func (ec *equalConstraint) init(f *storage.ParquetFile) error {
+func (ec *equalConstraint) init(f storage.ParquetFileView) error {
c, ok := f.Schema().Lookup(ec.path())
ec.f = f
if !ok {
@@ -364,7 +411,7 @@ func (ec *equalConstraint) matches(v parquet.Value) bool {
}
func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, error) {
- if ec.f.Cfg.SkipBloomFilters {
+ if ec.f.SkipBloomFilters() {
return false, nil
}
@@ -386,7 +433,7 @@ func Regex(path string, r *labels.FastRegexMatcher) Constraint {
type regexConstraint struct {
pth string
cache map[parquet.Value]bool
- f *storage.ParquetFile
+ f storage.ParquetFileView
r *labels.FastRegexMatcher
}
@@ -494,7 +541,7 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool,
return intersectRowRanges(simplify(res), rr), nil
}
-func (rc *regexConstraint) init(f *storage.ParquetFile) error {
+func (rc *regexConstraint) init(f storage.ParquetFileView) error {
c, ok := f.Schema().Lookup(rc.path())
rc.f = f
if !ok {
@@ -541,7 +588,7 @@ func (nc *notConstraint) filter(ctx context.Context, rgIdx int, primary bool, rr
return complementRowRanges(base, rr), nil
}
-func (nc *notConstraint) init(f *storage.ParquetFile) error {
+func (nc *notConstraint) init(f storage.ParquetFileView) error {
return nc.c.init(f)
}
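Taken together, the constraint API after this change reads roughly as follows. A hedged sketch: `shard` and `rgi` are assumed to be supplied by the caller; only `MatchersToConstraints`, `Initialize`, and `Filter` are taken from the diff.

```go
package example

import (
	"context"

	"github.com/prometheus-community/parquet-common/search"
	"github.com/prometheus-community/parquet-common/storage"
	"github.com/prometheus/prometheus/model/labels"
)

// filterRowGroup builds constraints from matchers and evaluates them against
// one row group of a shard.
func filterRowGroup(ctx context.Context, shard storage.ParquetShard, rgi int) ([]search.RowRange, error) {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "__name__", "up"),
		labels.MustNewMatcher(labels.MatchRegexp, "job", "api.*"),
	}

	cs, err := search.MatchersToConstraints(matchers...)
	if err != nil {
		return nil, err
	}
	// Constraints must be initialized against the labels file before filtering.
	if err := search.Initialize(shard.LabelsFile(), cs...); err != nil {
		return nil, err
	}
	// Filter reorders constraints so that sorting-column constraints run first.
	return search.Filter(ctx, shard, rgi, cs...)
}
```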
diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
index 7de75834692..8c4a5a463bd 100644
--- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
+++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go
@@ -117,7 +117,7 @@ func NewMaterializer(s *schema.TSDBSchema,
b: block,
colIdx: colIdx.ColumnIndex,
concurrency: concurrency,
- partitioner: util.NewGapBasedPartitioner(block.ChunksFile().Cfg.PagePartitioningMaxGapSize),
+ partitioner: util.NewGapBasedPartitioner(block.ChunksFile().PagePartitioningMaxGapSize()),
dataColToIndex: dataColToIndex,
rowCountQuota: rowCountQuota,
chunkBytesQuota: chunkBytesQuota,
@@ -247,6 +254,14 @@ func (m *Materializer) filterSeries(ctx context.Context, hints *prom_storage.Sel
return results, filteredRR
}
+// MaterializeAllLabelNames extracts and returns all label names from the schema
+// of the labels file. It iterates through all columns in the schema and extracts
+// valid label names, filtering out any columns that are not label columns.
+//
+// This method is useful for discovering all possible label names that exist in
+// the parquet file without needing to read any actual data rows.
+//
+// Returns a slice of all label names found in the schema.
func (m *Materializer) MaterializeAllLabelNames() []string {
r := make([]string, 0, len(m.b.LabelsFile().Schema().Columns()))
for _, c := range m.b.LabelsFile().Schema().Columns() {
@@ -260,6 +268,16 @@ func (m *Materializer) MaterializeAllLabelNames() []string {
return r
}
+// MaterializeLabelNames extracts and returns all unique label names from the specified row ranges
+// within a given row group. It reads the column indexes from the labels file and decodes them
+// to determine which label columns are present in the data.
+//
+// Parameters:
+// - ctx: Context for cancellation and tracing
+// - rgi: Row group index to read from
+// - rr: Row ranges to process within the row group
+//
+// Returns a slice of label names found in the specified ranges, or an error if materialization fails.
func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []RowRange) ([]string, error) {
labelsRg := m.b.LabelsFile().RowGroups()[rgi]
cc := labelsRg.ColumnChunks()[m.colIdx]
@@ -295,6 +313,18 @@ func (m *Materializer) MaterializeLabelNames(ctx context.Context, rgi int, rr []
return lbls, nil
}
+// MaterializeLabelValues extracts and returns all unique values for a specific label name
+// from the specified row ranges within a given row group. It reads the label column data
+// and deduplicates the values to provide a unique set of label values.
+//
+// Parameters:
+// - ctx: Context for cancellation and tracing
+// - name: The label name to extract values for
+// - rgi: Row group index to read from
+// - rr: Row ranges to process within the row group
+//
+// Returns a slice of unique label values for the specified label name, or an error if
+// materialization fails. If the label name doesn't exist in the schema, returns an empty slice.
func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string, rgi int, rr []RowRange) ([]string, error) {
labelsRg := m.b.LabelsFile().RowGroups()[rgi]
cIdx, ok := m.b.LabelsFile().Schema().Lookup(schema.LabelToColumn(name))
@@ -319,6 +349,20 @@ func (m *Materializer) MaterializeLabelValues(ctx context.Context, name string,
return r, nil
}
+// MaterializeAllLabelValues extracts all unique values for a specific label name
+// from the entire row group using the dictionary page optimization. This method
+// is more efficient than MaterializeLabelValues when you need all values for a
+// label across the entire row group, as it reads directly from the dictionary
+// page instead of scanning individual data pages.
+//
+// Parameters:
+// - ctx: Context for cancellation and tracing
+// - name: The label name to extract all values for
+// - rgi: Row group index to read from
+//
+// Returns a slice of all unique label values for the specified label name from
+// the dictionary, or an error if materialization fails. If the label name doesn't
+// exist in the schema, returns an empty slice.
func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name string, rgi int) ([]string, error) {
labelsRg := m.b.LabelsFile().RowGroups()[rgi]
cIdx, ok := m.b.LabelsFile().Schema().Lookup(schema.LabelToColumn(name))
@@ -473,7 +517,7 @@ func (m *Materializer) materializeChunks(ctx context.Context, rgi int, mint, max
return r, nil
}
-func (m *Materializer) materializeColumn(ctx context.Context, file *storage.ParquetFile, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
+func (m *Materializer) materializeColumn(ctx context.Context, file storage.ParquetFileView, rgi int, cc parquet.ColumnChunk, rr []RowRange, chunkColumn bool) ([]parquet.Value, error) {
if len(rr) == 0 {
return nil, nil
}
@@ -534,7 +578,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq
maxOffset := uint64(p.off + p.csz)
// if dictOff == 0, it means that the collum is not dictionary encoded
- if dictOff > 0 && int(minOffset-(dictOff+dictSz)) < file.Cfg.PagePartitioningMaxGapSize {
+ if dictOff > 0 && int(minOffset-(dictOff+dictSz)) < file.PagePartitioningMaxGapSize() {
minOffset = dictOff
}
@@ -562,6 +606,7 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq
return errors.Wrap(err, "could not read page")
}
vi.Reset(page)
+
for vi.Next() {
if currentRow == next {
rMutex.Lock()
@@ -577,6 +622,11 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq
remainingRr = remainingRr[1:]
}
}
+
+ if vi.CanSkip() && remaining > 0 {
+ currentRow += vi.Skip(next - currentRow - 1)
+ }
+
currentRow++
}
parquet.Release(page)
@@ -701,6 +751,16 @@ func (vi *valuesIterator) Reset(p parquet.Page) {
vi.current = -1
}
+func (vi *valuesIterator) CanSkip() bool {
+ return vi.vr == nil
+}
+
+func (vi *valuesIterator) Skip(n int64) int64 {
+ r := min(n, vi.p.NumRows()-int64(vi.current)-1)
+ vi.current += int(r)
+ return r
+}
+
func (vi *valuesIterator) Next() bool {
if vi.err != nil {
return false
}
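A sketch of how the label materialization helpers documented above fit together; `m` is an assumed, already-constructed `*search.Materializer` (see `NewMaterializer`), `rgi` a row group index, and `rr` the row ranges returned by `search.Filter`.

```go
package example

import (
	"context"

	"github.com/prometheus-community/parquet-common/search"
)

// labelsForRanges walks the different label-materialization entry points.
func labelsForRanges(ctx context.Context, m *search.Materializer, rgi int, rr []search.RowRange) error {
	// Names present in the filtered rows only.
	names, err := m.MaterializeLabelNames(ctx, rgi, rr)
	if err != nil {
		return err
	}

	for _, name := range names {
		// Values restricted to the filtered rows...
		if _, err := m.MaterializeLabelValues(ctx, name, rgi, rr); err != nil {
			return err
		}
		// ...or every value in the row group, read straight from the dictionary page.
		if _, err := m.MaterializeAllLabelValues(ctx, name, rgi); err != nil {
			return err
		}
	}

	// Schema-level names require no row reads at all.
	_ = m.MaterializeAllLabelNames()
	return nil
}
```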
diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go
index 4eba9de8c2a..98e5e100c43 100644
--- a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go
+++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go
@@ -26,6 +26,19 @@ import (
"github.com/prometheus-community/parquet-common/schema"
)
+type ParquetFileConfigView interface {
+ SkipMagicBytes() bool
+ SkipPageIndex() bool
+ SkipBloomFilters() bool
+ OptimisticRead() bool
+ ReadBufferSize() int
+ ReadMode() parquet.ReadMode
+
+ // Extended options beyond parquet.FileConfig
+
+ PagePartitioningMaxGapSize() int
+}
+
var DefaultFileOptions = ExtendedFileConfig{
FileConfig: &parquet.FileConfig{
SkipPageIndex: parquet.DefaultSkipPageIndex,
@@ -35,18 +48,56 @@ var DefaultFileOptions = ExtendedFileConfig{
ReadBufferSize: 4096,
OptimisticRead: true,
},
- PagePartitioningMaxGapSize: 10 * 1024,
+ pagePartitioningMaxGapSize: 10 * 1024,
}
type ExtendedFileConfig struct {
*parquet.FileConfig
- PagePartitioningMaxGapSize int
+ pagePartitioningMaxGapSize int
+}
+
+func (c ExtendedFileConfig) SkipMagicBytes() bool {
+ return c.FileConfig.SkipMagicBytes
+}
+
+func (c ExtendedFileConfig) SkipPageIndex() bool {
+ return c.FileConfig.SkipPageIndex
+}
+
+func (c ExtendedFileConfig) SkipBloomFilters() bool {
+ return c.FileConfig.SkipBloomFilters
+}
+
+func (c ExtendedFileConfig) OptimisticRead() bool {
+ return c.FileConfig.OptimisticRead
+}
+
+func (c ExtendedFileConfig) ReadBufferSize() int {
+ return c.FileConfig.ReadBufferSize
+}
+
+func (c ExtendedFileConfig) ReadMode() parquet.ReadMode {
+ return c.FileConfig.ReadMode
+}
+
+func (c ExtendedFileConfig) PagePartitioningMaxGapSize() int {
+ return c.pagePartitioningMaxGapSize
+}
+
+type ParquetFileView interface {
+ parquet.FileView
+ GetPages(ctx context.Context, cc parquet.ColumnChunk, minOffset, maxOffset int64) (parquet.Pages, error)
+ DictionaryPageBounds(rgIdx, colIdx int) (uint64, uint64)
+
+ ReadAtWithContextCloser
+
+ ParquetFileConfigView
}
type ParquetFile struct {
*parquet.File
ReadAtWithContextCloser
- Cfg ExtendedFileConfig
+ ParquetFileConfigView
}
type FileOption func(*ExtendedFileConfig)
@@ -62,15 +113,15 @@ func WithFileOptions(options ...parquet.FileOption) FileOption {
// WithPageMaxGapSize set the max gap size between pages that should be downloaded together in a single read call
func WithPageMaxGapSize(pagePartitioningMaxGapSize int) FileOption {
return func(opts *ExtendedFileConfig) {
- opts.PagePartitioningMaxGapSize = pagePartitioningMaxGapSize
+ opts.pagePartitioningMaxGapSize = pagePartitioningMaxGapSize
}
}
-func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, minOffset, maxOffset int64) (*parquet.FilePages, error) {
+func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, minOffset, maxOffset int64) (parquet.Pages, error) {
colChunk := cc.(*parquet.FileColumnChunk)
reader := f.WithContext(ctx)
- if f.Cfg.OptimisticRead {
+ if f.OptimisticRead() {
reader = NewOptimisticReaderAt(reader, minOffset, maxOffset)
}
@@ -99,7 +150,7 @@ func Open(ctx context.Context, r ReadAtWithContextCloser, size int64, opts ...Fi
return &ParquetFile{
File: file,
ReadAtWithContextCloser: r,
- Cfg: cfg,
+ ParquetFileConfigView: cfg,
}, nil
}
@@ -134,8 +185,8 @@ func OpenFromFile(ctx context.Context, path string, opts ...FileOption) (*Parque
}
type ParquetShard interface {
- LabelsFile() *ParquetFile
- ChunksFile() *ParquetFile
+ LabelsFile() ParquetFileView
+ ChunksFile() ParquetFileView
TSDBSchema() (*schema.TSDBSchema, error)
}
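A sketch of how a caller consumes the new config view instead of the old exported `Cfg` field. `Open`, `WithFileOptions`, and `WithPageMaxGapSize` are the functions shown above; the reader wiring and the `parquet.SkipBloomFilters` file option are assumptions for illustration.

```go
package example

import (
	"context"

	"github.com/parquet-go/parquet-go"
	"github.com/prometheus-community/parquet-common/storage"
)

// openLabelsFile opens a parquet file through the storage package and reads
// its effective settings via the ParquetFileConfigView accessors. r and size
// are assumed to come from the caller (e.g. storage.NewBucketReadAt plus
// object attributes).
func openLabelsFile(ctx context.Context, r storage.ReadAtWithContextCloser, size int64) (storage.ParquetFileView, error) {
	f, err := storage.Open(ctx, r, size,
		storage.WithFileOptions(parquet.SkipBloomFilters(true)), // assumed parquet-go option
		storage.WithPageMaxGapSize(32*1024),
	)
	if err != nil {
		return nil, err
	}

	// Configuration is now read through methods, not through f.Cfg.
	_ = f.SkipBloomFilters()
	_ = f.PagePartitioningMaxGapSize()
	return f, nil
}
```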
@@ -214,11 +265,11 @@ func NewParquetShardOpener(
}, nil
}
-func (s *ParquetShardOpener) LabelsFile() *ParquetFile {
+func (s *ParquetShardOpener) LabelsFile() ParquetFileView {
return s.labelsFile
}
-func (s *ParquetShardOpener) ChunksFile() *ParquetFile {
+func (s *ParquetShardOpener) ChunksFile() ParquetFileView {
return s.chunksFile
}
diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go
index 9f7e7fadf2a..4738f607349 100644
--- a/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go
+++ b/vendor/github.com/prometheus-community/parquet-common/storage/read_at.go
@@ -21,12 +21,18 @@ import (
"github.com/thanos-io/objstore"
)
+type SizeReaderAt interface {
+ io.ReaderAt
+ Size() int64
+}
+
type ReadAtWithContextCloser interface {
+ WithContext(ctx context.Context) SizeReaderAt
io.Closer
- WithContext(ctx context.Context) io.ReaderAt
}
type fileReadAt struct {
+ ctx context.Context
*os.File
}
@@ -37,11 +43,23 @@ func NewFileReadAt(f *os.File) ReadAtWithContextCloser {
}
}
-func (f *fileReadAt) WithContext(_ context.Context) io.ReaderAt {
- return f.File
+func (f *fileReadAt) WithContext(ctx context.Context) SizeReaderAt {
+ return &fileReadAt{
+ ctx: ctx,
+ File: f.File,
+ }
+}
+
+func (f *fileReadAt) Size() int64 {
+ fi, err := f.Stat()
+ if err != nil {
+ return 0
+ }
+ return fi.Size()
}
type bReadAt struct {
+ ctx context.Context
path string
obj objstore.BucketReader
}
@@ -54,52 +72,60 @@ func NewBucketReadAt(path string, obj objstore.BucketReader) ReadAtWithContextCl
}
}
-func (b *bReadAt) WithContext(ctx context.Context) io.ReaderAt {
- return readAtFunc{
- f: func(p []byte, off int64) (n int, err error) {
- rc, err := b.obj.GetRange(ctx, b.path, off, int64(len(p)))
- if err != nil {
- return 0, err
- }
- defer func() { _ = rc.Close() }()
- n, err = io.ReadFull(rc, p)
- if err == io.EOF {
- err = nil
- }
- return
- },
+func (b *bReadAt) WithContext(ctx context.Context) SizeReaderAt {
+ return &bReadAt{
+ ctx: ctx,
+ path: b.path,
+ obj: b.obj,
}
}
-func (b *bReadAt) Close() error {
- return nil
+func (b *bReadAt) Size() int64 {
+ attr, err := b.obj.Attributes(context.Background(), b.path)
+ if err != nil {
+ return 0
+ }
+ return attr.Size
}
-type readAtFunc struct {
- f func([]byte, int64) (n int, err error)
+func (b *bReadAt) Close() error {
+ return nil
}
-func (r readAtFunc) ReadAt(p []byte, off int64) (n int, err error) {
- return r.f(p, off)
+func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) {
+ rc, err := b.obj.GetRange(b.ctx, b.path, off, int64(len(p)))
+ if err != nil {
+ return 0, err
+ }
+ defer func() { _ = rc.Close() }()
+ n, err = io.ReadFull(rc, p)
+ if err == io.EOF {
+ err = nil
+ }
+ return
}
type optimisticReaderAt struct {
- r io.ReaderAt
+ r SizeReaderAt
b []byte
offset int64
}
-func (b optimisticReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
- if off >= b.offset && off < b.offset+int64(len(b.b)) {
- diff := off - b.offset
- n := copy(p, b.b[diff:])
+func (or optimisticReaderAt) Size() int64 {
+ return or.r.Size()
+}
+
+func (or optimisticReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
+ if off >= or.offset && off < or.offset+int64(len(or.b)) {
+ diff := off - or.offset
+ n := copy(p, or.b[diff:])
return n, nil
}
- return b.r.ReadAt(p, off)
+ return or.r.ReadAt(p, off)
}
-func NewOptimisticReaderAt(r io.ReaderAt, minOffset, maxOffset int64) io.ReaderAt {
+func NewOptimisticReaderAt(r SizeReaderAt, minOffset, maxOffset int64) SizeReaderAt {
if minOffset < maxOffset {
b := make([]byte, maxOffset-minOffset)
n, err := r.ReadAt(b, minOffset)
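A sketch of the reworked reader contract from the caller's side: bind a context first, then use the returned `SizeReaderAt` for sized, range-based reads. `NewBucketReadAt` and the interface methods are the ones shown above; the bucket and path are assumed inputs.

```go
package example

import (
	"context"

	"github.com/prometheus-community/parquet-common/storage"
	"github.com/thanos-io/objstore"
)

// readHeader reads the first few bytes of an object and reports its size
// through the new SizeReaderAt view.
func readHeader(ctx context.Context, bkt objstore.BucketReader, path string) ([]byte, int64, error) {
	r := storage.NewBucketReadAt(path, bkt)
	defer func() { _ = r.Close() }()

	sr := r.WithContext(ctx) // SizeReaderAt: io.ReaderAt + Size()
	size := sr.Size()

	buf := make([]byte, 4)
	if _, err := sr.ReadAt(buf, 0); err != nil {
		return nil, 0, err
	}
	return buf, size, nil
}
```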
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6755790bcc0..e550fac2e68 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -957,7 +957,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb
# github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
## explicit
github.com/pmezard/go-difflib/difflib
-# github.com/prometheus-community/parquet-common v0.0.0-20250716185251-4cfa597e936c
+# github.com/prometheus-community/parquet-common v0.0.0-20250801093248-94ad2ac56fa4
## explicit; go 1.23.4
github.com/prometheus-community/parquet-common/convert
github.com/prometheus-community/parquet-common/queryable