Skip to content

Commit 35c5b41

Browse files
committed
chore(outputs.mongodb): Cleanup plugin
1 parent 191e575 commit 35c5b41

File tree

2 files changed

+99
-98
lines changed

2 files changed

+99
-98
lines changed

plugins/outputs/mongodb/mongodb.go

Lines changed: 90 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,6 @@ import (
2525
//go:embed sample.conf
2626
var sampleConfig string
2727

28-
func (s *MongoDB) getCollections(ctx context.Context) error {
29-
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
30-
if err != nil {
31-
return fmt.Errorf("unable to execute ListCollections: %w", err)
32-
}
33-
s.collections = make(map[string]bson.M, collections.RemainingBatchLength())
34-
for collections.Next(ctx) {
35-
var collection bson.M
36-
if err = collections.Decode(&collection); err != nil {
37-
return fmt.Errorf("unable to decode ListCollections: %w", err)
38-
}
39-
name, ok := collection["name"].(string)
40-
if !ok {
41-
return fmt.Errorf("non-string name in %v", collection)
42-
}
43-
s.collections[name] = collection
44-
}
45-
return nil
46-
}
47-
48-
func (s *MongoDB) insertDocument(ctx context.Context, databaseCollection string, bdoc bson.D) error {
49-
collection := s.client.Database(s.MetricDatabase).Collection(databaseCollection)
50-
_, err := collection.InsertOne(ctx, &bdoc)
51-
return err
52-
}
53-
5428
type MongoDB struct {
5529
Dsn string `toml:"dsn"`
5630
AuthenticationType string `toml:"authentication"`
@@ -61,10 +35,11 @@ type MongoDB struct {
6135
ServerSelectTimeout config.Duration `toml:"timeout"`
6236
TTL config.Duration `toml:"ttl"`
6337
Log telegraf.Logger `toml:"-"`
64-
client *mongo.Client
65-
clientOptions *options.ClientOptions
66-
collections map[string]bson.M
6738
tls.ClientConfig
39+
40+
client *mongo.Client
41+
options *options.ClientOptions
42+
collections map[string]bool
6843
}
6944

7045
func (*MongoDB) SampleConfig() string {
@@ -92,7 +67,7 @@ func (s *MongoDB) Init() error {
9267
}
9368

9469
serverAPIOptions := options.ServerAPI(options.ServerAPIVersion1) // use new mongodb versioned api
95-
s.clientOptions = options.Client().SetServerAPIOptions(serverAPIOptions)
70+
s.options = options.Client().SetServerAPIOptions(serverAPIOptions)
9671

9772
switch s.AuthenticationType {
9873
case "", "NONE":
@@ -120,7 +95,7 @@ func (s *MongoDB) Init() error {
12095
}
12196
username.Destroy()
12297
password.Destroy()
123-
s.clientOptions.SetAuth(credential)
98+
s.options.SetAuth(credential)
12499
case "PLAIN":
125100
if s.Username.Empty() {
126101
return errors.New("authentication for PLAIN must specify a username")
@@ -148,7 +123,7 @@ func (s *MongoDB) Init() error {
148123
Username: username,
149124
Password: password,
150125
}
151-
s.clientOptions.SetAuth(credential)
126+
s.options.SetAuth(credential)
152127

153128
// Check if TLS is enabled (via mongodb+srv:// or tls/ssl query params) and warn if not
154129
parsedDSN, err := url.Parse(s.Dsn)
@@ -191,50 +166,73 @@ func (s *MongoDB) Init() error {
191166
AuthSource: "$external",
192167
AuthMechanism: "MONGODB-X509",
193168
}
194-
s.clientOptions.SetAuth(credential)
169+
s.options.SetAuth(credential)
195170
default:
196171
return fmt.Errorf("unsupported authentication type %q", s.AuthenticationType)
197172
}
198173

199174
if s.ServerSelectTimeout != 0 {
200-
s.clientOptions.SetServerSelectionTimeout(time.Duration(s.ServerSelectTimeout))
175+
s.options.SetServerSelectionTimeout(time.Duration(s.ServerSelectTimeout))
201176
}
202177

203-
s.clientOptions.ApplyURI(s.Dsn)
178+
s.options.ApplyURI(s.Dsn)
204179
return nil
205180
}
206181

207-
func (s *MongoDB) createTimeSeriesCollection(databaseCollection string) error {
208-
_, collectionExists := s.collections[databaseCollection]
209-
if !collectionExists {
210-
ctx := context.Background()
211-
tso := options.TimeSeries()
212-
tso.SetTimeField("timestamp")
213-
tso.SetMetaField("tags")
214-
tso.SetGranularity(s.MetricGranularity)
215-
cco := options.CreateCollection()
216-
if s.TTL != 0 {
217-
cco.SetExpireAfterSeconds(int64(time.Duration(s.TTL).Seconds()))
182+
func (s *MongoDB) Connect() error {
183+
// Connect to the database
184+
ctx := context.Background()
185+
client, err := mongo.Connect(ctx, s.options)
186+
if err != nil {
187+
return fmt.Errorf("connecting to server failed: %w", err)
188+
}
189+
s.client = client
190+
191+
// Cache the existing collections to prevent recreating those during write
192+
collections, err := s.client.Database(s.MetricDatabase).ListCollections(ctx, bson.M{})
193+
if err != nil {
194+
return fmt.Errorf("listing collections failed: %w", err)
195+
}
196+
197+
s.collections = make(map[string]bool, collections.RemainingBatchLength())
198+
for collections.Next(ctx) {
199+
var collection bson.M
200+
if err = collections.Decode(&collection); err != nil {
201+
return fmt.Errorf("decoding collections failed: %w", err)
218202
}
219-
cco.SetTimeSeriesOptions(tso)
220-
err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, databaseCollection, cco)
221-
if err != nil {
222-
return fmt.Errorf("unable to create time series collection: %w", err)
203+
204+
raw, found := collection["name"]
205+
if !found {
206+
return fmt.Errorf("name does not exist in collection %+v", collection)
207+
}
208+
name, ok := raw.(string)
209+
if !ok {
210+
return fmt.Errorf("non-string name %v (%T) in collection", raw, raw)
223211
}
224-
s.collections[databaseCollection] = bson.M{}
212+
s.collections[name] = true
225213
}
214+
226215
return nil
227216
}
228217

229-
func (s *MongoDB) Connect() error {
218+
func (s *MongoDB) Write(metrics []telegraf.Metric) error {
230219
ctx := context.Background()
231-
client, err := mongo.Connect(ctx, s.clientOptions)
232-
if err != nil {
233-
return fmt.Errorf("unable to connect: %w", err)
234-
}
235-
s.client = client
236-
if err = s.getCollections(ctx); err != nil {
237-
return fmt.Errorf("unable to get collections from specified metric database: %w", err)
220+
221+
// Write one metric at a time
222+
for _, metric := range metrics {
223+
name := metric.Name()
224+
// Create a new collection if it doesn't exist
225+
if !s.collections[name] {
226+
if err := s.createCollection(ctx, name); err != nil {
227+
return fmt.Errorf("creating time series collection %q failed: %w", name, err)
228+
}
229+
}
230+
doc := marshal(metric)
231+
232+
collection := s.client.Database(s.MetricDatabase).Collection(metric.Name())
233+
if _, err := collection.InsertOne(ctx, &doc); err != nil {
234+
return fmt.Errorf("getting collection %q failed: %w", metric.Name(), err)
235+
}
238236
}
239237
return nil
240238
}
@@ -244,37 +242,45 @@ func (s *MongoDB) Close() error {
244242
return s.client.Disconnect(ctx)
245243
}
246244

247-
// all metric/measurement fields are parent level of document
248-
// metadata field is named "tags"
249-
// mongodb stores timestamp as UTC. conversion should be performed during reads in app or in aggregation pipeline
250-
func marshalMetric(metric telegraf.Metric) bson.D {
251-
var bdoc bson.D
252-
for k, v := range metric.Fields() {
253-
bdoc = append(bdoc, primitive.E{Key: k, Value: v})
245+
func (s *MongoDB) createCollection(ctx context.Context, name string) error {
246+
// Setup a new timeseries collection for the given metric name
247+
series := options.TimeSeries()
248+
series.SetTimeField("timestamp")
249+
series.SetMetaField("tags")
250+
series.SetGranularity(s.MetricGranularity)
251+
252+
collection := options.CreateCollection()
253+
if s.TTL != 0 {
254+
collection.SetExpireAfterSeconds(int64(time.Duration(s.TTL).Seconds()))
255+
}
256+
collection.SetTimeSeriesOptions(series)
257+
258+
// Create the new collection
259+
if err := s.client.Database(s.MetricDatabase).CreateCollection(ctx, name, collection); err != nil {
260+
return err
261+
}
262+
s.collections[name] = true
263+
264+
return nil
265+
}
266+
267+
// Convert a metric into a MongoDB document with all fields being parent level
268+
// of document and the metadata field is named "tags. Mongodb stores timestamp
269+
// as UTC so conversion should be performed on the query or aggregation side.
270+
func marshal(metric telegraf.Metric) bson.D {
271+
var doc bson.D
272+
for _, f := range metric.FieldList() {
273+
doc = append(doc, primitive.E{Key: f.Key, Value: f.Value})
254274
}
255275
var tags bson.D
256-
for k, v := range metric.Tags() {
257-
tags = append(tags, primitive.E{Key: k, Value: v})
276+
for _, t := range metric.TagList() {
277+
tags = append(tags, primitive.E{Key: t.Key, Value: t.Value})
258278
}
259-
bdoc = append(bdoc,
279+
doc = append(doc,
260280
primitive.E{Key: "tags", Value: tags},
261281
primitive.E{Key: "timestamp", Value: metric.Time()},
262282
)
263-
return bdoc
264-
}
265-
266-
func (s *MongoDB) Write(metrics []telegraf.Metric) error {
267-
ctx := context.Background()
268-
for _, metric := range metrics {
269-
if err := s.createTimeSeriesCollection(metric.Name()); err != nil {
270-
return err
271-
}
272-
bdoc := marshalMetric(metric)
273-
if err := s.insertDocument(ctx, metric.Name(), bdoc); err != nil {
274-
return err
275-
}
276-
}
277-
return nil
283+
return doc
278284
}
279285

280286
func init() {

plugins/outputs/mongodb/mongodb_test.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -318,23 +318,18 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) {
318318
for _, tt := range tests {
319319
t.Run(tt.name, func(t *testing.T) {
320320
// validate config
321-
err := tt.plugin.Init()
322-
require.NoError(t, err)
321+
require.NoError(t, tt.plugin.Init())
323322

324-
if err == nil {
325-
// connect
326-
err = tt.plugin.Connect()
327-
tt.connErrFunc(t, err)
323+
// connect
324+
err = tt.plugin.Connect()
325+
tt.connErrFunc(t, err)
328326

329-
if err == nil {
330-
// insert mock metrics
331-
err = tt.plugin.Write(testutil.MockMetrics())
332-
require.NoError(t, err)
327+
if err == nil {
328+
// insert mock metrics
329+
require.NoError(t, tt.plugin.Write(testutil.MockMetrics()))
333330

334-
// cleanup
335-
err = tt.plugin.Close()
336-
require.NoError(t, err)
337-
}
331+
// cleanup
332+
require.NoError(t, tt.plugin.Close())
338333
}
339334
})
340335
}

0 commit comments

Comments
 (0)