Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/com/ke/bella/batch/service/BatchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.theokanning.openai.batch.BatchRequest;
import com.theokanning.openai.queue.Put;
import com.theokanning.openai.queue.Task;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -65,6 +66,9 @@ public class BatchService {
@Resource
private BatchCompleteCountUpdater batchCompleteCountUpdater;

@Resource
private MeterRegistry meterRegistry;

private static final int FLUSH_THRESHOLD = 100;

@Value("${batch.max.splitting:100}")
Expand Down Expand Up @@ -114,6 +118,8 @@ public Batch create(BatchRequest create, String queue) {
.callbackUrl(StringUtils.EMPTY)
.data(Map.of("batchId", batchId))
.build());

meterRegistry.counter("batch.create.total", "queue", queueMeta.getQueue()).increment();
return batch;
}

Expand Down Expand Up @@ -174,6 +180,7 @@ public void split(TaskWrapper task) {
doFinalize(batchId, BatchStatus.completed.name());
}
result.put("success", "done");
meterRegistry.counter("batch.splitted.total", "queue", queueMeta.getQueue()).increment();
} catch (Exception e) {
log.error("Failed to split batch batchId: {}", batchId, e);
batchRepo.setFailed(batchId);
Expand All @@ -197,6 +204,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {

AtomicLong lines = new AtomicLong(skips);

FullQueueName fullQueueName = new FullQueueName(queueMeta.getQueue(), QueueLevel.L1.getLevel());
FileUtils.processLines(filePath, skips, line -> {
if(StringUtils.isEmpty(line)) {
return;
Expand All @@ -205,6 +213,7 @@ private void doSplit(BatchDB batch, Path file, QueueMetadataDB queueMeta) {
try {
Task task = createTask(batch, queueMeta, line, lines.get());
tasks.add(task);
meterRegistry.counter("queue.task.put.total", "queue", fullQueueName.toString()).increment();
} catch (Exception e) {
Task task = new Task();
task.setTaskId(StringUtils.EMPTY);
Expand Down Expand Up @@ -240,6 +249,10 @@ private void doFinalize(String batchId, String status) {
doCancel(batchId);
}

Long queueId = IDGenerator.parseQueueIdFromBatchId(batchId);
QueueMetadataDB queueMeta = queueRepo.findMetadataById(queueId);
meterRegistry.counter("batch.finalize.total", "queue"
, queueMeta.getQueue(), "status", status).increment();
} finally {
batchCompleteCountUpdater.remove(batchId);
FileUtils.removeAll(Configs.getBatchDir(batchId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -90,6 +91,9 @@ public class QueueService {
@Autowired
private BatchService batchService;

@Resource
private MeterRegistry meterRegistry;

@PostConstruct
@SuppressWarnings("all")
public void init() {
Expand Down Expand Up @@ -149,6 +153,7 @@ public Task put(Put put) {
throw new IllegalArgumentException("Unsupported response mode: " + put.getResponseMode());
}

meterRegistry.counter("queue.task.put.total", "queue", put.getFullQueueName()).increment();
return task;
}

Expand Down Expand Up @@ -210,6 +215,8 @@ public Map<String, List<Task>> take(Take take) {
trackProcessTimeout(allTasks, take.getProcessTimeout());
}

tasksByQueue.forEach((queue, tasks)
-> meterRegistry.counter("queue.task.take.total", "queue", queue).increment(tasks.size()));
return tasksByQueue;
}

Expand Down Expand Up @@ -254,6 +261,7 @@ public void complete(String taskId, Map<String, Object> result) {
FullQueueName fullQueueName = new FullQueueName(task.getQueue(), level);
queueHeadUpdater.increaseCompletedCnt(fullQueueName.toString(), 1L);
releaseSequentialLock(fullQueueName.toString(), taskId);
meterRegistry.counter("queue.task.complete.total", "queue", fullQueueName.toString()).increment();
}

private void reportUsage(QueueDB queueDB, Map<String, Object> result) {
Expand Down
24 changes: 21 additions & 3 deletions api/src/test/java/com/ke/bella/batch/service/BatchServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import com.theokanning.openai.batch.Batch;
import com.theokanning.openai.batch.BatchRequest;
import com.theokanning.openai.batch.RequestCounts;
import com.theokanning.openai.queue.Task;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -52,9 +53,15 @@ public class BatchServiceTest {
@Mock
private BatchCompleteCountUpdater batchCompleteCountUpdater;

@Mock
@Mock
private QueueService queueService;

@Mock
private MeterRegistry meterRegistry;

@Mock
private Counter counter;

@InjectMocks
private BatchService batchService;

Expand All @@ -73,6 +80,14 @@ public void setUpBatchService() {
// Set maxSplittingBatches field using reflection to fix test failure
ReflectionTestUtils.setField(batchService, "maxSplittingBatches", 500);

// Mock meterRegistry.counter() to return a mock Counter
when(meterRegistry.counter(anyString(), anyString(), anyString())).thenReturn(counter);
when(meterRegistry.counter(anyString(), anyString(), anyString(), anyString(), anyString())).thenReturn(counter);

// Mock queueService.put() to return a mock Task
Task mockTask = mock(Task.class);
when(queueService.put(any())).thenReturn(mockTask);

createRequest = BatchRequest.builder()
.endpoint("/api/test")
.inputFileId("file123")
Expand Down Expand Up @@ -274,11 +289,14 @@ public void testStatWhenBatchIsCompletedButStatusNotSet() {

when(batchRepo.setFinalizing(batchId)).thenReturn(true);
when(batchRepo.completeBatch(batchId)).thenReturn(true);
when(queueRepo.findMetadataById(1L)).thenReturn(queueMetadata);

try (MockedStatic<Configs> mockedConfigs = mockStatic(Configs.class);
MockedStatic<FileUtils> mockedFileUtils = mockStatic(FileUtils.class);
MockedStatic<Files> mockedFiles = mockStatic(Files.class)) {
MockedStatic<Files> mockedFiles = mockStatic(Files.class);
MockedStatic<IDGenerator> mockedIDGenerator = mockStatic(IDGenerator.class)) {

mockedIDGenerator.when(() -> IDGenerator.parseQueueIdFromBatchId(batchId)).thenReturn(1L);
mockedConfigs.when(() -> Configs.getBatchDir(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/test-batch"));
mockedConfigs.when(() -> Configs.getBatchOutputFile(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/output"));
mockedConfigs.when(() -> Configs.getBatchErrorFile(batchId)).thenReturn(java.nio.file.Paths.get("/tmp/error"));
Expand Down