Skip to content

Commit 311a8e3

Browse files
authored
Longw/retina networkflow logs configmap telemetry (#1398)
* add telemetry for networkFlowLogsFlushedRateMetric networkFlowLogsRateMetric networkFlowLogsSizeMetric
1 parent fc4886a commit 311a8e3

File tree

2 files changed

+41
-20
lines changed

2 files changed

+41
-20
lines changed

source/plugins/go/src/network_flow_logs.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"net"
7+
"strconv"
68
"sync"
79
"time"
8-
"strconv"
9-
"encoding/json"
1010

1111
"github.com/fluent/fluent-bit-go/output"
12-
"github.com/tinylib/msgp/msgp"
1312
"github.com/google/uuid"
13+
"github.com/tinylib/msgp/msgp"
1414
)
1515

1616
// Stream name for retina networkflow logs
@@ -25,10 +25,10 @@ var (
2525

2626
var (
2727
// networkflow telemetry
28-
NetworkFlowTelemetryMutex = &sync.Mutex{}
29-
NetworkFlowLogsMDSDClientCreateErrors float64
30-
MdsdNetworkFlowClient net.Conn
31-
NetworkFlowTagRefreshTracker time.Time
28+
NetworkFlowTelemetryMutex = &sync.Mutex{}
29+
NetworkFlowLogsMDSDClientCreateErrors float64
30+
MdsdNetworkFlowClient net.Conn
31+
NetworkFlowTagRefreshTracker time.Time
3232
)
3333

3434
// NetworkFlowMsgPackEntry represents the object corresponding to a single messagepack event in the messagepack stream
@@ -40,9 +40,10 @@ type NetworkFlowMsgPackEntry struct {
4040
// PostNetworkFlowRecords sends data to the mdsd and amacoreagent
4141
func PostNetworkFlowRecords(tailPluginRecords []map[interface{}]interface{}) int {
4242
if IsNetworkFlowLogsEnabled {
43-
Log(fmt.Sprintf("Debug: PostNetworkFlowRecords starting"))
4443
start := time.Now()
4544
var elapsed time.Duration
45+
var bts int
46+
var er error
4647

4748
var dataMap map[string]interface{}
4849
var networkFlowLogsMsgPackEntries []NetworkFlowMsgPackEntry
@@ -93,7 +94,7 @@ func PostNetworkFlowRecords(tailPluginRecords []map[interface{}]interface{}) int
9394
}
9495
}
9596

96-
bts, er := writeNetworkFlowMsgPackEntries(MdsdNetworkFlowClient, MdsdNetworkFlowLogsStreamTagName, networkFlowLogsMsgPackEntries)
97+
bts, er = writeNetworkFlowMsgPackEntries(MdsdNetworkFlowClient, MdsdNetworkFlowLogsStreamTagName, networkFlowLogsMsgPackEntries)
9798
elapsed = time.Since(start)
9899

99100
if er != nil {
@@ -114,18 +115,14 @@ func PostNetworkFlowRecords(tailPluginRecords []map[interface{}]interface{}) int
114115
}
115116
}
116117

117-
//TODO Telemetry
118-
// NetworkFlowTelemetryMutex.Lock()
119-
// defer NetworkFlowTelemetryMutex.Unlock()
118+
NetworkFlowTelemetryMutex.Lock()
119+
defer NetworkFlowTelemetryMutex.Unlock()
120120

121-
// if numNetworkLogRecords > 0 {
122-
// FlushedRecordsCount += float64(numNetworkLogRecords)
123-
// FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)
124-
125-
// if maxLatency >= AgentLogProcessingMaxLatencyMs {
126-
// AgentLogProcessingMaxLatencyMs = maxLatency
127-
// }
128-
// }
121+
if numNetworkLogRecords > 0 {
122+
NetworkFlowLogsFlushedCount += float64(numNetworkLogRecords)
123+
NetworkFlowLogsFlushedSize += float64(bts)
124+
NetworkFlowLogsFlushedTimeTaken += float64(elapsed / time.Millisecond)
125+
}
129126
}
130127
return output.FLB_OK
131128
}

source/plugins/go/src/telemetry.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ var (
2525
FlushedMetadataSize float64
2626
// FlushedRecordsTimeTaken indicates the cumulative time taken to flush the records for the current period
2727
FlushedRecordsTimeTaken float64
28+
// NetworkFlowLogsFlushedCount indicates the number of flushed network flow logs in the current period
29+
NetworkFlowLogsFlushedCount float64
30+
// NetworkFlowLogsFlushedSize indicates the size of the flushed network flow logs in the current period
31+
NetworkFlowLogsFlushedSize float64
32+
// NetworkFlowLogsFlushedTimeTaken indicates the cumulative time taken to flush the network flow logs for the current period
33+
NetworkFlowLogsFlushedTimeTaken float64
2834
// This is telemetry for how old/latent logs we are processing in milliseconds (max over a period of time)
2935
AgentLogProcessingMaxLatencyMs float64
3036
// This is telemetry for which container logs were latent (max over a period of time)
@@ -105,6 +111,9 @@ const (
105111
metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec"
106112
metricNameLogSize = "ContainerLogsSize"
107113
metricNameMetadataSize = "ContainerLogsMetadataSize"
114+
metricNameNetworkFlowFlushRate = "NetworkFlowLogsAvgRecordsFlushedPerSec"
115+
metricNameNetworkFlowLogGenerationRate = "NetworkFlowLogsGeneratedPerSec"
116+
metricNameNetworkFlowLogSize = "NetworkFlowLogsSize"
108117
metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs"
109118
metricNameNumberofTelegrafMetricsSentSuccessfully = "TelegrafMetricsSentCount"
110119
metricNameNumberofSendErrorsTelegrafMetrics = "TelegrafMetricsSendErrorCount"
@@ -151,6 +160,9 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
151160
logRate := FlushedRecordsCount / float64(elapsed/time.Second)
152161
logSizeRate := FlushedRecordsSize / float64(elapsed/time.Second)
153162
metadataSizeRate := FlushedMetadataSize / float64(elapsed/time.Second)
163+
networkFlowLogsFlushedRate := NetworkFlowLogsFlushedCount / NetworkFlowLogsFlushedTimeTaken * 1000
164+
networkFlowLogsRate := NetworkFlowLogsFlushedCount / float64(elapsed/time.Second)
165+
networkFlowLogsSizeRate := NetworkFlowLogsFlushedSize / float64(elapsed/time.Second)
154166
telegrafMetricsSentCount := TelegrafMetricsSentCount
155167
telegrafMetricsSendErrorCount := TelegrafMetricsSendErrorCount
156168
telegrafMetricsSend429ErrorCount := TelegrafMetricsSend429ErrorCount
@@ -184,6 +196,9 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
184196
FlushedRecordsSize = 0.0
185197
FlushedMetadataSize = 0.0
186198
FlushedRecordsTimeTaken = 0.0
199+
NetworkFlowLogsFlushedCount = 0.0
200+
NetworkFlowLogsFlushedSize = 0.0
201+
NetworkFlowLogsFlushedTimeTaken = 0.0
187202
logLatencyMs := AgentLogProcessingMaxLatencyMs
188203
logLatencyMsContainer := AgentLogProcessingMaxLatencyMsContainer
189204
AgentLogProcessingMaxLatencyMs = 0
@@ -315,6 +330,15 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
315330
logLatencyMetric := appinsights.NewMetricTelemetry(metricNameAgentLogProcessingMaxLatencyMs, logLatencyMs)
316331
logLatencyMetric.Properties["Container"] = logLatencyMsContainer
317332
TelemetryClient.Track(logLatencyMetric)
333+
334+
if IsNetworkFlowLogsEnabled {
335+
networkFlowLogsFlushedRateMetric := appinsights.NewMetricTelemetry(metricNameNetworkFlowFlushRate, networkFlowLogsFlushedRate)
336+
networkFlowLogsRateMetric := appinsights.NewMetricTelemetry(metricNameNetworkFlowLogGenerationRate, networkFlowLogsRate)
337+
networkFlowLogsSizeMetric := appinsights.NewMetricTelemetry(metricNameNetworkFlowLogSize, networkFlowLogsSizeRate)
338+
TelemetryClient.Track(networkFlowLogsFlushedRateMetric)
339+
TelemetryClient.Track(networkFlowLogsRateMetric)
340+
TelemetryClient.Track(networkFlowLogsSizeMetric)
341+
}
318342
}
319343
}
320344
telegrafConfig := make(map[string]string)

0 commit comments

Comments
 (0)