Skip to content

Commit 1d9b563

Browse files
committed
log: add support for migrating logs to different channels
Adds a safe and backwards compatible means of moving logs from one channel to another. The new log.Migrator and log.StructuredEventMigrator provide engineers with a way of safely migrating logs from one channel to another with minimal interruption to customers. Both of these migrators require a `shouldMigrate` callback which is used to determine whether to log to a new or old logging channel. The StructuredEventMigrator only requires a new channel to be specified, which will be used instead of the channel defined on the event itself. Note that the event proto doc string should still be updated to document that it may be logged to a different channel. The Migrator struct requires an old and new channel to write to. Both of the migrators will write to the "new" channel if `shouldMigrate` returns true, otherwise it writes to the "old" channel. Part of: CRDB-53412 Epic: CRDB-53410 Release note: None
1 parent d5e3063 commit 1d9b563

File tree

8 files changed

+415
-23
lines changed

8 files changed

+415
-23
lines changed

docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ kv.transaction.write_pipelining.locking_reads.enabled boolean true if enabled, t
106106
kv.transaction.write_pipelining.ranged_writes.enabled boolean true if enabled, transactional ranged writes are pipelined through Raft consensus application
107107
kv.transaction.write_pipelining.enabled (alias: kv.transaction.write_pipelining_enabled) boolean true if enabled, transactional writes are pipelined through Raft consensus application
108108
kv.transaction.write_pipelining.max_batch_size (alias: kv.transaction.write_pipelining_max_batch_size) integer 128 if non-zero, defines that maximum size batch that will be pipelined through Raft consensus application
109+
log.channel_compatibility_mode.enabled boolean true when true, logs will continue to log to the expected logging channels; when false, logs will be moved to new logging channels as part of a logging channel consolidation effort application
109110
obs.tablemetadata.automatic_updates.enabled boolean false enables automatic updates of the table metadata cache system.table_metadata application
110111
obs.tablemetadata.data_valid_duration duration 20m0s the duration for which the data in system.table_metadata is considered valid application
111112
schedules.backup.gc_protection.enabled boolean true enable chaining of GC protection across backups run as part of a schedule application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@
137137
<tr><td><div id="setting-kv-transaction-write-pipelining-max-batch-size" class="anchored"><code>kv.transaction.write_pipelining.max_batch_size<br />(alias: kv.transaction.write_pipelining_max_batch_size)</code></div></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
138138
<tr><td><div id="setting-kvadmission-store-provisioned-bandwidth" class="anchored"><code>kvadmission.store.provisioned_bandwidth</code></div></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be overridden on a per-store basis using the --store flag. Note that setting the provisioned bandwidth to a positive value may enable disk bandwidth based admission control, since admission.disk_bandwidth_tokens.elastic.enabled defaults to true</td><td>Dedicated/Self-Hosted</td></tr>
139139
<tr><td><div id="setting-kvadmission-store-snapshot-ingest-bandwidth-control-enabled" class="anchored"><code>kvadmission.store.snapshot_ingest_bandwidth_control.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set to true, snapshot ingests will be subject to disk write control in AC</td><td>Dedicated/Self-Hosted</td></tr>
140+
<tr><td><div id="setting-log-channel-compatibility-mode-enabled" class="anchored"><code>log.channel_compatibility_mode.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>when true, logs will continue to log to the expected logging channels; when false, logs will be moved to new logging channels as part of a logging channel consolidation effort</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
140141
<tr><td><div id="setting-obs-tablemetadata-automatic-updates-enabled" class="anchored"><code>obs.tablemetadata.automatic_updates.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>enables automatic updates of the table metadata cache system.table_metadata</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
141142
<tr><td><div id="setting-obs-tablemetadata-data-valid-duration" class="anchored"><code>obs.tablemetadata.data_valid_duration</code></div></td><td>duration</td><td><code>20m0s</code></td><td>the duration for which the data in system.table_metadata is considered valid</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
142143
<tr><td><div id="setting-schedules-backup-gc-protection-enabled" class="anchored"><code>schedules.backup.gc_protection.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td><td>Serverless/Dedicated/Self-Hosted</td></tr>

pkg/testutils/lint/passes/fmtsafe/functions.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ var requireConstFmt = map[string]bool{
177177

178178
"(*github.com/cockroachdb/cockroach/pkg/cloud/amazon.awsLogAdapter).Logf": true,
179179

180+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).logfDepth": true,
181+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Infof": true,
182+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Warningf": true,
183+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Errorf": true,
184+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).Fatalf": true,
185+
"(github.com/cockroachdb/cockroach/pkg/util/log.Migrator).VEventf": true,
186+
180187
// Error things are populated in the init() message.
181188
}
182189

pkg/util/log/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"log_entry.go",
3535
"log_flush.go",
3636
"metric.go",
37+
"migrator.go",
3738
"otlp_client.go",
3839
"redact.go",
3940
"registry.go",
@@ -76,6 +77,7 @@ go_library(
7677
"//pkg/util/log/logflags",
7778
"//pkg/util/log/logpb",
7879
"//pkg/util/log/severity",
80+
"//pkg/util/metamorphic",
7981
"//pkg/util/syncutil",
8082
"//pkg/util/sysutil",
8183
"//pkg/util/timeutil",
@@ -178,6 +180,7 @@ go_test(
178180
"intercept_test.go",
179181
"log_decoder_test.go",
180182
"main_test.go",
183+
"migrator_test.go",
181184
"otlp_client_test.go",
182185
"redact_test.go",
183186
"registry_test.go",
@@ -194,6 +197,7 @@ go_test(
194197
"//pkg/build",
195198
"//pkg/cli/exit",
196199
"//pkg/settings/cluster",
200+
"//pkg/testutils/datapathutils",
197201
"//pkg/util/caller",
198202
"//pkg/util/ctxgroup",
199203
"//pkg/util/encoding",

pkg/util/log/event_log.go

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -96,29 +96,7 @@ func StructuredEvent(ctx context.Context, sev logpb.Severity, event logpb.EventP
9696
func StructuredEventDepth(
9797
ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
9898
) {
99-
// Populate the missing common fields.
100-
common := event.CommonDetails()
101-
if common.Timestamp == 0 {
102-
common.Timestamp = timeutil.Now().UnixNano()
103-
}
104-
if len(common.EventType) == 0 {
105-
common.EventType = logpb.GetEventTypeName(event)
106-
}
107-
108-
entry := makeStructuredEntry(ctx,
109-
sev,
110-
event.LoggingChannel(),
111-
depth+1,
112-
event)
113-
114-
if sp := getSpan(ctx); sp != nil {
115-
// Prevent `entry` from moving to the heap when this branch is not taken.
116-
heapEntry := entry
117-
eventInternal(sp, entry.sev >= severity.ERROR, &heapEntry)
118-
}
119-
120-
logger := logging.getLogger(entry.ch)
121-
logger.outputLogEntry(entry)
99+
structuredEventDepth(ctx, sev, depth+1, event.LoggingChannel(), event)
122100
}
123101

124102
// EventLog emits a structured event log and writes it to the system.eventlog
@@ -187,3 +165,31 @@ func WriteAsync() StructuredEventSettingsFunc {
187165
return o
188166
}
189167
}
168+
169+
func structuredEventDepth(
170+
ctx context.Context, sev logpb.Severity, depth int, ch Channel, event logpb.EventPayload,
171+
) {
172+
// Populate the missing common fields.
173+
common := event.CommonDetails()
174+
if common.Timestamp == 0 {
175+
common.Timestamp = timeutil.Now().UnixNano()
176+
}
177+
if len(common.EventType) == 0 {
178+
common.EventType = logpb.GetEventTypeName(event)
179+
}
180+
181+
entry := makeStructuredEntry(ctx,
182+
sev,
183+
ch,
184+
depth+1,
185+
event)
186+
187+
if sp := getSpan(ctx); sp != nil {
188+
// Prevent `entry` from moving to the heap when this branch is not taken.
189+
heapEntry := entry
190+
eventInternal(sp, entry.sev >= severity.ERROR, &heapEntry)
191+
}
192+
193+
logger := logging.getLogger(entry.ch)
194+
logger.outputLogEntry(entry)
195+
}

pkg/util/log/migrator.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package log
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/settings"
12+
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
13+
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
14+
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
15+
)
16+
17+
// ChannelCompatibilityModeEnabled controls whether certain logs are routed
18+
// to newly defined logging channels or continue to use their original ones.
19+
var ChannelCompatibilityModeEnabled = settings.RegisterBoolSetting(
20+
settings.ApplicationLevel,
21+
"log.channel_compatibility_mode.enabled",
22+
"when true, logs will continue to log to the expected logging channels; "+
23+
"when false, logs will be moved to new logging channels as part of a "+
24+
"logging channel consolidation effort",
25+
metamorphic.ConstantWithTestBool("log.channel_compatibility_mode.enabled", true),
26+
settings.WithPublic,
27+
)
28+
29+
func ShouldMigrateEvent(sv *settings.Values) bool {
30+
return !ChannelCompatibilityModeEnabled.Get(sv)
31+
}
32+
33+
// StructuredEventMigrator handles conditional routing of structured events
34+
// between old and new logging channels based on a shouldMigrate function.
35+
type StructuredEventMigrator struct {
36+
shouldMigrate func() bool
37+
newChannel Channel
38+
}
39+
40+
// NewStructuredEventMigrator creates a new StructuredEventMigrator that routes
41+
// structured events to either the specified new channel (when shouldMigrate returns
42+
// true) or to the event's original channel (when shouldMigrate returns false).
43+
func NewStructuredEventMigrator(
44+
shouldMigrate func() bool, newChannel Channel,
45+
) StructuredEventMigrator {
46+
return StructuredEventMigrator{
47+
shouldMigrate: shouldMigrate,
48+
newChannel: newChannel,
49+
}
50+
}
51+
52+
// StructuredEvent logs a structured event using the migrator's routing logic.
53+
func (sem StructuredEventMigrator) StructuredEvent(
54+
ctx context.Context, sev logpb.Severity, event logpb.EventPayload,
55+
) {
56+
sem.structuredEventDepth(ctx, sev, 1, event)
57+
}
58+
59+
// StructuredEventDepth logs a structured event with a custom stack depth
60+
// for accurate caller identification in logs.
61+
func (sem StructuredEventMigrator) StructuredEventDepth(
62+
ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
63+
) {
64+
sem.structuredEventDepth(ctx, sev, depth+1, event)
65+
}
66+
67+
// structuredEventDepth is the internal implementation that performs the actual
68+
// channel routing based on the shouldMigrate function value.
69+
func (sem StructuredEventMigrator) structuredEventDepth(
70+
ctx context.Context, sev logpb.Severity, depth int, event logpb.EventPayload,
71+
) {
72+
if sem.shouldMigrate() {
73+
structuredEventDepth(ctx, sev, depth+1, sem.newChannel, event)
74+
} else {
75+
structuredEventDepth(ctx, sev, depth+1, event.LoggingChannel(), event)
76+
77+
}
78+
}
79+
80+
// Migrator handles conditional routing of formatted log messages between
81+
// deprecated and new logging channels based on a migration setting.
82+
type Migrator struct {
83+
shouldMigrate func() bool
84+
oldChannel Channel
85+
newChannel Channel
86+
}
87+
88+
// NewMigrator creates a new Migrator that routes log messages between old and new
89+
// channels based on the shouldMigrate function.
90+
func NewMigrator(shouldMigrate func() bool, oldChannel Channel, newChannel Channel) Migrator {
91+
return Migrator{shouldMigrate: shouldMigrate, oldChannel: oldChannel, newChannel: newChannel}
92+
}
93+
94+
// logfDepth is the internal helper that routes log messages to either the
95+
// new channel (when shouldMigrate returns true) or old channel (when shouldMigrate returns false).
96+
func (m Migrator) logfDepth(
97+
ctx context.Context, sev Severity, depth int, format string, args ...interface{},
98+
) {
99+
if m.shouldMigrate() {
100+
logfDepth(ctx, depth+1, sev, m.newChannel, format, args...)
101+
} else {
102+
logfDepth(ctx, depth+1, sev, m.oldChannel, format, args...)
103+
}
104+
}
105+
106+
// Infof logs an info-level message using the migrator's routing logic.
107+
func (m Migrator) Infof(ctx context.Context, format string, args ...interface{}) {
108+
m.logfDepth(ctx, severity.INFO, 1, format, args...)
109+
}
110+
111+
// Warningf logs a warning-level message using the migrator's routing logic.
112+
func (m Migrator) Warningf(ctx context.Context, format string, args ...interface{}) {
113+
m.logfDepth(ctx, severity.WARNING, 1, format, args...)
114+
}
115+
116+
// Errorf logs an error-level message using the migrator's routing logic.
117+
func (m Migrator) Errorf(ctx context.Context, format string, args ...interface{}) {
118+
m.logfDepth(ctx, severity.ERROR, 1, format, args...)
119+
}
120+
121+
// Fatalf logs a fatal-level message using the migrator's routing logic.
122+
func (m Migrator) Fatalf(ctx context.Context, format string, args ...interface{}) {
123+
m.logfDepth(ctx, severity.FATAL, 1, format, args...)
124+
}
125+
126+
func (m Migrator) VEventf(ctx context.Context, level Level, format string, args ...interface{}) {
127+
selectedChannel := m.oldChannel
128+
if m.shouldMigrate() {
129+
selectedChannel = m.newChannel
130+
}
131+
vEventf(ctx, false /* isErr */, 1, level, selectedChannel, format, args...)
132+
}

0 commit comments

Comments
 (0)