Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24.1

require (
cloud.google.com/go/storage v1.59.1
github.com/ClickHouse/clickhouse-go/v2 v2.43.0
github.com/alecthomas/kong v0.9.0
github.com/apache/arrow-go/v18 v18.5.1
github.com/cenkalti/backoff/v4 v4.3.0
Expand Down Expand Up @@ -83,6 +84,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4 v4.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
github.com/ClickHouse/ch-go v0.71.0 // indirect
github.com/Code-Hex/go-generics-cache v1.5.1 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 // indirect
Expand Down Expand Up @@ -124,9 +126,9 @@ require (
github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20250106013310-edb8663e5e33 // indirect
github.com/digitalocean/godo v1.152.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v28.3.3+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v28.5.2+incompatible // indirect
github.com/docker/go-connections v0.6.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/efficientgo/core v1.0.0-rc.2 // indirect
Expand All @@ -137,6 +139,8 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gin-gonic/gin v1.9.1 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-jose/go-jose/v4 v4.1.3 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
Expand Down Expand Up @@ -201,8 +205,6 @@ require (
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
github.com/moby/sys/sequential v0.6.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
Expand All @@ -213,10 +215,11 @@ require (
github.com/oklog/ulid v1.3.1 // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc4 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/oracle/oci-go-sdk/v65 v65.41.1 // indirect
github.com/ovh/go-ovh v1.8.0 // indirect
github.com/pierrec/lz4/v4 v4.1.23 // indirect
github.com/paulmach/orb v0.12.0 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand All @@ -227,6 +230,8 @@ require (
github.com/rs/cors v1.8.0 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.33 // indirect
github.com/segmentio/asm v1.2.1 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect
Expand All @@ -242,6 +247,7 @@ require (
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.39.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/crypto v0.47.0 // indirect
golang.org/x/mod v0.32.0 // indirect
golang.org/x/sys v0.40.0 // indirect
Expand Down
57 changes: 45 additions & 12 deletions go.sum

Large diffs are not rendered by default.

140 changes: 140 additions & 0 deletions pkg/clickhouse/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2024-2026 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouse

import (
"context"
"crypto/tls"
"fmt"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// Config holds ClickHouse connection configuration.
type Config struct {
Address string
Database string
Username string
Password string
Table string
Secure bool
}

// Client is a wrapper around the ClickHouse connection.
type Client struct {
conn driver.Conn
cfg Config
}

// NewClient creates a new ClickHouse client with the given configuration.
// It first connects without a database to ensure the database can be created,
// then reconnects with the database specified.
func NewClient(ctx context.Context, cfg Config) (*Client, error) {
// First, connect without specifying a database to allow database creation
opts := &clickhouse.Options{
Addr: []string{cfg.Address},
Auth: clickhouse.Auth{
Username: cfg.Username,
Password: cfg.Password,
},
}

if cfg.Secure {
opts.TLS = &tls.Config{}
}

conn, err := clickhouse.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open ClickHouse connection: %w", err)
}

// Create database if it doesn't exist
if err := conn.Exec(ctx, fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", cfg.Database)); err != nil {
conn.Close()
return nil, fmt.Errorf("failed to create database: %w", err)
}

// Close the initial connection
conn.Close()

// Now connect with the database specified
opts.Auth.Database = cfg.Database
conn, err = clickhouse.Open(opts)
if err != nil {
return nil, fmt.Errorf("failed to open ClickHouse connection with database: %w", err)
}

return &Client{
conn: conn,
cfg: cfg,
}, nil
}

// Close closes the ClickHouse connection.
func (c *Client) Close() error {
return c.conn.Close()
}

// Conn returns the underlying ClickHouse connection.
func (c *Client) Conn() driver.Conn {
return c.conn
}

// Config returns the client configuration.
func (c *Client) Config() Config {
return c.cfg
}

// Database returns the database name.
func (c *Client) Database() string {
return c.cfg.Database
}

// Table returns the table name.
func (c *Client) Table() string {
return c.cfg.Table
}

// FullTableName returns the fully qualified table name (database.table).
func (c *Client) FullTableName() string {
return fmt.Sprintf("%s.%s", c.cfg.Database, c.cfg.Table)
}

// EnsureSchema creates the table if it doesn't exist.
// Note: The database is already created in NewClient.
func (c *Client) EnsureSchema(ctx context.Context) error {
// Create table using the schema definition
schema := CreateTableSQL(c.cfg.Database, c.cfg.Table)
if err := c.conn.Exec(ctx, schema); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}

return nil
}

// Query executes a query and returns the rows.
func (c *Client) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(ctx, query, args...)
}

// Exec executes a query without returning rows.
func (c *Client) Exec(ctx context.Context, query string, args ...interface{}) error {
return c.conn.Exec(ctx, query, args...)
}

// PrepareBatch prepares a batch for insertion.
func (c *Client) PrepareBatch(ctx context.Context, query string) (driver.Batch, error) {
return c.conn.PrepareBatch(ctx, query)
}
163 changes: 163 additions & 0 deletions pkg/clickhouse/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2024-2026 The Parca Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouse

import (
"fmt"
"strings"

"github.com/prometheus/prometheus/model/labels"

"github.com/parca-dev/parca/pkg/profile"
)

// ProfileTypeFilter generates SQL WHERE clause conditions for profile type filtering.
func ProfileTypeFilter(qp profile.QueryParts) (string, []interface{}) {
conditions := []string{
"name = ?",
"sample_type = ?",
"sample_unit = ?",
"period_type = ?",
"period_unit = ?",
}
args := []interface{}{
qp.Meta.Name,
qp.Meta.SampleType.Type,
qp.Meta.SampleType.Unit,
qp.Meta.PeriodType.Type,
qp.Meta.PeriodType.Unit,
}

if qp.Delta {
conditions = append(conditions, "duration != 0")
} else {
conditions = append(conditions, "duration = 0")
}

return strings.Join(conditions, " AND "), args
}

// LabelMatchersToSQL converts Prometheus label matchers to SQL WHERE clause conditions.
func LabelMatchersToSQL(matchers []*labels.Matcher) (string, []interface{}, error) {
if len(matchers) == 0 {
return "", nil, nil
}

conditions := make([]string, 0, len(matchers))
args := make([]interface{}, 0, len(matchers))

for _, m := range matchers {
condition, arg, err := matcherToSQL(m)
if err != nil {
return "", nil, err
}
conditions = append(conditions, condition)
if arg != nil {
args = append(args, arg)
}
}

return strings.Join(conditions, " AND "), args, nil
}

// matcherToSQL converts a single Prometheus label matcher to a SQL condition.
func matcherToSQL(m *labels.Matcher) (string, interface{}, error) {
// Use ClickHouse JSON path syntax for label access
labelPath := fmt.Sprintf("labels.%s", m.Name)

switch m.Type {
case labels.MatchEqual:
if m.Value == "" {
// Empty value means label should not exist or be null
return fmt.Sprintf("(%s IS NULL OR %s = '')", labelPath, labelPath), nil, nil
}
return fmt.Sprintf("%s = ?", labelPath), m.Value, nil

case labels.MatchNotEqual:
if m.Value == "" {
// Not empty means label should exist and not be null/empty
return fmt.Sprintf("(%s IS NOT NULL AND %s != '')", labelPath, labelPath), nil, nil
}
return fmt.Sprintf("(%s != ? OR %s IS NULL)", labelPath, labelPath), m.Value, nil

case labels.MatchRegexp:
// ClickHouse uses match() for regex
return fmt.Sprintf("match(toString(%s), ?)", labelPath), m.Value, nil

case labels.MatchNotRegexp:
return fmt.Sprintf("NOT match(toString(%s), ?)", labelPath), m.Value, nil

default:
return "", nil, fmt.Errorf("unsupported matcher type: %v", m.Type)
}
}

// TimeRangeFilter generates SQL WHERE clause conditions for time range filtering.
func TimeRangeFilter(startNanos, endNanos int64) (string, []interface{}) {
return "time_nanos >= ? AND time_nanos <= ?", []interface{}{startNanos, endNanos}
}

// BuildWhereClause combines multiple filter conditions into a single WHERE clause.
func BuildWhereClause(conditions []string, allArgs []interface{}) (string, []interface{}) {
nonEmpty := make([]string, 0, len(conditions))
for _, c := range conditions {
if c != "" {
nonEmpty = append(nonEmpty, c)
}
}

if len(nonEmpty) == 0 {
return "", nil
}

return "WHERE " + strings.Join(nonEmpty, " AND "), allArgs
}

// QueryToFilters converts a query string and time range to SQL filter components.
func QueryToFilters(query string, startNanos, endNanos int64) (string, []interface{}, profile.QueryParts, error) {
qp, err := profile.ParseQuery(query)
if err != nil {
return "", nil, qp, err
}

// Profile type filter
profileFilter, profileArgs := ProfileTypeFilter(qp)

// Label matchers filter
labelFilter, labelArgs, err := LabelMatchersToSQL(qp.Matchers)
if err != nil {
return "", nil, qp, err
}

// Time range filter
timeFilter, timeArgs := TimeRangeFilter(startNanos, endNanos)

// Combine all conditions
conditions := []string{profileFilter}
args := append([]interface{}{}, profileArgs...)

if labelFilter != "" {
conditions = append(conditions, labelFilter)
args = append(args, labelArgs...)
}

if startNanos != 0 || endNanos != 0 {
conditions = append(conditions, timeFilter)
args = append(args, timeArgs...)
}

whereClause, _ := BuildWhereClause(conditions, args)

return whereClause, args, qp, nil
}
Loading