@@ -46,7 +46,7 @@ func (segment Mongodb) New(configx map[string]string) segments.Segment {
4646
4747 newsegment , err := fillSegmentWithConfig (newsegment , configx )
4848 if err != nil {
49- log .Error ().Err (err ).Msg ("Failed loading mongodb segment config" )
49+ log .Error ().Err (err ).Msg ("MongoDB: Failed loading mongodb segment config" )
5050 return nil
5151 }
5252
@@ -59,7 +59,7 @@ func (segment Mongodb) New(configx map[string]string) segments.Segment {
5959 err = client .Ping (ctx , options .Client ().ReadPreference )
6060 }
6161 if err != nil {
62- log .Error ().Err (err ).Msgf ("mongoDB : Could not open DB connection" )
62+ log .Error ().Err (err ).Msgf ("MongoDB : Could not open DB connection" )
6363 return nil
6464 }
6565 db := client .Database (newsegment .databaseName )
@@ -84,55 +84,48 @@ func (segment *Mongodb) Run(wg *sync.WaitGroup) {
8484 segment .dbCollection = db .Collection (segment .collectionName )
8585
8686 defer client .Disconnect (ctx )
87-
88- var unsaved []* pb.EnrichedFlow
89-
87+ unsavedJson := make (chan []interface {})
88+ messagesToSave := make (chan * pb.EnrichedFlow )
89+ go segment .bulkInsert (ctx , unsavedJson )
90+ go segment .prepareDataForBulkInsert (messagesToSave , unsavedJson )
9091 for msg := range segment .In {
91- unsaved = append (unsaved , msg )
92- if len (unsaved ) >= segment .BatchSize {
93- err := segment .bulkInsert (unsaved , ctx )
94- if err != nil {
95- log .Error ().Err (err ).Msg (" " )
96- }
97- unsaved = []* pb.EnrichedFlow {}
98- }
92+ messagesToSave <- msg
9993 segment .Out <- msg
10094 }
101- segment .bulkInsert (unsaved , ctx )
10295}
10396
10497func fillSegmentWithConfig (newsegment * Mongodb , config map [string ]string ) (* Mongodb , error ) {
10598 if config == nil {
106- return newsegment , errors .New ("missing configuration for segment mongodb" )
99+ return newsegment , errors .New ("MongoDB: missing configuration for segment mongodb" )
107100 }
108101
109102 if config ["mongodb_uri" ] == "" {
110- return newsegment , errors .New ("mongoDB : mongodb_uri not defined" )
103+ return newsegment , errors .New ("MongoDB : mongodb_uri not defined" )
111104 }
112105 newsegment .mongodbUri = config ["mongodb_uri" ]
113106
114107 if config ["database" ] == "" {
115- log .Info ().Msg ("mongoDB : no database defined - using default value (flowdata)" )
108+ log .Info ().Msg ("MongoDB : no database defined - using default value (flowdata)" )
116109 config ["database" ] = "flowdata"
117110 }
118111 newsegment .databaseName = config ["database" ]
119112
120113 if config ["collection" ] == "" {
121- log .Info ().Msg ("mongoDB : no collection defined - using default value (ringbuffer)" )
114+ log .Info ().Msg ("MongoDB : no collection defined - using default value (ringbuffer)" )
122115 config ["collection" ] = "ringbuffer"
123116 }
124117 newsegment .collectionName = config ["collection" ]
125118
126119 var ringbufferSize int64 = 10737418240
127120 if config ["max_disk_usage" ] == "" {
128- log .Info ().Msg ("mongoDB : no ring buffer size defined - using default value (10GB)" )
121+ log .Info ().Msg ("MongoDB : no ring buffer size defined - using default value (10GB)" )
129122 } else {
130123 size , err := sizeInBytes (config ["max_disk_usage" ])
131124 if err == nil {
132- log .Info ().Msg ("mongoDB : setting ring buffer size to " + config ["max_disk_usage" ])
125+ log .Info ().Msg ("MongoDB : setting ring buffer size to " + config ["max_disk_usage" ])
133126 ringbufferSize = size
134127 } else {
135- log .Warn ().Msg ("mongoDB : failed setting ring buffer size to " + config ["max_disk_usage" ] + " - using default as fallback (10GB)" )
128+ log .Warn ().Msg ("MongoDB : failed setting ring buffer size to " + config ["max_disk_usage" ] + " - using default as fallback (10GB)" )
136129 }
137130 }
138131 newsegment .ringbufferSize = ringbufferSize
@@ -141,18 +134,18 @@ func fillSegmentWithConfig(newsegment *Mongodb, config map[string]string) (*Mong
141134 if config ["batchsize" ] != "" {
142135 if parsedBatchSize , err := strconv .ParseInt (config ["batchsize" ], 10 , 32 ); err == nil {
143136 if parsedBatchSize <= 0 {
144- return newsegment , errors .New ("MongoDO : Batch size <= 0 is not allowed. Set this in relation to the expected flows per second" )
137+ return newsegment , errors .New ("MongoDB : Batch size <= 0 is not allowed. Set this in relation to the expected flows per second" )
145138 }
146139 if parsedBatchSize <= 0 {
147- log .Warn ().Msgf ("MongoDO : Batch size over max size - setting to %d" , math .MaxInt )
140+ log .Warn ().Msgf ("MongoDB : Batch size over max size - setting to %d" , math .MaxInt )
148141 parsedBatchSize = math .MaxInt
149142 }
150143 newsegment .BatchSize = int (parsedBatchSize )
151144 } else {
152- log .Error ().Msgf ("MongoDO : Could not parse 'batchsize' parameter %s, using default 1000." , config ["batchsize" ])
145+ log .Error ().Msgf ("MongoDB : Could not parse 'batchsize' parameter %s, using default 1000." , config ["batchsize" ])
153146 }
154147 } else {
155- log .Info ().Msg ("MongoDO : 'batchsize' set to default '1000'." )
148+ log .Info ().Msg ("MongoDB : 'batchsize' set to default '1000'." )
156149 }
157150
158151 // determine field set
@@ -162,7 +155,7 @@ func fillSegmentWithConfig(newsegment *Mongodb, config map[string]string) (*Mong
162155 for _ , field := range conffields {
163156 protofield , found := protofields .FieldByName (field )
164157 if ! found || ! protofield .IsExported () {
165- return newsegment , errors .New ("csv : Field specified in 'fields' does not exist" )
158+ return newsegment , errors .New ("MongoDB : Field specified in 'fields' does not exist" )
166159 }
167160 newsegment .fieldNames = append (newsegment .fieldNames , field )
168161 newsegment .fieldTypes = append (newsegment .fieldTypes , protofield .Type .String ())
@@ -182,40 +175,52 @@ func fillSegmentWithConfig(newsegment *Mongodb, config map[string]string) (*Mong
182175 return newsegment , nil
183176}
184177
185- func (segment Mongodb ) bulkInsert (unsavedFlows []* pb.EnrichedFlow , ctx context.Context ) error {
178+ func (segment Mongodb ) prepareDataForBulkInsert (msgChan chan * pb.EnrichedFlow , unsavedJsonFlows chan []interface {}) {
179+ unsavedFlowData := make ([]interface {}, segment .BatchSize )
180+ nrOfFlows := 0
181+ for msg := range msgChan {
182+ flowData := formatFlowToMongoDbJson (msg , segment )
183+ unsavedFlowData [nrOfFlows ] = flowData
184+ nrOfFlows += 1
185+ if nrOfFlows >= segment .BatchSize {
186+ unsavedJsonFlows <- unsavedFlowData
187+ nrOfFlows = 0
188+ }
189+ }
190+
191+ }
192+
193+ func (segment Mongodb ) bulkInsert (ctx context.Context , unsavedJsonFlows chan []interface {}) {
186194 // not using transactions due to limitations of capped collectiction
187195 // ("You cannot write to capped collections in transactions."
188196 // https://www.mongodb.com/docs/manual/core/capped-collections/)
189- if len (unsavedFlows ) == 0 {
190- return nil
197+ for unsavedFlows := range unsavedJsonFlows {
198+ _ , err := segment .dbCollection .InsertMany (ctx , unsavedFlows )
199+ if err != nil {
200+ log .Error ().Err (err ).Msg ("MongoDB: Failed to insert to mongo db" )
201+ }
191202 }
192- unsavedFlowData := bson.A {}
193- for _ , msg := range unsavedFlows {
194- singleFlowData := bson.M {}
195- values := reflect .ValueOf (msg ).Elem ()
196- for i , fieldname := range segment .fieldNames {
197- protofield := values .FieldByName (fieldname )
198- switch segment .fieldTypes [i ] {
199- case "[]uint8" : // this is neccessary for proper formatting
200- ipstring := net .IP (protofield .Interface ().([]uint8 )).String ()
201- if ipstring == "<nil>" {
202- ipstring = ""
203- }
204- singleFlowData [fieldname ] = ipstring
205- case "string" : // this is because doing nothing is also much faster than Sprint
206- singleFlowData [fieldname ] = protofield .Interface ().(string )
207- default :
208- singleFlowData [fieldname ] = fmt .Sprint (protofield )
203+ }
204+
205+ func formatFlowToMongoDbJson (msg * pb.EnrichedFlow , segment Mongodb ) bson.M {
206+ singleFlowData := bson.M {}
207+ values := reflect .ValueOf (msg ).Elem ()
208+ for i , fieldname := range segment .fieldNames {
209+ protofield := values .FieldByName (fieldname )
210+ switch segment .fieldTypes [i ] {
211+ case "[]uint8" : // this is neccessary for proper formatting
212+ ipstring := net .IP (protofield .Interface ().([]uint8 )).String ()
213+ if ipstring == "<nil>" {
214+ ipstring = ""
209215 }
216+ singleFlowData [fieldname ] = ipstring
217+ case "string" : // this is because doing nothing is also much faster than Sprint
218+ singleFlowData [fieldname ] = protofield .Interface ().(string )
219+ default :
220+ singleFlowData [fieldname ] = fmt .Sprint (protofield )
210221 }
211- unsavedFlowData = append (unsavedFlowData , singleFlowData )
212- }
213- _ , err := segment .dbCollection .InsertMany (ctx , unsavedFlowData )
214- if err != nil {
215- log .Error ().Err (err ).Msg ("mongoDB: Failed to insert to mongo db" )
216- return err
217222 }
218- return nil
223+ return singleFlowData
219224}
220225
221226func init () {
@@ -227,7 +232,7 @@ func sizeInBytes(sizeStr string) (int64, error) {
227232 // Split into number and unit
228233 parts := strings .Fields (sizeStr )
229234 if len (parts ) > 2 || len (parts ) < 1 {
230- return 0 , fmt .Errorf ("[error] invalid size format" )
235+ return 0 , fmt .Errorf ("MongoDB: invalid size format" )
231236 }
232237
233238 size , err := strconv .ParseInt (parts [0 ], 10 , 64 )
0 commit comments