Skip to content

Commit 7555a05

Browse files
committed
feat: 增加metrics
1 parent 3dbd5d4 commit 7555a05

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

api/src/main/java/com/ke/bella/batch/service/BatchService.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.theokanning.openai.batch.BatchRequest;
2727
import com.theokanning.openai.queue.Put;
2828
import com.theokanning.openai.queue.Task;
29+
import io.micrometer.core.instrument.MeterRegistry;
2930
import lombok.SneakyThrows;
3031
import lombok.extern.slf4j.Slf4j;
3132
import org.apache.commons.collections4.MapUtils;
@@ -65,6 +66,9 @@ public class BatchService {
6566
@Resource
6667
private BatchCompleteCountUpdater batchCompleteCountUpdater;
6768

69+
@Resource
70+
private MeterRegistry meterRegistry;
71+
6872
private static final int FLUSH_THRESHOLD = 100;
6973

7074
@Value("${batch.max.splitting:100}")
@@ -114,6 +118,8 @@ public Batch create(BatchRequest create, String queue) {
114118
.callbackUrl(StringUtils.EMPTY)
115119
.data(Map.of("batchId", batchId))
116120
.build());
121+
122+
meterRegistry.counter("batch.create.total", "queue", queueMeta.getQueue()).increment();
117123
return batch;
118124
}
119125

@@ -174,6 +180,7 @@ public void split(TaskWrapper task) {
174180
doFinalize(batchId, BatchStatus.completed.name());
175181
}
176182
result.put("success", "done");
183+
meterRegistry.counter("batch.splitted.total", "queue", queueMeta.getQueue()).increment();
177184
} catch (Exception e) {
178185
log.error("Failed to split batch batchId: {}", batchId, e);
179186
batchRepo.setFailed(batchId);
@@ -197,6 +204,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {
197204

198205
AtomicLong lines = new AtomicLong(skips);
199206

207+
FullQueueName fullQueueName = new FullQueueName(queueMeta.getQueue(), QueueLevel.L1.getLevel());
200208
FileUtils.processLines(filePath, skips, line -> {
201209
if(StringUtils.isEmpty(line)) {
202210
return;
@@ -205,6 +213,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {
205213
try {
206214
Task task = createTask(batch, queueMeta, line, lines.get());
207215
tasks.add(task);
216+
meterRegistry.counter("queue.task.put.total", "queue", fullQueueName.toString()).increment();
208217
} catch (Exception e) {
209218
Task task = new Task();
210219
task.setTaskId(StringUtils.EMPTY);
@@ -240,6 +249,10 @@ private void doFinalize(String batchId, String status) {
240249
doCancel(batchId);
241250
}
242251

252+
Long queueId = IDGenerator.parseQueueIdFromBatchId(batchId);
253+
QueueMetadataDB queueMeta = queueRepo.findMetadataById(queueId);
254+
meterRegistry.counter("batch.finalize.total", "queue"
255+
, queueMeta.getQueue(), "status", status).increment();
243256
} finally {
244257
batchCompleteCountUpdater.remove(batchId);
245258
FileUtils.removeAll(Configs.getBatchDir(batchId));

api/src/main/java/com/ke/bella/batch/service/QueueService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.commons.collections4.CollectionUtils;
3232
import org.apache.commons.collections4.MapUtils;
3333
import org.apache.commons.lang3.StringUtils;
34+
import io.micrometer.core.instrument.MeterRegistry;
3435
import org.springframework.beans.factory.annotation.Autowired;
3536
import org.springframework.beans.factory.annotation.Value;
3637
import org.springframework.stereotype.Service;
@@ -90,6 +91,9 @@ public class QueueService {
9091
@Autowired
9192
private BatchService batchService;
9293

94+
@Resource
95+
private MeterRegistry meterRegistry;
96+
9397
@PostConstruct
9498
@SuppressWarnings("all")
9599
public void init() {
@@ -149,6 +153,7 @@ public Task put(Put put) {
149153
throw new IllegalArgumentException("Unsupported response mode: " + put.getResponseMode());
150154
}
151155

156+
meterRegistry.counter("queue.task.put.total", "queue", put.getFullQueueName()).increment();
152157
return task;
153158
}
154159

@@ -210,6 +215,8 @@ public Map<String, List<Task>> take(Take take) {
210215
trackProcessTimeout(allTasks, take.getProcessTimeout());
211216
}
212217

218+
tasksByQueue.forEach((queue, tasks)
219+
-> meterRegistry.counter("queue.task.take.total", "queue", queue).increment(tasks.size()));
213220
return tasksByQueue;
214221
}
215222

@@ -254,6 +261,7 @@ public void complete(String taskId, Map<String, Object> result) {
254261
FullQueueName fullQueueName = new FullQueueName(task.getQueue(), level);
255262
queueHeadUpdater.increaseCompletedCnt(fullQueueName.toString(), 1L);
256263
releaseSequentialLock(fullQueueName.toString(), taskId);
264+
meterRegistry.counter("queue.task.complete.total", "queue", fullQueueName.toString()).increment();
257265
}
258266

259267
private void reportUsage(QueueDB queueDB, Map<String, Object> result) {

api/src/test/java/com/ke/bella/batch/service/BatchServiceTest.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import com.theokanning.openai.batch.Batch;
2020
import com.theokanning.openai.batch.BatchRequest;
2121
import com.theokanning.openai.batch.RequestCounts;
22-
import com.theokanning.openai.queue.Task;
22+
import io.micrometer.core.instrument.Counter;
23+
import io.micrometer.core.instrument.MeterRegistry;
2324
import org.junit.Before;
2425
import org.junit.Test;
2526
import org.junit.runner.RunWith;
@@ -52,9 +53,15 @@ public class BatchServiceTest {
5253
@Mock
5354
private BatchCompleteCountUpdater batchCompleteCountUpdater;
5455

55-
@Mock
56+
@Mock
5657
private QueueService queueService;
5758

59+
@Mock
60+
private MeterRegistry meterRegistry;
61+
62+
@Mock
63+
private Counter counter;
64+
5865
@InjectMocks
5966
private BatchService batchService;
6067

@@ -73,6 +80,14 @@ public void setUpBatchService() {
7380
// Set maxSplittingBatches field using reflection to fix test failure
7481
ReflectionTestUtils.setField(batchService, "maxSplittingBatches", 500);
7582

83+
// Mock meterRegistry.counter() to return a mock Counter
84+
when(meterRegistry.counter(anyString(), anyString(), anyString())).thenReturn(counter);
85+
when(meterRegistry.counter(anyString(), anyString(), anyString(), anyString(), anyString())).thenReturn(counter);
86+
87+
// Mock queueService.put() to return a mock Task
88+
Task mockTask = mock(Task.class);
89+
when(queueService.put(any())).thenReturn(mockTask);
90+
7691
createRequest = BatchRequest.builder()
7792
.endpoint("/api/test")
7893
.inputFileId("file123")
@@ -274,11 +289,14 @@ public void testStatWhenBatchIsCompletedButStatusNotSet() {
274289

275290
when(batchRepo.setFinalizing(batchId)).thenReturn(true);
276291
when(batchRepo.completeBatch(batchId)).thenReturn(true);
292+
when(queueRepo.findMetadataById(1L)).thenReturn(queueMetadata);
277293

278294
try (MockedStatic<Configs> mockedConfigs = mockStatic(Configs.class);
279295
MockedStatic<FileUtils> mockedFileUtils = mockStatic(FileUtils.class);
280-
MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
296+
MockedStatic<Files> mockedFiles = mockStatic(Files.class);
297+
MockedStatic<IDGenerator> mockedIDGenerator = mockStatic(IDGenerator.class)) {
281298

299+
mockedIDGenerator.when(() -> IDGenerator.parseQueueIdFromBatchId(batchId)).thenReturn(1L);
282300
mockedConfigs.when(() -> Configs.getBatchDir(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/test-batch"));
283301
mockedConfigs.when(() -> Configs.getBatchOutputFile(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/output"));
284302
mockedConfigs.when(() -> Configs.getBatchErrorFile(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/error"));

0 commit comments

Comments
 (0)