@@ -86,14 +86,15 @@ type BulkIndexerConfig struct {
8686
8787// BulkIndexerStats represents the indexer statistics. 
8888type  BulkIndexerStats  struct  {
89- 	NumAdded     uint64 
90- 	NumFlushed   uint64 
91- 	NumFailed    uint64 
92- 	NumIndexed   uint64 
93- 	NumCreated   uint64 
94- 	NumUpdated   uint64 
95- 	NumDeleted   uint64 
96- 	NumRequests  uint64 
89+ 	NumAdded      uint64 
90+ 	NumFlushed    uint64 
91+ 	NumFailed     uint64 
92+ 	NumIndexed    uint64 
93+ 	NumCreated    uint64 
94+ 	NumUpdated    uint64 
95+ 	NumDeleted    uint64 
96+ 	NumRequests   uint64 
97+ 	FlushedBytes  uint64 
9798}
9899
99100// BulkIndexerItem represents an indexer item. 
@@ -266,14 +267,15 @@ type bulkIndexer struct {
266267}
267268
268269type  bulkIndexerStats  struct  {
269- 	numAdded     uint64 
270- 	numFlushed   uint64 
271- 	numFailed    uint64 
272- 	numIndexed   uint64 
273- 	numCreated   uint64 
274- 	numUpdated   uint64 
275- 	numDeleted   uint64 
276- 	numRequests  uint64 
270+ 	numAdded      uint64 
271+ 	numFlushed    uint64 
272+ 	numFailed     uint64 
273+ 	numIndexed    uint64 
274+ 	numCreated    uint64 
275+ 	numUpdated    uint64 
276+ 	numDeleted    uint64 
277+ 	numRequests   uint64 
278+ 	flushedBytes  uint64 
277279}
278280
279281// NewBulkIndexer creates a new bulk indexer. 
@@ -354,14 +356,15 @@ func (bi *bulkIndexer) Close(ctx context.Context) error {
354356// Stats returns indexer statistics. 
355357func  (bi  * bulkIndexer ) Stats () BulkIndexerStats  {
356358	return  BulkIndexerStats {
357- 		NumAdded :    atomic .LoadUint64 (& bi .stats .numAdded ),
358- 		NumFlushed :  atomic .LoadUint64 (& bi .stats .numFlushed ),
359- 		NumFailed :   atomic .LoadUint64 (& bi .stats .numFailed ),
360- 		NumIndexed :  atomic .LoadUint64 (& bi .stats .numIndexed ),
361- 		NumCreated :  atomic .LoadUint64 (& bi .stats .numCreated ),
362- 		NumUpdated :  atomic .LoadUint64 (& bi .stats .numUpdated ),
363- 		NumDeleted :  atomic .LoadUint64 (& bi .stats .numDeleted ),
364- 		NumRequests : atomic .LoadUint64 (& bi .stats .numRequests ),
359+ 		NumAdded :     atomic .LoadUint64 (& bi .stats .numAdded ),
360+ 		NumFlushed :   atomic .LoadUint64 (& bi .stats .numFlushed ),
361+ 		NumFailed :    atomic .LoadUint64 (& bi .stats .numFailed ),
362+ 		NumIndexed :   atomic .LoadUint64 (& bi .stats .numIndexed ),
363+ 		NumCreated :   atomic .LoadUint64 (& bi .stats .numCreated ),
364+ 		NumUpdated :   atomic .LoadUint64 (& bi .stats .numUpdated ),
365+ 		NumDeleted :   atomic .LoadUint64 (& bi .stats .numDeleted ),
366+ 		NumRequests :  atomic .LoadUint64 (& bi .stats .numRequests ),
367+ 		FlushedBytes : atomic .LoadUint64 (& bi .stats .flushedBytes ),
365368	}
366369}
367370
@@ -508,7 +511,9 @@ func (w *worker) flushBuffer(ctx context.Context) error {
508511		defer  func () { w .bi .config .OnFlushEnd (ctx ) }()
509512	}
510513
511- 	if  w .buf .Len () <  1  {
514+ 	bufLen  :=  w .buf .Len ()
515+ 
516+ 	if  bufLen  <  1  {
512517		if  w .bi .config .DebugLogger  !=  nil  {
513518			w .bi .config .DebugLogger .Printf ("[worker-%03d] Flush: Buffer empty\n " , w .id )
514519		}
@@ -631,6 +636,8 @@ func (w *worker) flushBuffer(ctx context.Context) error {
631636		}
632637	}
633638
639+ 	atomic .AddUint64 (& w .bi .stats .flushedBytes , uint64 (bufLen ))
640+ 
634641	return  err 
635642}
636643
0 commit comments