-
Notifications
You must be signed in to change notification settings - Fork 2k
[WIP] Refactor AdvisedRequest and introduce AdvisedResponse #1332
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Refactor AdvisedRequest and introduce AdvisedResponse #1332
Conversation
a44856d to
ffd645d
Compare
| for (ResponseAdvisor advisor : aggregateResponseAdvisors) { | ||
| AdvisorObservableHelper.adviseResponse(parentObservation, advisor, chatResponse, | ||
| advisorContext); | ||
| AtomicReference<Map<String, Object>> adviseContext = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's happening here? Why do you want this to be stateful? The code that follows can seem confusing, as in - it seems to be imperative, where you should get the enhanced context in the following processing but that context will only get the current snapshot of alterations per each chunk. So the ceremony seems fruitless in a sense. It doesn't join the mutated context either, so it is also unnecessary for the aggregated bit.
|
|
||
| private static final Logger logger = LoggerFactory.getLogger(MessageAggregator.class); | ||
|
|
||
| public Flux<AdvisedResponse> aggregateAdvisedResponse(Flux<AdvisedResponse> advisedResponseFlux, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic looks like a duplicate from DefaultChatClient, which does not call this method, am I correct?
| .thenReturn(new ChatResponse(List.of(new Generation(new AssistantMessage("Hello John"))))) | ||
| .thenReturn(new ChatResponse(List.of(new Generation(new AssistantMessage("Your name is John"))))); | ||
|
|
||
| when(chatModel.call(promptCaptor.capture())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Is there duplication here?
| .containsEntry("aroundCallAfter", "AROUND_CALL_AFTER") | ||
| .doesNotContainKeys("afterCall"); | ||
|
|
||
| assertThat(advisedResponse.adviseContext()).containsEntry("key1", "value1") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion is a duplicate of the above, right?
| assertThat(mockAroundAdvisor.advisedResponse.adviseContext()).containsEntry("key1", "value1") | ||
| .containsEntry("key2", "value2") | ||
| .containsEntry("before", "BEFORE") | ||
| .containsEntry("aroundCallBefore", "AROUND_CALL_BEFORE") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't these be AROUND_STREAM_* ?
| boolean withFinishReason = response.getResults() | ||
| for (AfterAdvisor advisor : onFinishElementResponseAdvisors) { | ||
| boolean withFinishReason = advisedResponse.response() | ||
| .getResults() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to iterate over the response multiple times? I assume this bit iterates over it just to find the condition, but later processing will actually go over it to do something useful with the data. It's just a question :)
| Flux<AdvisedResponse> advisedResponseFlux = chain.nextAroundStream(advisedRequest); | ||
|
|
||
| return new MessageAggregator().aggregate(fluxChatResponse, chatResponse -> { | ||
| return new MessageAggregator().aggregateAdvisedResponse(advisedResponseFlux, chatResponse -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels inefficient to apply multiple aggregators to achieve some end goal. Can this particular advisor be split into two - one being the around-advisor which can serve a cached response, while the other, the storing one, would be an aggregate-advisor. WDYT?
| Map<String, Object> advisorParams, Map<String, Object> adviseContext) { | ||
|
|
||
| public AdvisedRequest augmentContext(Function<Map<String, Object>, Map<String, Object>> contextTransform) { | ||
| return from(this) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a compact constructor of record to assign the adviseContext to the Collections.unmodifiableMap of the passed in argument at record creation. This will encapsulate the immutability of the map. Currently, any implementor is able to access the record's map directly and modify it. Adding that safeguard will also simplify the code here.
| return advisors.stream() | ||
| .filter(advisor -> advisor instanceof RequestAdvisor) | ||
| .map(a -> (RequestAdvisor) a) | ||
| .filter(advisor -> advisor instanceof BeforeAdvisor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider pre-processing the advisors at their definition to avoid the expensive instanceof and casting at runtime, which can add up a performance hit. Perhaps specialized API methods can be added for specific advisor types to optimize the reading aspect of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps in ChatClient the signature ChatClientRequestSpec advisors(Consumer<AdvisorSpec> consumer) could be the one offered and the AdvisorSpec would offer type-safe, yet specific, advisor registrations for particular types. But without changing the API the DefaultChatClient could internally store particular implementations into well-defined slots upon specification by the user.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments. I really like the direction of having the context embedded in the Advised(Request|Response) and the reactive manipulations look well optimized. There are a few more things that can be improved I think.
| */ | ||
| default Flux<ChatResponse> adviseResponse(Flux<ChatResponse> fluxResponse, Map<String, Object> adviseContext) { | ||
| return fluxResponse; | ||
| default Flux<AdvisedResponse> afterStream(Flux<AdvisedResponse> advisedResponseStream) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API feels that it should belong elsewhere, to a specific custom advisor type. Currently, from my understanding, this is only called for the custom case. For most regular advisors this would be confusing that this can be overridden.
| * @return the advised {@link ChatResponse}. | ||
| */ | ||
| ChatResponse adviseResponse(ChatResponse response, Map<String, Object> adviseContext); | ||
| AdvisedResponse afterCall(AdvisedResponse advisedResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name might be better aligned with BeforeAdvisor's before() signature if the Call part is skipped, or the other one also has the Call suffix.
| */ | ||
| Flux<ChatResponse> aroundStream(AdvisedRequest advisedRequest, Map<String, Object> adviceContext, | ||
| AroundAdvisorChain chain); | ||
| Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest, AroundAdvisorChain chain); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's stopping the "Custom" implementations to be considered as the "around" advisors, making the "custom" abstraction unnecessary?
| default StreamResponseMode getStreamResponseMode() { | ||
| return StreamResponseMode.ON_FINISH_ELEMENT; | ||
| default AfterStreamMode getAfterStreamMode() { | ||
| return AfterStreamMode.ON_FINISH_ELEMENT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it not be more natural if a default behaviour was to apply the advisor per each chunk in the stream using the imperative implementation? Or perhaps applying it over the aggregation? If there's no easy answer, perhaps picking a default should not happen at all and stream advisors should not belong to the imperative type at all?
…psulation - Introduce AdvisedResponse to encapsulate ChatResponse and advice context - Refactor RequestAdvisor and ResponseAdvisor interface to use AdvisedRequest/AdvisedResponse records - Update DefaultAroundAdvisorChain to work with new advisor interfaces - Modify DefaultChatClient to use new advisor framework - Adjust MessageChatMemoryAdvisor for compatibility with new advisor structure - Update integration tests to reflect changes in advisor framework - Update AdvisedRequest to include adviseContext and add toPrompt() method - Modify advisor interfaces to remove explicit context parameter - Introduce AdvisedResponse to encapsulate ChatResponse with adviseContext - Update DefaultChatClient to use new advisor interfaces and AdvisedResponse - Adjust integration tests in OpenAiPaymentTransactionIT and VertexAiGeminiPaymentTransactionIT - Refactor the RequestResponseAdvisor interface to make it a backward compatible adabror for ReqeustAdvisor and ResponseAdvisors. Aslo mark it as deprecated. - Update various advisor implementations to work with new AdvisedRequest and AdvisedResponse - Modify AroundAdvisorChain to use AdvisedResponse in both call and stream methods - Implement new adviseResponse method for streaming responses in RequestResponseAdvisor - Update MessageChatMemoryAdvisor, PromptChatMemoryAdvisor, QuestionAnswerAdvisor, SimpleLoggerAdvisor, and VectorStoreChatMemoryAdvisor to use new advisor interfaces - Add new AdvisedResponse class to encapsulate ChatResponse with advice context - Update CallAroundAdvisor, StreamAroundAdvisor, RequestAdvisor, and ResponseAdvisor interfaces to use AdvisedResponse and remove adviceContext parameter - Modify CacheAroundAdvisor and SafeGuardAroundAdvisor to work with new advisor interfaces and AdvisedResponse - Update AdvisorObservableHelper to use AdvisedResponse and remove explicit context parameter - Refactor MessageAggregator to support aggregation of AdvisedResponse objects - Rename extractRequestAdvisors and extractResponseAdvisors methods to requestAdvisors and responseAdvisors in AdvisorObservableHelper - Add null checks and return empty lists for advisors in AdvisorObservableHelper
- Rename RequestAdvisor to BeforeAdvisor and ResponseAdvisor to AfterAdvisor - Change adviseRequest method to before and adviseResponse to afterCall - Update StreamResponseMode to AfterStreamMode - Adjust implementing classes and related code to use new naming conventions - Remove deprecated RequestResponseAdvisor interface
- Move AdvisedRequest from chat.client to chat.client.advisor.api - Update import statements across multiple files - Adjust related advisor classes to use the new import path - Remove unused imports and optimize imports in affected files - Add AdvisorsTests
ffd645d to
65f373a
Compare
|
Replaced by #1422 |
Introduce AdvisedResponse to encapsulate ChatResponse and advice context
Refactor RequestAdvisor and ResponseAdvisor interface to use AdvisedRequest/AdvisedResponse records
Update DefaultAroundAdvisorChain to work with new advisor interfaces
Modify DefaultChatClient to use new advisor framework
Adjust MessageChatMemoryAdvisor for compatibility with new advisor structure
Update integration tests to reflect changes in advisor framework
Update AdvisedRequest to include adviseContext and add toPrompt() method
Modify advisor interfaces to remove explicit context parameter
Introduce AdvisedResponse to encapsulate ChatResponse with adviseContext
Update DefaultChatClient to use new advisor interfaces and AdvisedResponse
Adjust integration tests in OpenAiPaymentTransactionIT and VertexAiGeminiPaymentTransactionIT
Refactor the RequestResponseAdvisor interface to make it a backward compatible adabror for ReqeustAdvisor and ResponseAdvisors. Aslo mark it as deprecated.
Update various advisor implementations to work with new AdvisedRequest and AdvisedResponse
Modify AroundAdvisorChain to use AdvisedResponse in both call and stream methods
Implement new adviseResponse method for streaming responses in RequestResponseAdvisor
Update MessageChatMemoryAdvisor, PromptChatMemoryAdvisor, QuestionAnswerAdvisor, SimpleLoggerAdvisor, and VectorStoreChatMemoryAdvisor to use new advisor interfaces
Add new AdvisedResponse class to encapsulate ChatResponse with advice context
Update CallAroundAdvisor, StreamAroundAdvisor, RequestAdvisor, and ResponseAdvisor interfaces to use AdvisedResponse and remove adviceContext parameter
Modify CacheAroundAdvisor and SafeGuardAroundAdvisor to work with new advisor interfaces and AdvisedResponse
Update AdvisorObservableHelper to use AdvisedResponse and remove explicit context parameter
Refactor MessageAggregator to support aggregation of AdvisedResponse objects
Rename extractRequestAdvisors and extractResponseAdvisors methods to requestAdvisors and responseAdvisors in AdvisorObservableHelper
Add null checks and return empty lists for advisors in AdvisorObservableHelper