
Commit c8e0418

stream-to-topic-configuration (#113)

1 parent 20df3fa commit c8e0418

File tree

1 file changed: building-applications/topics.md

Lines changed: 53 additions & 2 deletions
````diff
@@ -88,7 +88,7 @@ You can also customise all the properties provided by the underlying broker by s

 ```yaml
 topics:
-  - name: "offset-topic"
+  - name: "offset-topic"
     creation-mode: create-if-not-exists
     partitions: 1
     options:
````
@@ -126,7 +126,7 @@ But you must be aware that the main way to control the ordering of messages is b

### **Implicit topics**

-The LangStream planner may decide to create additional topics to connect the agents. This is because most of the agents may run together in the same kubernetes pod, but under some conditions this is not possible, for example:
+The LangStream planner may decide to create additional topics to connect the agents. This is because most of the agents may run together in the same Kubernetes pod, but under some conditions this is not possible, for example:

* two agents in the same pipeline have different resource requirements, so they must live in separate pods
* some agents require a direct connection to a topic
@@ -207,3 +207,54 @@ When you mark an agent with **on-failure: deadletter**, this means that in case

In this case, the LangStream planner automatically creates a topic next to the input topic of the agent, with the same schema and with a name as `topicname + "-deadletter"`.

You can read more about error handling [here](error-handling.md).

### Stream-to-topic parameter

Some agents allow you to configure the "stream-to-topic" parameter in the pipeline, as shown below:

```yaml
- name: "ai-chat-completions"
  type: "ai-chat-completions"
  output: "history-topic"
  configuration:
    model: "${secrets.open-ai.chat-completions-model}"
    # on the history-topic we add a field with the answer
    completion-field: "value.answer"
    # we are also logging the prompt we sent to the LLM
    log-field: "value.prompt"
    # here we configure the streaming behavior:
    # as soon as the LLM answers with a chunk we send it to the output-topic
    stream-to-topic: "output-topic"
    # on the streaming answer we send the answer as a whole message
    # the 'value' syntax is used to refer to the whole value of the message
    stream-response-completion-field: "value"
    # we want to stream the answer in batches of up to 10 chunks
    # in order to reduce latency, the agent sends the first message with 1 chunk,
    # then with 2 chunks... up to the min-chunks-per-message value
    # eventually we want to send bigger messages to reduce the overhead of each message on the topic
    min-chunks-per-message: 10
    messages:
      - role: user
        content: "You are a helpful assistant. Below you can find a question from the user. Please try to help them the best way you can.\n\n{{ value.question}}"
```

In this case the agent writes every token coming from the LLM to the topic defined in "stream-to-topic".

In fact, LLMs internally work "one token at a time", and the native streaming capabilities of LangStream leverage this behavior to provide more "real-time" LLM interactions with lower latency.

There are two main configuration properties:

* stream-to-topic: the name of the topic to stream to
* stream-response-completion-field: the field to set in the records sent to the stream-to-topic topic

Usually the value for "stream-response-completion-field" is "value". This means that the token from the LLM replaces the entire content of the "value" part of the message, so you can serve it with a [gateway](./api-gateways/README.md) directly. Use "value" to write the result without a structured schema, or use "value.<field>" to write the result to a specific field.
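
For instance, to keep a structured schema on the streaming topic, you can point "stream-response-completion-field" at a named field. This is a minimal sketch: the `value.chunk` field name is illustrative, not a built-in name.

```yaml
- name: "ai-chat-completions"
  type: "ai-chat-completions"
  output: "history-topic"
  configuration:
    model: "${secrets.open-ai.chat-completions-model}"
    completion-field: "value.answer"
    stream-to-topic: "output-topic"
    # each streamed record keeps a structured schema:
    # the chunk is written into the 'chunk' field instead of replacing the whole value
    stream-response-completion-field: "value.chunk"
    messages:
      - role: user
        content: "{{ value.question }}"
```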

The regular output of the agent is not changed by using "stream-to-topic": the message is still sent to the downstream agent (or to the output topic) once the whole sequence of tokens has been received.
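
The topics involved are regular topics, so they can be declared in the topics section of the application like any other. A minimal sketch, reusing the topic names from the example above together with the creation-mode option shown earlier on this page:

```yaml
topics:
  - name: "history-topic"
    creation-mode: create-if-not-exists
  - name: "output-topic"
    creation-mode: create-if-not-exists
```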

The agent groups tokens to limit the number of writes to the broker, sending batches of up to "min-chunks-per-message" chunks. The first token is sent as soon as possible, then 2 chunks, then 4 chunks, doubling until reaching the limit defined in "min-chunks-per-message". For example, with min-chunks-per-message: 10, a 40-token answer would be sent as messages of roughly 1, 2, 4, 8, 10 and 10 tokens, plus the remaining 5.

Messages sent on the "stream-to-topic" are marked with special properties (see the gateway sketch after this list):

* stream-id: a string that identifies the whole answer
* stream-index: a number (as a string) with the index of the token in the sequence
* stream-last-message: a boolean (as a string, "true" or "false"); when "true", the message is the last one of the answer
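
Clients typically read the streamed chunks through a gateway. A minimal sketch of a consume gateway pointing at the streaming topic, following the gateway configuration described in the [gateway](./api-gateways/README.md) documentation (the gateway id is illustrative):

```yaml
gateways:
  - id: "consume-streamed-answers"
    type: consume
    topic: "output-topic"
```

A consumer can then use stream-id to group the chunks of a single answer, stream-index to order them, and stream-last-message to detect when the answer is complete.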
