Skip to content

Commit ebef830

Browse files
committed
feat: Enhance DDS Consumer with shutdown coordination and error handling
1 parent 2e29dfc commit ebef830

File tree

1 file changed

+81
-17
lines changed

1 file changed

+81
-17
lines changed

plugins/inputs/dds_consumer/dds_consumer.go

Lines changed: 81 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
package dds_consumer
1414

1515
import (
16+
"context"
1617
"log"
18+
"sync"
1719
"time"
1820

1921
"github.com/influxdata/telegraf"
@@ -41,6 +43,11 @@ type DDSConsumer struct {
4143
// Telegraf entities
4244
parser *json.Parser
4345
acc telegraf.Accumulator
46+
47+
// Shutdown coordination
48+
ctx context.Context
49+
cancel context.CancelFunc
50+
wg sync.WaitGroup
4451
}
4552

4653
// Default configurations
@@ -85,39 +92,82 @@ func (d *DDSConsumer) Start(acc telegraf.Accumulator) error {
8592
// Keep the Telegraf accumulator internally
8693
d.acc = acc
8794

95+
// Initialize shutdown coordination
96+
d.ctx, d.cancel = context.WithCancel(context.Background())
97+
8898
var err error
8999

90100
// Create a Connector entity
91101
d.connector, err = rti.NewConnector(d.ParticipantConfig, d.ConfigFilePath)
92-
checkFatalError(err)
102+
if err != nil {
103+
return err
104+
}
93105

94106
// Get a DDS reader
95107
d.reader, err = d.connector.GetInput(d.ReaderConfig)
96-
checkFatalError(err)
108+
if err != nil {
109+
d.connector.Delete()
110+
return err
111+
}
97112

98113
// Initialize JSON parser
99114
d.parser = &json.Parser{
100115
MetricName: "dds",
101116
TagKeys: d.TagKeys,
102117
}
103118
err = d.parser.Init()
104-
checkFatalError(err)
119+
if err != nil {
120+
d.connector.Delete()
121+
return err
122+
}
105123

106124
// Start a thread for ingesting DDS
125+
d.wg.Add(1)
107126
go d.process()
108127

109128
return nil
110129
}
111130

112131
func (d *DDSConsumer) Stop() {
113-
d.connector.Delete()
132+
// Signal the process goroutine to stop
133+
if d.cancel != nil {
134+
d.cancel()
135+
}
136+
137+
// Wait for the process goroutine to finish
138+
d.wg.Wait()
139+
140+
// Now safely delete the connector
141+
if d.connector != nil {
142+
d.connector.Delete()
143+
d.connector = nil
144+
}
114145
}
115146

116147
// Take DDS samples from the DataReader and ingest them to Telegraf outputs
117148
func (d *DDSConsumer) process() {
149+
defer d.wg.Done()
150+
118151
for {
119-
d.connector.Wait(-1)
120-
d.reader.Take()
152+
select {
153+
case <-d.ctx.Done():
154+
// Shutdown signal received
155+
log.Println("DDS Consumer: Stopping processing loop")
156+
return
157+
default:
158+
// Continue processing
159+
}
160+
161+
// Use a timeout for Wait to avoid blocking indefinitely
162+
waitTimeout := 1000 // 1 second timeout in milliseconds
163+
d.connector.Wait(waitTimeout)
164+
165+
err := d.reader.Take()
166+
if err != nil {
167+
checkError(err)
168+
continue
169+
}
170+
121171
numOfSamples, err := d.reader.Samples.GetLength()
122172
checkError(err)
123173
if err != nil {
@@ -133,24 +183,38 @@ func (d *DDSConsumer) process() {
133183
if valid {
134184
json, err := d.reader.Samples.GetJSON(i)
135185
checkError(err)
186+
if err != nil {
187+
continue
188+
}
136189
ts, err := d.reader.Infos.GetSourceTimestamp(i)
137190
checkError(err)
138-
go func(jsonStr string) {
139-
// Parse the JSON object to metrics
140-
metrics, err := d.parser.Parse([]byte(jsonStr))
141-
checkError(err)
142-
143-
// Iterate the metrics
144-
for _, metric := range metrics {
145-
// Add a metric to an output plugin
146-
d.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), time.Unix(0, ts))
147-
}
148-
}(json)
191+
if err != nil {
192+
continue
193+
}
194+
195+
// Process synchronously to avoid goroutine leaks on shutdown
196+
d.processMessage(json, ts)
149197
}
150198
}
151199
}
152200
}
153201

202+
// Helper function to process individual messages
203+
func (d *DDSConsumer) processMessage(jsonStr string, ts int64) {
204+
// Parse the JSON object to metrics
205+
metrics, err := d.parser.Parse([]byte(jsonStr))
206+
checkError(err)
207+
if err != nil {
208+
return
209+
}
210+
211+
// Iterate the metrics
212+
for _, metric := range metrics {
213+
// Add a metric to an output plugin
214+
d.acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), time.Unix(0, ts))
215+
}
216+
}
217+
154218
func (d *DDSConsumer) Gather(acc telegraf.Accumulator) error {
155219
return nil
156220
}

0 commit comments

Comments
 (0)