@@ -86,14 +86,15 @@ type BulkIndexerConfig struct {
8686
8787// BulkIndexerStats represents the indexer statistics.
8888type BulkIndexerStats struct {
89- NumAdded uint64
90- NumFlushed uint64
91- NumFailed uint64
92- NumIndexed uint64
93- NumCreated uint64
94- NumUpdated uint64
95- NumDeleted uint64
96- NumRequests uint64
89+ NumAdded uint64
90+ NumFlushed uint64
91+ NumFailed uint64
92+ NumIndexed uint64
93+ NumCreated uint64
94+ NumUpdated uint64
95+ NumDeleted uint64
96+ NumRequests uint64
97+ FlushedBytes uint64
9798}
9899
99100// BulkIndexerItem represents an indexer item.
@@ -266,14 +267,15 @@ type bulkIndexer struct {
266267}
267268
268269type bulkIndexerStats struct {
269- numAdded uint64
270- numFlushed uint64
271- numFailed uint64
272- numIndexed uint64
273- numCreated uint64
274- numUpdated uint64
275- numDeleted uint64
276- numRequests uint64
270+ numAdded uint64
271+ numFlushed uint64
272+ numFailed uint64
273+ numIndexed uint64
274+ numCreated uint64
275+ numUpdated uint64
276+ numDeleted uint64
277+ numRequests uint64
278+ flushedBytes uint64
277279}
278280
279281// NewBulkIndexer creates a new bulk indexer.
@@ -354,14 +356,15 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
354356// Stats returns indexer statistics.
355357func (bi * bulkIndexer ) Stats () BulkIndexerStats {
356358 return BulkIndexerStats {
357- NumAdded : atomic .LoadUint64 (& bi .stats .numAdded ),
358- NumFlushed : atomic .LoadUint64 (& bi .stats .numFlushed ),
359- NumFailed : atomic .LoadUint64 (& bi .stats .numFailed ),
360- NumIndexed : atomic .LoadUint64 (& bi .stats .numIndexed ),
361- NumCreated : atomic .LoadUint64 (& bi .stats .numCreated ),
362- NumUpdated : atomic .LoadUint64 (& bi .stats .numUpdated ),
363- NumDeleted : atomic .LoadUint64 (& bi .stats .numDeleted ),
364- NumRequests : atomic .LoadUint64 (& bi .stats .numRequests ),
359+ NumAdded : atomic .LoadUint64 (& bi .stats .numAdded ),
360+ NumFlushed : atomic .LoadUint64 (& bi .stats .numFlushed ),
361+ NumFailed : atomic .LoadUint64 (& bi .stats .numFailed ),
362+ NumIndexed : atomic .LoadUint64 (& bi .stats .numIndexed ),
363+ NumCreated : atomic .LoadUint64 (& bi .stats .numCreated ),
364+ NumUpdated : atomic .LoadUint64 (& bi .stats .numUpdated ),
365+ NumDeleted : atomic .LoadUint64 (& bi .stats .numDeleted ),
366+ NumRequests : atomic .LoadUint64 (& bi .stats .numRequests ),
367+ FlushedBytes : atomic .LoadUint64 (& bi .stats .flushedBytes ),
365368 }
366369}
367370
@@ -508,7 +511,9 @@ func (w *worker) flushBuffer(ctx context.Context) error {
508511 defer func () { w .bi .config .OnFlushEnd (ctx ) }()
509512 }
510513
511- if w .buf .Len () < 1 {
514+ bufLen := w .buf .Len ()
515+
516+ if bufLen < 1 {
512517 if w .bi .config .DebugLogger != nil {
513518 w .bi .config .DebugLogger .Printf ("[worker-%03d] Flush: Buffer empty\n " , w .id )
514519 }
@@ -631,6 +636,8 @@ func (w *worker) flushBuffer(ctx context.Context) error {
631636 }
632637 }
633638
639+ atomic .AddUint64 (& w .bi .stats .flushedBytes , uint64 (bufLen ))
640+
634641 return err
635642}
636643
0 commit comments