Skip to content

Commit dd8d0ac

Browse files
authored
feat: allow multiple records from a single audio/text (#81)
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent b851aaf commit dd8d0ac

File tree

8 files changed

+72
-88
lines changed

8 files changed

+72
-88
lines changed

timeless-api/src/main/java/dev/matheuscruz/infra/ai/ImageAiService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import dev.langchain4j.data.image.Image;
44
import dev.langchain4j.service.UserMessage;
5-
import dev.matheuscruz.infra.ai.data.RecordInfo;
5+
import dev.matheuscruz.infra.ai.data.RecognizedTransaction;
66
import io.quarkiverse.langchain4j.RegisterAiService;
77

88
@RegisterAiService(modelName = "gpt-4-turbo")
@@ -59,5 +59,5 @@ If a block of text exists between three dashes (---), use that content as the fi
5959
{description}
6060
---
6161
""")
62-
RecordInfo handleTransactionImage(Image image, String description);
62+
RecognizedTransaction handleTransactionImage(Image image, String description);
6363
}

timeless-api/src/main/java/dev/matheuscruz/infra/ai/TextAiService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.matheuscruz.infra.ai;
22

33
import dev.langchain4j.service.UserMessage;
4+
import dev.matheuscruz.infra.ai.data.AllRecognizedOperations;
45
import dev.matheuscruz.infra.ai.tools.GetBalanceTool;
56
import io.quarkiverse.langchain4j.RegisterAiService;
67

@@ -64,7 +65,7 @@ public interface TextAiService {
6465
Output:
6566
{
6667
"operation": "GET_BALANCE",
67-
"content": "Você possui R$ 2.384,20 disponíveis na sua conta principal."
68+
"content": { "description": "Você possui R$ 2.384,20 disponíveis na sua conta principal." }
6869
}
6970
7071
Input:
@@ -79,6 +80,6 @@ public interface TextAiService {
7980
{message}
8081
---
8182
""")
82-
String handleMessage(String message);
83+
AllRecognizedOperations handleMessage(String message);
8384

8485
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package dev.matheuscruz.infra.ai.data;
2+
3+
import java.util.List;
4+
5+
public record AllRecognizedOperations(List<RecognizedOperation> all) {
6+
}

timeless-api/src/main/java/dev/matheuscruz/infra/ai/data/ContextMessage.java

Lines changed: 0 additions & 4 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package dev.matheuscruz.infra.ai.data;
2+
3+
public record RecognizedOperation(AiOperations operation, RecognizedTransaction recognizedTransaction) {
4+
}

timeless-api/src/main/java/dev/matheuscruz/infra/ai/data/RecordInfo.java renamed to timeless-api/src/main/java/dev/matheuscruz/infra/ai/data/RecognizedTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
import dev.matheuscruz.domain.Transactions;
55
import java.math.BigDecimal;
66

7-
public record RecordInfo(BigDecimal amount, String description, Transactions type, boolean withError,
7+
public record RecognizedTransaction(BigDecimal amount, String description, Transactions type, boolean withError,
88
Categories category) {
99
}

timeless-api/src/main/java/dev/matheuscruz/infra/queue/SQS.java

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
import dev.matheuscruz.domain.UserRepository;
1010
import dev.matheuscruz.infra.ai.TextAiService;
1111
import dev.matheuscruz.infra.ai.data.AiOperations;
12-
import dev.matheuscruz.infra.ai.data.ContextMessage;
13-
import dev.matheuscruz.infra.ai.data.RecordInfo;
12+
import dev.matheuscruz.infra.ai.data.AllRecognizedOperations;
13+
import dev.matheuscruz.infra.ai.data.RecognizedOperation;
14+
import dev.matheuscruz.infra.ai.data.RecognizedTransaction;
1415
import dev.matheuscruz.infra.ai.data.SimpleMessage;
1516
import io.quarkus.narayana.jta.QuarkusTransaction;
1617
import io.quarkus.scheduler.Scheduled;
@@ -36,7 +37,7 @@ public class SQS {
3637

3738
private static final ObjectReader INCOMING_MESSAGE_READER = new ObjectMapper().readerFor(IncomingMessage.class);
3839

39-
private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper().readerFor(ContextMessage.class);
40+
private static final ObjectReader AI_RESPONSE_READER = new ObjectMapper().readerFor(RecognizedOperation.class);
4041

4142
public SQS(SqsClient sqs, @ConfigProperty(name = "whatsapp.incoming-messages.queue-url") String incomingMessagesUrl,
4243
@ConfigProperty(name = "whatsapp.messages-processed.queue-url") String messagesProcessedUrl,
@@ -76,31 +77,35 @@ private void processMessage(String body, String receiptHandle) {
7677

7778
private void handleUserMessage(User user, IncomingMessage message, String receiptHandle) {
7879
try {
79-
ContextMessage contextMessage = parseAiResponse(aiService.handleMessage(message.messageBody()));
80-
81-
switch (contextMessage.operation()) {
82-
case AiOperations.ADD_TRANSACTION ->
83-
processTransactionMessage(user, message, receiptHandle, contextMessage);
84-
case AiOperations.GET_BALANCE -> processSimpleMessage(user, message, receiptHandle, contextMessage);
85-
default -> logger.warnf("Unknown operation type: %s", contextMessage.operation());
80+
AllRecognizedOperations allRecognizedOperations = aiService.handleMessage(message.messageBody());
81+
82+
for (RecognizedOperation recognizedOperation : allRecognizedOperations.all()) {
83+
switch (recognizedOperation.operation()) {
84+
case AiOperations.ADD_TRANSACTION ->
85+
processAddTransactionMessage(user, message, receiptHandle, recognizedOperation);
86+
case AiOperations.GET_BALANCE -> {
87+
logger.info("Processing GET_BALANCE operation" + recognizedOperation.recognizedTransaction());
88+
processSimpleMessage(user, message, receiptHandle, recognizedOperation);
89+
}
90+
default -> logger.warnf("Unknown operation type: %s", recognizedOperation.operation());
91+
}
8692
}
8793

8894
} catch (Exception e) {
8995
logger.error("Failed to process message: " + message.messageId(), e);
9096
}
9197
}
9298

93-
private void processTransactionMessage(User user, IncomingMessage message, String receiptHandle,
94-
ContextMessage contextMessage) throws IOException {
95-
RecordInfo transaction = objectMapper.readValue(contextMessage.content(), RecordInfo.class);
96-
97-
sendProcessedMessage(
98-
new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(), message.messageId(),
99-
MessageStatus.PROCESSED, user.getPhoneNumber(), transaction.withError(), transaction));
99+
private void processAddTransactionMessage(User user, IncomingMessage message, String receiptHandle,
100+
RecognizedOperation recognizedOperation) throws IOException {
101+
RecognizedTransaction recognizedTransaction = recognizedOperation.recognizedTransaction();
102+
sendProcessedMessage(new TransactionMessageProcessed(AiOperations.ADD_TRANSACTION.commandName(),
103+
message.messageId(), MessageStatus.PROCESSED, user.getPhoneNumber(), recognizedTransaction.withError(),
104+
recognizedTransaction));
100105

101-
Record record = new Record.Builder().userId(user.getId()).amount(transaction.amount())
102-
.description(transaction.description()).transaction(transaction.type()).category(transaction.category())
103-
.build();
106+
Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount())
107+
.description(recognizedTransaction.description()).transaction(recognizedTransaction.type())
108+
.category(recognizedTransaction.category()).build();
104109

105110
QuarkusTransaction.requiringNew().run(() -> recordRepository.persist(record));
106111

@@ -110,8 +115,9 @@ private void processTransactionMessage(User user, IncomingMessage message, Strin
110115
}
111116

112117
private void processSimpleMessage(User user, IncomingMessage message, String receiptHandle,
113-
ContextMessage contextMessage) throws IOException {
114-
SimpleMessage response = new SimpleMessage(contextMessage.content());
118+
RecognizedOperation recognizedOperation) throws IOException {
119+
logger.infof("Processing simple message for user %s", recognizedOperation.recognizedTransaction());
120+
SimpleMessage response = new SimpleMessage(recognizedOperation.recognizedTransaction().description());
115121
sendProcessedMessage(new SimpleMessageProcessed(AiOperations.GET_BALANCE.commandName(), message.messageId(),
116122
MessageStatus.PROCESSED, user.getPhoneNumber(), response));
117123
deleteMessageUsing(receiptHandle);
@@ -124,14 +130,6 @@ private void sendProcessedMessage(Object processedMessage) throws JsonProcessing
124130
.messageDeduplicationId(UUID.randomUUID().toString()).queueUrl(processedMessagesUrl));
125131
}
126132

127-
private ContextMessage parseAiResponse(String response) {
128-
try {
129-
return AI_RESPONSE_READER.readValue(response);
130-
} catch (IOException e) {
131-
throw new RuntimeException("Failed to parse AI response", e);
132-
}
133-
}
134-
135133
private IncomingMessage parseIncomingMessage(String messageBody) {
136134
try {
137135
return INCOMING_MESSAGE_READER.readValue(messageBody);
@@ -145,7 +143,7 @@ private void deleteMessageUsing(String receiptHandle) {
145143
}
146144

147145
public record TransactionMessageProcessed(String kind, String messageId, MessageStatus status, String user,
148-
Boolean withError, RecordInfo content) {
146+
Boolean withError, RecognizedTransaction content) {
149147
}
150148

151149
public record SimpleMessageProcessed(String kind, String messageId, MessageStatus status, String user,

timeless-api/src/main/java/dev/matheuscruz/presentation/MessageResource.java

Lines changed: 27 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package dev.matheuscruz.presentation;
22

3-
import com.fasterxml.jackson.core.JsonProcessingException;
43
import com.fasterxml.jackson.databind.ObjectMapper;
54
import dev.langchain4j.data.image.Image;
65
import dev.matheuscruz.domain.Record;
@@ -10,9 +9,8 @@
109
import dev.matheuscruz.infra.ai.ImageAiService;
1110
import dev.matheuscruz.infra.ai.TextAiService;
1211
import dev.matheuscruz.infra.ai.data.AiOperations;
13-
import dev.matheuscruz.infra.ai.data.ContextMessage;
14-
import dev.matheuscruz.infra.ai.data.RecordInfo;
15-
import io.quarkus.logging.Log;
12+
import dev.matheuscruz.infra.ai.data.RecognizedOperation;
13+
import dev.matheuscruz.infra.ai.data.RecognizedTransaction;
1614
import io.quarkus.narayana.jta.QuarkusTransaction;
1715
import jakarta.validation.Valid;
1816
import jakarta.validation.constraints.NotBlank;
@@ -23,6 +21,9 @@
2321
import jakarta.ws.rs.Produces;
2422
import jakarta.ws.rs.core.MediaType;
2523
import jakarta.ws.rs.core.Response;
24+
import java.util.List;
25+
import java.util.function.Predicate;
26+
import java.util.stream.Stream;
2627

2728
@Path("/api/messages")
2829
public class MessageResource {
@@ -31,14 +32,12 @@ public class MessageResource {
3132
private final TextAiService aiService;
3233
private final ImageAiService imageAiService;
3334
private final RecordRepository recordRepository;
34-
private final ObjectMapper objectMapper;
3535

3636
public MessageResource(TextAiService aiService, ImageAiService imageAiService, RecordRepository recordRepository,
3737
ObjectMapper mapper, UserRepository userRepository) {
3838
this.aiService = aiService;
3939
this.imageAiService = imageAiService;
4040
this.recordRepository = recordRepository;
41-
this.objectMapper = mapper;
4241
this.userRepository = userRepository;
4342
}
4443

@@ -57,70 +56,50 @@ public Response message(@Valid MessageRequest req) {
5756
public Response image(@Valid ImageRequest req) {
5857

5958
User user = userRepository.findByPhoneNumber(req.from()).orElseThrow(NotFoundException::new);
60-
RecordInfo imageResponse = imageAiService.handleTransactionImage(
59+
RecognizedTransaction recognizedTransaction = imageAiService.handleTransactionImage(
6160
Image.builder().base64Data(req.base64()).mimeType(req.mimeType()).build(), req.text());
6261

63-
if (imageResponse.withError()) {
64-
return Response.status(Response.Status.BAD_REQUEST).entity(imageResponse).build();
62+
if (recognizedTransaction.withError()) {
63+
return Response.status(Response.Status.BAD_REQUEST).entity(recognizedTransaction).build();
6564
}
6665

6766
QuarkusTransaction.requiringNew().run(() -> {
6867

69-
Record record = new Record.Builder().userId(user.getId()).amount(imageResponse.amount())
70-
.description(imageResponse.description()).transaction(imageResponse.type())
71-
.category(imageResponse.category()).build();
68+
Record record = new Record.Builder().userId(user.getId()).amount(recognizedTransaction.amount())
69+
.description(recognizedTransaction.description()).transaction(recognizedTransaction.type())
70+
.category(recognizedTransaction.category()).build();
7271

7372
this.recordRepository.persist(record);
7473
});
7574

76-
return Response.status(Response.Status.CREATED).entity(imageResponse).build();
75+
return Response.status(Response.Status.CREATED).entity(recognizedTransaction).build();
7776
}
7877

7978
private Response handleMessage(User user, String message) {
80-
String response = aiService.handleMessage(message);
81-
return processAiResponse(user, response);
79+
List<RecognizedOperation> response = aiService.handleMessage(message).all();
80+
return processOnlyAddTransaction(user, response);
8281
}
8382

84-
private Response processAiResponse(User user, String response) {
83+
private Response processOnlyAddTransaction(User user, List<RecognizedOperation> messages) {
8584

86-
ContextMessage contextMessage;
87-
try {
88-
contextMessage = objectMapper.readValue(response, ContextMessage.class);
89-
} catch (JsonProcessingException e) {
90-
Log.error("Failed to parse AI response", e);
91-
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
92-
}
85+
List<RecognizedTransaction> onlyAddTransaction = messages.stream()
86+
.filter(message -> AiOperations.ADD_TRANSACTION.equals(message.operation()))
87+
.map(recognizedOperation -> recognizedOperation.recognizedTransaction()).toList();
9388

94-
if (AiOperations.ADD_TRANSACTION.equals(contextMessage.operation())) {
95-
return handleTransaction(contextMessage, user);
96-
}
89+
handleTransactions(onlyAddTransaction, user);
9790

98-
return Response.status(Response.Status.BAD_REQUEST).entity(contextMessage).build();
91+
return Response.status(Response.Status.NO_CONTENT).build();
9992
}
10093

101-
private Response handleTransaction(ContextMessage contextMessage, User user) {
102-
RecordInfo transaction;
103-
try {
104-
transaction = objectMapper.readValue(contextMessage.content(), RecordInfo.class);
105-
} catch (JsonProcessingException e) {
106-
Log.error("Failed to parse transaction content", e);
107-
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
108-
}
109-
110-
if (transaction.withError()) {
111-
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
112-
}
113-
94+
private void handleTransactions(List<RecognizedTransaction> transactions, User user) {
11495
QuarkusTransaction.requiringNew().run(() -> {
115-
116-
Record record = new Record.Builder().userId(user.getId()).amount(transaction.amount())
117-
.description(transaction.description()).category(transaction.category())
118-
.transaction(transaction.type()).build();
119-
120-
recordRepository.persist(record);
96+
Stream<Record> recordStream = transactions.stream().filter(Predicate.not(RecognizedTransaction::withError))
97+
.map(recognizedTransaction -> new Record.Builder().userId(user.getId())
98+
.amount(recognizedTransaction.amount()).description(recognizedTransaction.description())
99+
.category(recognizedTransaction.category()).transaction(recognizedTransaction.type())
100+
.build());
101+
recordRepository.persist(recordStream);
121102
});
122-
123-
return Response.ok(contextMessage).build();
124103
}
125104

126105
public record MessageRequest(@NotBlank String from, @NotBlank String message) {

0 commit comments

Comments
 (0)