Skip to content

Commit 338e921

Browse files
authored
chore: consolidation time metric (#35)
* chore: add ConsolidationRunDuration metric
* refactor: add env attribute to the provider instead of every metric
* go mod tidy
* tidy again
1 parent ccb1509 commit 338e921

File tree

6 files changed

+35
-27
lines changed

6 files changed

+35
-27
lines changed

cmd/etracker/start.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ import (
3333
"github.com/storacha/etracker/internal/presets"
3434
"github.com/storacha/etracker/internal/server"
3535
"github.com/storacha/etracker/internal/service"
36-
"go.opentelemetry.io/otel/attribute"
37-
"go.opentelemetry.io/otel/metric"
3836
)
3937

4038
var startCmd = &cobra.Command{
@@ -179,8 +177,6 @@ func startService(cmd *cobra.Command, args []string) error {
179177
}
180178
}
181179

182-
env := cfg.MetricsEnvironment
183-
184180
// Create DynamoDB client
185181
dynamoClient := dynamodb.NewFromConfig(cfg.AWSConfig)
186182

@@ -203,7 +199,7 @@ func startService(cmd *cobra.Command, args []string) error {
203199

204200
// Initialize metrics if metrics are configured
205201
if cfg.MetricsAuthToken != "" {
206-
if err := metrics.Init(); err != nil {
202+
if err := metrics.Init(cfg.MetricsEnvironment); err != nil {
207203
return fmt.Errorf("initializing metrics: %w", err)
208204
}
209205

@@ -212,16 +208,14 @@ func startService(cmd *cobra.Command, args []string) error {
212208
if err != nil {
213209
log.Warnf("failed to reconcile unprocessed batches count: %v", err)
214210
} else {
215-
envAttr := attribute.String("env", env)
216-
metrics.UnprocessedBatches.Add(ctx, count, metric.WithAttributeSet(attribute.NewSet(envAttr)))
211+
metrics.UnprocessedBatches.Add(ctx, count)
217212
log.Infof("reconciled unprocessed batches count: %d", count)
218213
}
219214
}
220215

221216
// Create service
222217
svc, err := service.New(
223218
id,
224-
env,
225219
egressTable,
226220
consolidatedTable,
227221
storageProviderTable,
@@ -272,7 +266,6 @@ func startService(cmd *cobra.Command, args []string) error {
272266

273267
cons, err := consolidator.New(
274268
id,
275-
env,
276269
egressTable,
277270
consolidatedTable,
278271
spaceStatsTable,

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
go.opentelemetry.io/otel v1.38.0
2121
go.opentelemetry.io/otel/exporters/prometheus v0.60.0
2222
go.opentelemetry.io/otel/metric v1.38.0
23+
go.opentelemetry.io/otel/sdk v1.38.0
2324
go.opentelemetry.io/otel/sdk/metric v1.38.0
2425
)
2526

@@ -116,7 +117,6 @@ require (
116117
github.com/ucan-wg/go-ucan v0.0.0-20240916120445-37f52863156c // indirect
117118
github.com/whyrusleeping/cbor-gen v0.3.1 // indirect
118119
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
119-
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
120120
go.opentelemetry.io/otel/trace v1.38.0 // indirect
121121
go.uber.org/atomic v1.11.0 // indirect
122122
go.uber.org/mock v0.5.2 // indirect

internal/consolidator/consolidator.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ var ErrNotFound = consolidated.ErrNotFound
4545

4646
type Consolidator struct {
4747
id principal.Signer
48-
environment string
4948
egressTable egress.EgressTable
5049
consolidatedTable consolidated.ConsolidatedTable
5150
spaceStatsTable spacestats.SpaceStatsTable
@@ -61,7 +60,6 @@ type Consolidator struct {
6160

6261
func New(
6362
id principal.Signer,
64-
environment string,
6563
egressTable egress.EgressTable,
6664
consolidatedTable consolidated.ConsolidatedTable,
6765
spaceStatsTable spacestats.SpaceStatsTable,
@@ -91,7 +89,6 @@ func New(
9189

9290
c := &Consolidator{
9391
id: id,
94-
environment: environment,
9592
egressTable: egressTable,
9693
consolidatedTable: consolidatedTable,
9794
spaceStatsTable: spaceStatsTable,
@@ -145,8 +142,12 @@ func (c *Consolidator) Stop() {
145142
func (c *Consolidator) Consolidate(ctx context.Context) error {
146143
log.Info("Starting consolidation cycle")
147144

148-
// Environment attribute for metrics
149-
envAttr := attribute.String("env", c.environment)
145+
// Track consolidation run duration
146+
startTime := time.Now()
147+
defer func() {
148+
durationMs := time.Since(startTime).Milliseconds()
149+
metrics.ConsolidationRunDuration.Record(ctx, durationMs)
150+
}()
150151

151152
// Get unprocessed records
152153
records, err := c.egressTable.GetUnprocessed(ctx, c.batchSize)
@@ -226,7 +227,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
226227

227228
// Increment consolidated bytes counter for this node
228229
nodeAttr := attribute.String("node", record.Node.String())
229-
metrics.ConsolidatedBytesPerNode.Add(ctx, int64(totalEgress), metric.WithAttributeSet(attribute.NewSet(nodeAttr, envAttr)))
230+
metrics.ConsolidatedBytesPerNode.Add(ctx, int64(totalEgress), metric.WithAttributeSet(attribute.NewSet(nodeAttr)))
230231

231232
bLog.Infof("Consolidated %d bytes", totalEgress)
232233
}
@@ -236,7 +237,7 @@ func (c *Consolidator) Consolidate(ctx context.Context) error {
236237
return fmt.Errorf("marking records as processed: %w", err)
237238
}
238239

239-
metrics.UnprocessedBatches.Add(ctx, int64(-len(records)), metric.WithAttributeSet(attribute.NewSet(envAttr)))
240+
metrics.UnprocessedBatches.Add(ctx, int64(-len(records)))
240241

241242
log.Infof("Consolidation cycle completed. Processed %d records", len(records))
242243

internal/consolidator/consolidator_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ func TestValidateRetrievalReceipt(t *testing.T) {
8383
// Create a consolidator instance to test the validation context it creates works as expected
8484
c, err := New(
8585
consolidatorID,
86-
"test",
8786
nil,
8887
nil,
8988
nil,

internal/metrics/metrics.go

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import (
55

66
logging "github.com/ipfs/go-log/v2"
77
"go.opentelemetry.io/otel"
8+
"go.opentelemetry.io/otel/attribute"
89
"go.opentelemetry.io/otel/exporters/prometheus"
910
"go.opentelemetry.io/otel/metric"
1011
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
12+
"go.opentelemetry.io/otel/sdk/resource"
1113
)
1214

1315
var log = logging.Logger("metrics")
@@ -21,17 +23,26 @@ var (
2123

2224
// UnprocessedBatches keeps track of the total number of batches pending consolidation
2325
UnprocessedBatches metric.Int64UpDownCounter
26+
27+
// ConsolidationRunDuration tracks the time (in milliseconds) each consolidation run takes to process all batches
28+
ConsolidationRunDuration metric.Int64Histogram
2429
)
2530

2631
// Init initializes the OpenTelemetry metrics with Prometheus exporter
27-
func Init() error {
32+
func Init(environment string) error {
2833
exporter, err := prometheus.New()
2934
if err != nil {
3035
return fmt.Errorf("failed to create prometheus exporter: %w", err)
3136
}
3237

33-
// Create a MeterProvider with the Prometheus exporter
34-
provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
38+
// Create a resource with the environment attribute
39+
res := resource.NewSchemaless(attribute.String("env", environment))
40+
41+
// Create a MeterProvider with the Prometheus exporter and resource
42+
provider := sdkmetric.NewMeterProvider(
43+
sdkmetric.WithReader(exporter),
44+
sdkmetric.WithResource(res),
45+
)
3546

3647
// Set the global MeterProvider
3748
otel.SetMeterProvider(provider)
@@ -64,6 +75,14 @@ func Init() error {
6475
return fmt.Errorf("failed to create UnprocessedBatches counter: %w", err)
6576
}
6677

78+
ConsolidationRunDuration, err = meter.Int64Histogram(
79+
"etracker_consolidation_run_duration_ms",
80+
metric.WithDescription("Time in milliseconds for each consolidation run to process all batches"),
81+
)
82+
if err != nil {
83+
return fmt.Errorf("failed to create ConsolidationRunDuration histogram: %w", err)
84+
}
85+
6786
log.Info("OpenTelemetry metrics initialized with Prometheus exporter")
6887
return nil
6988
}

internal/service/service.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ type Service interface {
107107

108108
type service struct {
109109
id principal.Signer
110-
environment string
111110
egressTable egress.EgressTable
112111
consolidatedTable consolidated.ConsolidatedTable
113112
storageProviderTable storageproviders.StorageProviderTable
@@ -118,7 +117,6 @@ type service struct {
118117

119118
func New(
120119
id principal.Signer,
121-
environment string,
122120
egressTable egress.EgressTable,
123121
consolidatedTable consolidated.ConsolidatedTable,
124122
storageProviderTable storageproviders.StorageProviderTable,
@@ -128,7 +126,6 @@ func New(
128126
) (*service, error) {
129127
return &service{
130128
id: id,
131-
environment: environment,
132129
egressTable: egressTable,
133130
consolidatedTable: consolidatedTable,
134131
storageProviderTable: storageProviderTable,
@@ -144,9 +141,8 @@ func (s *service) Record(ctx context.Context, node did.DID, receipts ucan.Link,
144141
}
145142

146143
nodeAttr := attribute.String("node", node.String())
147-
envAttr := attribute.String("env", s.environment)
148-
metrics.TrackedBatchesPerNode.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(nodeAttr, envAttr)))
149-
metrics.UnprocessedBatches.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(envAttr)))
144+
metrics.TrackedBatchesPerNode.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(nodeAttr)))
145+
metrics.UnprocessedBatches.Add(ctx, 1)
150146

151147
return nil
152148
}

0 commit comments

Comments
 (0)