Skip to content

Commit b8746a9

Browse files
[exporter/clickhouseexporter] Support JSON type for logs and traces (open-telemetry#40547)
#### Description Depends on refactoring from open-telemetry#40536 Adds JSON support to the exporter. Summary: - Added feature gate (`clickhouse.json`, alpha stability) for enabling the JSON type pipeline. The `Map` based pipeline is still functional and is the default. ([How to use feature gates](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md#controlling-gates)) - The JSON type replaces all `Map` columns with the [new JSON type](https://clickhouse.com/docs/sql-reference/data-types/newjson). - JSON is sent to the server as a string. - Due to how the server stores JSON paths, OTel standard paths such as `a.b.c` are returned as objects `a { b { c } }` by the server. This can be changed via server setting in the future to return all data as flattened paths `a.b.c`. Let me know if you have any questions, suggestions, etc. Thanks! #### Testing - Added unit tests for related helper functions - Added integration tests for JSON logs/traces #### Documentation - Updated README, changelog
1 parent a70530a commit b8746a9

20 files changed

+945
-47
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'clickhouseexporter'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Support JSON type for logs and traces
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [40547]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Added a feature gate to enable a JSON pipeline for logs and traces.
20+
The feature gate ID is `clickhouse.json`; when enabled, the exporter will automatically use the new
21+
DDL and column type on supported server versions.
22+
You may also need to add `enable_json_type=1` to your connection
23+
settings, depending on the server version.
24+
25+
# If your change doesn't affect end users or the exported elements of any package,
26+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
27+
# Optional: The change log or logs in which this entry should be included.
28+
# e.g. '[user]' or '[user, api]'
29+
# Include 'user' if the change is relevant to end users.
30+
# Include 'api' if there is a change to a library API.
31+
# Default: '[user]'
32+
change_logs: [user]

exporter/clickhouseexporter/README.md

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ If the official plugin doesn't meet your needs, you can try the [Altinity plugin
4848
- Get log severity count time series.
4949

5050
```sql
51-
SELECT toDateTime(toStartOfInterval(TimestampTime, INTERVAL 60 second)) as time, SeverityText, count() as count
51+
SELECT toDateTime(toStartOfInterval(Timestamp, INTERVAL 60 second)) as time, SeverityText, count() as count
5252
FROM otel_logs
5353
WHERE time >= NOW() - INTERVAL 1 HOUR
5454
GROUP BY SeverityText, time
@@ -60,7 +60,7 @@ ORDER BY time;
6060
```sql
6161
SELECT Timestamp as log_time, Body
6262
FROM otel_logs
63-
WHERE TimestampTime >= NOW() - INTERVAL 1 HOUR
63+
WHERE Timestamp >= NOW() - INTERVAL 1 HOUR
6464
Limit 100;
6565
```
6666

@@ -70,7 +70,7 @@ Limit 100;
7070
SELECT Timestamp as log_time, Body
7171
FROM otel_logs
7272
WHERE ServiceName = 'clickhouse-exporter'
73-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
73+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
7474
Limit 100;
7575
```
7676

@@ -80,7 +80,7 @@ Limit 100;
8080
SELECT Timestamp as log_time, Body
8181
FROM otel_logs
8282
WHERE LogAttributes['container_name'] = '/example_flog_1'
83-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
83+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
8484
Limit 100;
8585
```
8686

@@ -90,7 +90,7 @@ Limit 100;
9090
SELECT Timestamp as log_time, Body
9191
FROM otel_logs
9292
WHERE hasToken(Body, 'http')
93-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
93+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
9494
Limit 100;
9595
```
9696

@@ -100,7 +100,7 @@ Limit 100;
100100
SELECT Timestamp as log_time, Body
101101
FROM otel_logs
102102
WHERE Body like '%http%'
103-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
103+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
104104
Limit 100;
105105
```
106106

@@ -110,7 +110,7 @@ Limit 100;
110110
SELECT Timestamp as log_time, Body
111111
FROM otel_logs
112112
WHERE match(Body, 'http')
113-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
113+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
114114
Limit 100;
115115
```
116116

@@ -120,7 +120,7 @@ Limit 100;
120120
SELECT Timestamp as log_time, Body
121121
FROM otel_logs
122122
WHERE JSONExtractFloat(Body, 'bytes') > 1000
123-
AND TimestampTime >= NOW() - INTERVAL 1 HOUR
123+
AND Timestamp >= NOW() - INTERVAL 1 HOUR
124124
Limit 100;
125125
```
126126

@@ -347,12 +347,12 @@ use the `https` scheme.
347347

348348
## Schema management
349349

350-
By default the exporter will create the database and tables under the names defined in the config. This is fine for simple deployments, but for production workloads, it is recommended that you manage your own schema by setting `create_schema` to `false` in the config.
350+
By default, the exporter will create the database and tables under the names defined in the config. This is fine for simple deployments, but for production workloads, it is recommended that you manage your own schema by setting `create_schema` to `false` in the config.
351351
This prevents each exporter process from racing to create the database and tables, and makes it easier to upgrade the exporter in the future.
352352

353353
In this mode, the only SQL sent to your server will be for `INSERT` statements.
354354

355-
The default DDL used by the exporter can be found in `example/default_ddl`.
355+
The default DDL used by the exporter can be found in `internal/sqltemplates`.
356356
Be sure to customize the indexes, TTL, and partitioning to fit your deployment.
357357
Column names and types must be the same to preserve compatibility with the exporter's `INSERT` statements.
358358
As long as the column names/types match the `INSERT` statement, you can create whatever kind of table you want.
@@ -410,14 +410,23 @@ service:
410410
exporters: [ clickhouse ]
411411
```
412412
413+
## Experimental JSON support
414+
415+
A feature gate is available for testing the experimental JSON pipeline.
416+
Enable the `clickhouse.json` feature gate by following the [feature gate documentation](https://github.com/open-telemetry/opentelemetry-collector/blob/main/featuregate/README.md).
417+
You may also need to add `enable_json_type=1` to your `endpoint` query string parameters.
418+
DDL has been updated, but feel free to tune the schema as needed. DDL can be found in the `internal/sqltemplates` package.
419+
All `Map` columns have been replaced with `JSON`.
420+
ClickHouse v25+ is recommended for reliable JSON support.
421+
413422
## Contributing
414423

415424
Before contributing, review the contribution guidelines in [CONTRIBUTING.md](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md).
416425

417426
#### Integration tests
418427

419-
Integration tests can be run with the following command:
428+
Integration tests can be run with the following command (includes unit tests):
420429
```sh
421-
go test -tags integration -run=TestIntegration
430+
go test -tags integration
422431
```
423432
*Note: Make sure integration tests pass after making changes to SQL.*

exporter/clickhouseexporter/exporter_logs_integration_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,9 @@ func newTestLogsExporter(t *testing.T, dsn string, fns ...func(*Config)) *logsEx
3232
}
3333

3434
func verifyExportLogs(t *testing.T, exporter *logsExporter) {
35-
// 3 pushes
36-
mustPushLogsData(t, exporter, simpleLogs(5000))
37-
mustPushLogsData(t, exporter, simpleLogs(5000))
38-
mustPushLogsData(t, exporter, simpleLogs(5000))
35+
pushConcurrentlyNoError(t, func() error {
36+
return exporter.pushLogsData(context.Background(), simpleLogs(5000))
37+
})
3938

4039
type log struct {
4140
Timestamp time.Time `ch:"Timestamp"`
@@ -90,11 +89,6 @@ func verifyExportLogs(t *testing.T, exporter *logsExporter) {
9089
require.Equal(t, expectedLog, actualLog)
9190
}
9291

93-
func mustPushLogsData(t *testing.T, exporter *logsExporter, ld plog.Logs) {
94-
err := exporter.pushLogsData(context.Background(), ld)
95-
require.NoError(t, err)
96-
}
97-
9892
func simpleLogs(count int) plog.Logs {
9993
logs := plog.NewLogs()
10094
rl := logs.ResourceLogs().AppendEmpty()
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package clickhouseexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter"
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"time"
11+
12+
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
13+
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/pdata/plog"
15+
"go.uber.org/zap"
16+
17+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter/internal/sqltemplates"
19+
)
20+
21+
// logsJSONExporter exports OTel logs to ClickHouse using the experimental
// JSON column type (enabled via the `clickhouse.json` feature gate).
type logsJSONExporter struct {
	cfg       *Config     // exporter configuration (DSN, table names, TTL, schema options)
	logger    *zap.Logger // component logger
	db        driver.Conn // ClickHouse connection, opened in start()
	insertSQL string      // INSERT statement rendered once from the config
}
27+
28+
func newLogsJSONExporter(logger *zap.Logger, cfg *Config) *logsJSONExporter {
29+
return &logsJSONExporter{
30+
cfg: cfg,
31+
logger: logger,
32+
insertSQL: renderInsertLogsJSONSQL(cfg),
33+
}
34+
}
35+
36+
func (e *logsJSONExporter) start(ctx context.Context, _ component.Host) error {
37+
dsn, err := e.cfg.buildDSN()
38+
if err != nil {
39+
return err
40+
}
41+
42+
e.db, err = internal.NewClickhouseClient(dsn)
43+
if err != nil {
44+
return err
45+
}
46+
47+
if e.cfg.shouldCreateSchema() {
48+
if err := internal.CreateDatabase(ctx, e.db, e.cfg.database(), e.cfg.clusterString()); err != nil {
49+
return err
50+
}
51+
52+
if err := createLogsJSONTable(ctx, e.cfg, e.db); err != nil {
53+
return err
54+
}
55+
}
56+
57+
return nil
58+
}
59+
60+
func (e *logsJSONExporter) shutdown(_ context.Context) error {
61+
if e.db != nil {
62+
if err := e.db.Close(); err != nil {
63+
e.logger.Warn("failed to close json logs db connection", zap.Error(err))
64+
}
65+
}
66+
67+
return nil
68+
}
69+
70+
// pushLogsData flattens the pdata log hierarchy (resource -> scope -> record)
// into a single batched INSERT against the JSON logs table. Attribute maps at
// every level are serialized to JSON byte strings, since the JSON column type
// is sent to the server as a string. The first marshal/append/send error
// aborts the push; the deferred Close abandons any unsent batch.
func (e *logsJSONExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
	batch, err := e.db.PrepareBatch(ctx, e.insertSQL)
	if err != nil {
		return err
	}
	// Always release batch resources, even on early return before Send;
	// close failures are non-fatal and only logged.
	defer func(batch driver.Batch) {
		closeErr := batch.Close()
		if closeErr != nil {
			e.logger.Warn("failed to close json logs batch", zap.Error(closeErr))
		}
	}(batch)

	processStart := time.Now()

	var logCount int
	rsLogs := ld.ResourceLogs()
	rsLen := rsLogs.Len()
	for i := 0; i < rsLen; i++ {
		logs := rsLogs.At(i)
		res := logs.Resource()
		resURL := logs.SchemaUrl()
		resAttr := res.Attributes()
		serviceName := internal.GetServiceName(resAttr)
		// Resource attributes are marshaled once and reused for every
		// record under this resource.
		resAttrBytes, resAttrErr := json.Marshal(resAttr.AsRaw())
		if resAttrErr != nil {
			return fmt.Errorf("failed to marshal json log resource attributes: %w", resAttrErr)
		}

		slLen := logs.ScopeLogs().Len()
		for j := 0; j < slLen; j++ {
			scopeLog := logs.ScopeLogs().At(j)
			scopeURL := scopeLog.SchemaUrl()
			scopeLogScope := scopeLog.Scope()
			scopeName := scopeLogScope.Name()
			scopeVersion := scopeLogScope.Version()
			scopeLogRecords := scopeLog.LogRecords()
			// Likewise, scope attributes are marshaled once per scope.
			scopeAttrBytes, scopeAttrErr := json.Marshal(scopeLogScope.Attributes().AsRaw())
			if scopeAttrErr != nil {
				return fmt.Errorf("failed to marshal json log scope attributes: %w", scopeAttrErr)
			}

			slrLen := scopeLogRecords.Len()
			for k := 0; k < slrLen; k++ {
				r := scopeLogRecords.At(k)
				logAttrBytes, logAttrErr := json.Marshal(r.Attributes().AsRaw())
				if logAttrErr != nil {
					return fmt.Errorf("failed to marshal json log attributes: %w", logAttrErr)
				}

				// Fall back to the observed timestamp when the record
				// carries no event timestamp.
				timestamp := r.Timestamp()
				if timestamp == 0 {
					timestamp = r.ObservedTimestamp()
				}

				// Argument order must match the column order of the
				// rendered INSERT statement (sqltemplates.LogsJSONInsert).
				// NOTE(review): Body().Str() yields "" for non-string
				// bodies (e.g. map/slice bodies) — confirm this matches
				// the Map pipeline's body handling.
				// NOTE(review): Flags/SeverityNumber are truncated to
				// uint8 — presumably matches the table's UInt8 columns;
				// verify against the DDL.
				appendErr := batch.Append(
					timestamp.AsTime(),
					r.TraceID().String(),
					r.SpanID().String(),
					uint8(r.Flags()),
					r.SeverityText(),
					uint8(r.SeverityNumber()),
					serviceName,
					r.Body().Str(),
					resURL,
					resAttrBytes,
					scopeURL,
					scopeName,
					scopeVersion,
					scopeAttrBytes,
					logAttrBytes,
				)
				if appendErr != nil {
					return fmt.Errorf("failed to append json log row: %w", appendErr)
				}

				logCount++
			}
		}
	}

	processDuration := time.Since(processStart)
	networkStart := time.Now()
	if sendErr := batch.Send(); sendErr != nil {
		return fmt.Errorf("logs json insert failed: %w", sendErr)
	}

	// Debug timing split: local row building vs. network send.
	networkDuration := time.Since(networkStart)
	totalDuration := time.Since(processStart)
	e.logger.Debug("insert json logs",
		zap.Int("records", logCount),
		zap.String("process_cost", processDuration.String()),
		zap.String("network_cost", networkDuration.String()),
		zap.String("total_cost", totalDuration.String()))

	return nil
}
166+
167+
func renderInsertLogsJSONSQL(cfg *Config) string {
168+
return fmt.Sprintf(sqltemplates.LogsJSONInsert, cfg.database(), cfg.LogsTableName)
169+
}
170+
171+
func renderCreateLogsJSONTableSQL(cfg *Config) string {
172+
ttlExpr := internal.GenerateTTLExpr(cfg.TTL, "Timestamp")
173+
return fmt.Sprintf(sqltemplates.LogsJSONCreateTable,
174+
cfg.database(), cfg.LogsTableName, cfg.clusterString(),
175+
cfg.tableEngineString(),
176+
ttlExpr,
177+
)
178+
}
179+
180+
func createLogsJSONTable(ctx context.Context, cfg *Config, db driver.Conn) error {
181+
if err := db.Exec(ctx, renderCreateLogsJSONTableSQL(cfg)); err != nil {
182+
return fmt.Errorf("exec create logs json table sql: %w", err)
183+
}
184+
185+
return nil
186+
}

0 commit comments

Comments
 (0)