2626import com .theokanning .openai .batch .BatchRequest ;
2727import com .theokanning .openai .queue .Put ;
2828import com .theokanning .openai .queue .Task ;
29+ import io .micrometer .core .instrument .MeterRegistry ;
2930import lombok .SneakyThrows ;
3031import lombok .extern .slf4j .Slf4j ;
3132import org .apache .commons .collections4 .MapUtils ;
@@ -65,6 +66,9 @@ public class BatchService {
6566 @ Resource
6667 private BatchCompleteCountUpdater batchCompleteCountUpdater ;
6768
69+ @ Resource
70+ private MeterRegistry meterRegistry ;
71+
6872 private static final int FLUSH_THRESHOLD = 100 ;
6973
7074 @ Value ("${batch.max.splitting:100}" )
@@ -114,6 +118,8 @@ public Batch create(BatchRequest create, String queue) {
114118 .callbackUrl (StringUtils .EMPTY )
115119 .data (Map .of ("batchId" , batchId ))
116120 .build ());
121+
122+ meterRegistry .counter ("batch.create.total" , "queue" , queueMeta .getQueue ()).increment ();
117123 return batch ;
118124 }
119125
@@ -174,6 +180,7 @@ public void split(TaskWrapper task) {
174180 doFinalize (batchId , BatchStatus .completed .name ());
175181 }
176182 result .put ("success" , "done" );
183+ meterRegistry .counter ("batch.splitted.total" , "queue" , queueMeta .getQueue ()).increment ();
177184 } catch (Exception e ) {
178185 log .error ("Failed to split batch batchId: {}" , batchId , e );
179186 batchRepo .setFailed (batchId );
@@ -197,6 +204,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {
197204
198205 AtomicLong lines = new AtomicLong (skips );
199206
207+ FullQueueName fullQueueName = new FullQueueName (queueMeta .getQueue (), QueueLevel .L1 .getLevel ());
200208 FileUtils .processLines (filePath , skips , line -> {
201209 if (StringUtils .isEmpty (line )) {
202210 return ;
@@ -205,6 +213,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {
205213 try {
206214 Task task = createTask (batch , queueMeta , line , lines .get ());
207215 tasks .add (task );
216+ meterRegistry .counter ("queue.task.put.total" , "queue" , fullQueueName .toString ()).increment ();
208217 } catch (Exception e ) {
209218 Task task = new Task ();
210219 task .setTaskId (StringUtils .EMPTY );
@@ -240,6 +249,10 @@ private void doFinalize(String batchId, String status) {
240249 doCancel (batchId );
241250 }
242251
252+ Long queueId = IDGenerator .parseQueueIdFromBatchId (batchId );
253+ QueueMetadataDB queueMeta = queueRepo .findMetadataById (queueId );
254+ meterRegistry .counter ("batch.finalize.total" , "queue"
255+ , queueMeta .getQueue (), "status" , status ).increment ();
243256 } finally {
244257 batchCompleteCountUpdater .remove (batchId );
245258 FileUtils .removeAll (Configs .getBatchDir (batchId ));
0 commit comments