Skip to content

Commit 9d0fbff

Browse files
authored
Add an optional monitoring of topic processing delay to ingester pipeline (#4453)
This will: - Expose metrics to show how far behind each partition of the topic being consumed is, in ms - We utilise peekMessages on the Pulsar REST/admin API to get this information - It is optional and off by default The purpose of this is so you can see how far behind the ingester is in time (e.g. 3 minutes) - Pulsar only exposes backlog in number of messages, so it's hard to tell how much impact a delay is having (e.g. 1000 messages could be 1 second behind or 10 minutes) Also exposes a metric that shows the publish time of messages being processed by the pipeline for each partition **Note** I've had to "fork" some of pulsar-client-go due to a bug in the client. This is being fixed in apache/pulsar-client-go#1419. Once that is in the upstream client we can remove this internal code copy/fork --------- Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
1 parent bb2c0f0 commit 9d0fbff

File tree

13 files changed

+868
-31
lines changed

13 files changed

+868
-31
lines changed

config/eventingester/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@ redis:
66
poolSize: 1000
77
pulsar:
88
URL: pulsar://pulsar:6650
9+
restURL: "http://localhost:8090"
910
jobsetEventsTopic: events
1011
backoffTime: 1s
1112
receiverQueueSize: 100
13+
delayMonitor:
14+
enabled: false
15+
interval: 30s
1216
subscriptionName: "events-ingester"
1317
minMessageCompressionSize: 1024
1418
maxOutputMessageSizeBytes: 1048576 #1MB

config/lookoutingester/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@ postgres:
99
metricsPort: 9002
1010
pulsar:
1111
URL: "pulsar://pulsar:6650"
12+
restURL: "http://localhost:8090"
1213
jobsetEventsTopic: "events"
1314
backoffTime: 1s
1415
receiverQueueSize: 100
16+
delayMonitor:
17+
enabled: false
18+
interval: 30s
1519
subscriptionName: "lookout-ingester-v2"
1620
batchSize: 10000
1721
batchDuration: 500ms

config/scheduleringester/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ metrics:
1010
port: 9003
1111
pulsar:
1212
URL: "pulsar://localhost:6650"
13+
restURL: "http://localhost:8090"
1314
jobsetEventsTopic: "events"
1415
controlPlaneEventsTopic: "control-plane"
1516
backoffTime: 1s
1617
receiverQueueSize: 100
18+
delayMonitor:
19+
enabled: false
20+
interval: 30s
1721
subscriptionName: "scheduler-ingester"
1822
batchSize: 10000
1923
batchDuration: 500ms

docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ services:
4141
entrypoint: sh -c "bin/apply-config-from-env.py conf/standalone.conf && bin/pulsar standalone"
4242
ports:
4343
- 0.0.0.0:6650:6650
44+
- 0.0.0.0:8090:8080
4445
networks:
4546
- kind
4647

internal/common/config/pulsar.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
type PulsarConfig struct {
1010
// Pulsar URL
1111
URL string `validate:"required"`
12+
// Pulsar REST API URL (Pulsar admin API)
13+
// If not set, event latency metrics will not be published
14+
RestURL string
1215
// Path to the trusted TLS certificate file (must exist)
1316
TLSTrustCertsFilePath string
1417
// Whether Pulsar client accept untrusted TLS certificate from broker
@@ -23,6 +26,8 @@ type PulsarConfig struct {
2326
AuthenticationType string
2427
// Path to the JWT token (must exist). This must be set if AuthenticationType is "JWT"
2528
JwtTokenPath string
29+
// The config for topic processing delay monitor
30+
DelayMonitor TopicDelayMonitor
2631
// The pulsar topic that Jobset Events will be published to
2732
JobsetEventsTopic string
2833
// The pulsar topic that Control Plane Events will be published to
@@ -44,3 +49,11 @@ type PulsarConfig struct {
4449
// Number of pulsar messages that will be queued by the pulsar consumer.
4550
ReceiverQueueSize int
4651
}
52+
53+
// TopicDelayMonitor configures the optional topic processing delay monitor.
type TopicDelayMonitor struct {
	// Enabled switches the topic processing delay monitor on.
	// When true, metrics showing the processing delay of each partition are exposed.
	Enabled bool
	// Interval is how often the monitor checks the delay for a partition.
	Interval time.Duration
}

internal/common/ingest/ingestion_pipeline.go

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ingest
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"time"
78

@@ -117,12 +118,19 @@ func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error {
117118
wg.Add(1)
118119

119120
if i.consumer == nil {
120-
consumer, closePulsar, err := i.subscribe()
121+
client, consumer, closePulsar, err := i.subscribe()
121122
if err != nil {
122123
return err
123124
}
124125
i.consumer = consumer
125126
defer closePulsar()
127+
128+
if i.pulsarConfig.DelayMonitor.Enabled {
129+
err := i.startProcessingDelayMonitor(ctx, client)
130+
if err != nil {
131+
return errors.WithMessage(err, "error starting topic delay monitoring")
132+
}
133+
}
126134
}
127135
pulsarMessageChannel := i.consumer.Chan()
128136
pulsarMessages := make(chan pulsar.ConsumerMessage)
@@ -141,6 +149,7 @@ func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error {
141149
// Channel closed
142150
break loop
143151
}
152+
i.metrics.RecordPulsarMessagePublishTime(i.pulsarSubscriptionName, int(msg.ID().PartitionIdx()), msg.PublishTime())
144153
pulsarMessages <- msg
145154
lastReceivedTime = time.Now()
146155
case <-ticker.C:
@@ -243,11 +252,38 @@ func (i *IngestionPipeline[T, U]) Run(ctx *armadacontext.Context) error {
243252
return nil
244253
}
245254

246-
func (i *IngestionPipeline[T, U]) subscribe() (pulsar.Consumer, func(), error) {
255+
func (i *IngestionPipeline[T, U]) startProcessingDelayMonitor(ctx *armadacontext.Context, pulsarClient pulsar.Client) error {
256+
if i.pulsarConfig.RestURL == "" {
257+
return fmt.Errorf("cannot enable topic delay monitoring as pulsar RestURL not configured")
258+
}
259+
pulsarAdminClient, err := pulsarutils.NewPulsarAdminClient(&i.pulsarConfig)
260+
if err != nil {
261+
return errors.WithMessage(err, "error creating pulsar admin client")
262+
}
263+
264+
topicDelayMonitor := NewTopicProcessingDelayMonitor(pulsarClient, pulsarAdminClient, i.pulsarTopic, i.pulsarSubscriptionName, i.pulsarConfig.DelayMonitor.Interval, i.metrics)
265+
err = topicDelayMonitor.Initialise(ctx)
266+
if err != nil {
267+
return errors.WithMessage(err, "failed to initialise topic delay monitor")
268+
}
269+
go func() {
270+
log.Infof("starting topic delay monitor")
271+
err = topicDelayMonitor.Run(ctx)
272+
if err != nil {
273+
log.Errorf("topic delay monitor stopped with error %s", err)
274+
} else {
275+
log.Infof("topic delay monitor stopped")
276+
}
277+
}()
278+
279+
return nil
280+
}
281+
282+
func (i *IngestionPipeline[T, U]) subscribe() (pulsar.Client, pulsar.Consumer, func(), error) {
247283
// Subscribe to Pulsar and receive messages
248284
pulsarClient, err := pulsarutils.NewPulsarClient(&i.pulsarConfig)
249285
if err != nil {
250-
return nil, nil, errors.WithMessage(err, "Error creating pulsar client")
286+
return nil, nil, nil, errors.WithMessage(err, "error creating pulsar client")
251287
}
252288

253289
consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
@@ -258,10 +294,10 @@ func (i *IngestionPipeline[T, U]) subscribe() (pulsar.Consumer, func(), error) {
258294
SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
259295
})
260296
if err != nil {
261-
return nil, nil, errors.WithMessage(err, "Error creating pulsar consumer")
297+
return nil, nil, nil, errors.WithMessage(err, "error creating pulsar consumer")
262298
}
263299

264-
return consumer, func() {
300+
return pulsarClient, consumer, func() {
265301
consumer.Close()
266302
pulsarClient.Close()
267303
}, nil

internal/common/ingest/metrics/metrics.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package metrics
22

33
import (
4+
"strconv"
5+
"time"
6+
47
"github.com/prometheus/client_golang/prometheus"
58
"github.com/prometheus/client_golang/prometheus/promauto"
69
)
@@ -30,11 +33,13 @@ const (
3033
)
3134

3235
type Metrics struct {
33-
dbErrorsCounter *prometheus.CounterVec
34-
pulsarConnectionError prometheus.Counter
35-
pulsarMessageError *prometheus.CounterVec
36-
pulsarMessagesProcessed prometheus.Counter
37-
eventsProcessed *prometheus.CounterVec
36+
dbErrorsCounter *prometheus.CounterVec
37+
pulsarConnectionError prometheus.Counter
38+
pulsarMessageError *prometheus.CounterVec
39+
pulsarMessagesProcessed prometheus.Counter
40+
pulsarMessagePublishTime *prometheus.GaugeVec
41+
pulsarMessageProcessingDelay *prometheus.GaugeVec
42+
eventsProcessed *prometheus.CounterVec
3843
}
3944

4045
func NewMetrics(prefix string) *Metrics {
@@ -54,17 +59,27 @@ func NewMetrics(prefix string) *Metrics {
5459
Name: prefix + "pulsar_messages_processed",
5560
Help: "Number of pulsar messages processed",
5661
}
62+
pulsarMessagePublishTime := prometheus.GaugeOpts{
63+
Name: prefix + "pulsar_message_publish_time",
64+
Help: "Publish time of pulsar message being processed",
65+
}
66+
pulsarMessageProcessingDelayOpts := prometheus.GaugeOpts{
67+
Name: prefix + "pulsar_message_processing_delay",
68+
Help: "Delay in ms of pulsar messages",
69+
}
5770
eventsProcessedOpts := prometheus.CounterOpts{
5871
Name: prefix + "events_processed",
5972
Help: "Number of events processed",
6073
}
6174

6275
return &Metrics{
63-
dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}),
64-
pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}),
65-
pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts),
66-
pulsarMessagesProcessed: promauto.NewCounter(pulsarMessagesProcessedOpts),
67-
eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "eventType", "msgType"}),
76+
dbErrorsCounter: promauto.NewCounterVec(dbErrorsCounterOpts, []string{"operation"}),
77+
pulsarMessageError: promauto.NewCounterVec(pulsarMessageErrorOpts, []string{"error"}),
78+
pulsarConnectionError: promauto.NewCounter(pulsarConnectionErrorOpts),
79+
pulsarMessageProcessingDelay: promauto.NewGaugeVec(pulsarMessageProcessingDelayOpts, []string{"subscription", "partition"}),
80+
pulsarMessagePublishTime: promauto.NewGaugeVec(pulsarMessagePublishTime, []string{"subscription", "partition"}),
81+
pulsarMessagesProcessed: promauto.NewCounter(pulsarMessagesProcessedOpts),
82+
eventsProcessed: promauto.NewCounterVec(eventsProcessedOpts, []string{"queue", "eventType", "msgType"}),
6883
}
6984
}
7085

@@ -84,6 +99,16 @@ func (m *Metrics) RecordPulsarMessageProcessed() {
8499
m.pulsarMessagesProcessed.Inc()
85100
}
86101

102+
func (m *Metrics) RecordPulsarMessagePublishTime(subscriptionName string, partition int, publishTime time.Time) {
103+
partitionStr := strconv.Itoa(partition)
104+
m.pulsarMessagePublishTime.WithLabelValues(subscriptionName, partitionStr).Set(float64(publishTime.UTC().Unix()))
105+
}
106+
107+
func (m *Metrics) RecordPulsarProcessingDelay(subscriptionName string, partition int, delayInMs float64) {
108+
partitionStr := strconv.Itoa(partition)
109+
m.pulsarMessageProcessingDelay.WithLabelValues(subscriptionName, partitionStr).Set(delayInMs)
110+
}
111+
87112
// RecordEventSequenceProcessed increments the events-processed counter for the
// given queue and message type, using JobSetEventsLabel as the event type.
func (m *Metrics) RecordEventSequenceProcessed(queue string, msgType string) {
	labels := map[string]string{
		"queue":     queue,
		"eventType": JobSetEventsLabel,
		"msgType":   msgType,
	}
	m.eventsProcessed.With(labels).Inc()
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// This file is largely a copy of https://github.com/apache/pulsar-client-go/blob/230d11b82ba8b60c971013516c4922afea4a022d/pulsaradmin/pkg/admin/admin.go#L1
2+
// With simplifications to only cover the functionality we need
3+
// This is to work around a bug in the client which is tracked here:
4+
// https://github.com/apache/pulsar-client-go/pull/1419
5+
// Once pulsar-client-go fixes the issue, we should move back to the standard pulsaradmin.Client
6+
7+
package pulsarclient
8+
9+
import (
10+
"net/http"
11+
"net/url"
12+
"path"
13+
"time"
14+
15+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/auth"
16+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
17+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
18+
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
19+
)
20+
21+
const (
	// DefaultWebServiceURL is the web service URL used when the supplied
	// config does not set one.
	DefaultWebServiceURL = "http://localhost:8080"
	// DefaultHTTPTimeOutDuration is the timeout set on the HTTP client
	// created for the admin REST client.
	DefaultHTTPTimeOutDuration = 5 * time.Minute
	// ReleaseVersion is the version string passed to the REST client as its
	// VersionInfo.
	ReleaseVersion = "None"
)
26+
27+
type PulsarClient struct {
28+
Client *rest.Client
29+
APIVersion config.APIVersion
30+
}
31+
32+
type Client interface {
33+
Subscriptions() Subscriptions
34+
}
35+
36+
// New returns a new client
37+
func New(config *config.Config) (*PulsarClient, error) {
38+
authProvider, err := auth.GetAuthProvider(config)
39+
if err != nil {
40+
return nil, err
41+
}
42+
return NewPulsarClientWithAuthProvider(config, authProvider)
43+
}
44+
45+
// NewPulsarClientWithAuthProvider create a client with auth provider.
46+
func NewPulsarClientWithAuthProvider(config *config.Config, authProvider auth.Provider) (*PulsarClient, error) {
47+
if len(config.WebServiceURL) == 0 {
48+
config.WebServiceURL = DefaultWebServiceURL
49+
}
50+
51+
return &PulsarClient{
52+
APIVersion: config.PulsarAPIVersion,
53+
Client: &rest.Client{
54+
ServiceURL: config.WebServiceURL,
55+
VersionInfo: ReleaseVersion,
56+
HTTPClient: &http.Client{
57+
Timeout: DefaultHTTPTimeOutDuration,
58+
Transport: authProvider,
59+
},
60+
},
61+
}, nil
62+
}
63+
64+
func (c *PulsarClient) endpoint(componentPath string, parts ...string) string {
65+
escapedParts := make([]string, len(parts))
66+
for i, part := range parts {
67+
escapedParts[i] = url.PathEscape(part)
68+
}
69+
return path.Join(
70+
utils.MakeHTTPPath(c.APIVersion.String(), componentPath),
71+
path.Join(escapedParts...),
72+
)
73+
}

0 commit comments

Comments
 (0)