- Run `docker compose`
- Run the class `com.evoura.ververica.langchain4j.stream.VervericaLangchain4jApplication` to start the Flink app
- Run the helper class `com.evoura.ververica.langchain4j.stream.ConsoleVervericaApplication` to start a chat
- Java 11 (project using jenv)
- Docker 27.X
- Run `./up.sh` to run the project
The application can run locally by executing the main method of the `VervericaLangchain4jApplication` class.
Additionally, a docker-compose file is available that contains all required services:
- Kafka Cluster
- Kafka UI
- Flink Cluster
- Ollama
- Open WebUI (to interact with Ollama)
This model is used in Kafka to represent a chat message exchanged between the user and the AI model.
It is used in the following Kafka topics:
- `user-message` - the user messages
- `ai-response` - the AI model response for the user messages
- `chat-memory` - the chat memory between the user and the AI model
- `userId`
  - type: long
  - description: the id of the user
- `chatId`
  - type: long
  - description: the id of the chat
- `messageId`
  - type: long
  - description: the id of the message
- `message`
  - type: string
  - description: the actual message from the user
- `response`
  - type: string
  - description: the response from the AI model
- `timestamp`
  - type: timestamp
  - description: the timestamp of when the message was created
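The fields above could map to a simple Java POJO like the following sketch; the class name and field access are hypothetical, and the actual class in the repository may differ:

```java
// Hypothetical POJO mirroring the chat-message record described above.
// The repository's actual model class may use getters/setters or a builder.
public class ChatMessage {
    public long userId;
    public long chatId;
    public long messageId;
    public String message;   // the actual message from the user
    public String response;  // null until the AI model answers
    public long timestamp;   // epoch millis of when the message was created

    public ChatMessage(long userId, long chatId, long messageId,
                       String message, String response, long timestamp) {
        this.userId = userId;
        this.chatId = chatId;
        this.messageId = messageId;
        this.message = message;
        this.response = response;
        this.timestamp = timestamp;
    }
}
```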
Example of a record before the AI model responds:

```json
{
  "userId": 1,
  "chatId": 1,
  "messageId": 1,
  "message": "Hello!",
  "response": null,
  "timestamp": 1694465095000
}
```

Example of the same record after the AI model responds:

```json
{
  "userId": 1,
  "chatId": 1,
  "messageId": 1,
  "message": "Hello!",
  "response": "Hello to you too!",
  "timestamp": 1694465095000
}
```

This is a Kafka record that represents the LLM configuration for a specific user.
TOPIC: llm-config
- `userId`
  - type: long
  - description: the id of the user
- `aiModel`
  - type: string
  - description: the name of the AI model
  - possible values: "OLLAMA", "OPENAI"
- `properties`
  - type: map<string,string>
  - description: a map with all properties specific to an AI model
- `systemMessage`
  - type: string
  - description: a message that provides context, rules, or guidelines for the conversation
```json
{
  "userId": 1,
  "aiModel": "OLLAMA",
  "properties": {
    "baseUrl": "http://localhost:11434",
    "modelName": "llama3.1:latest"
  },
  "systemMessage": "You are an experienced coding mentor, help users understand programming concepts."
}
```

A `MapState` is used as the chat-memory storage in the `ChatMemoryProcessingFunction`.
It stores both the user message and the response at a key that represents the message id.
NOT IMPLEMENTED
- The state shall only retain a configurable maximum number of messages, in a sliding-window style cleanup.
- The state shall be fully cleaned up if:
  - The chat was closed (`isClosed=true`)
  - A configurable amount of time has passed since the last interaction
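The sliding-window retention described above can be sketched with a plain `LinkedHashMap` standing in for Flink's keyed `MapState`; all names here are hypothetical, and a real implementation would use Flink state plus timers for the time-based cleanup:

```java
import java.util.LinkedHashMap;

// Sketch of the not-yet-implemented sliding-window chat-memory cleanup.
// An in-memory LinkedHashMap stands in for Flink's MapState; insertion
// order lets us evict the oldest message once the window is full.
public class ChatMemory {
    private final int maxMessages;
    private final LinkedHashMap<Long, String> messages = new LinkedHashMap<>();

    public ChatMemory(int maxMessages) {
        this.maxMessages = maxMessages;
    }

    // Store a message by id; evict the oldest entry once the window is full.
    public void put(long messageId, String message) {
        messages.put(messageId, message);
        if (messages.size() > maxMessages) {
            Long oldest = messages.keySet().iterator().next();
            messages.remove(oldest);
        }
    }

    // Full cleanup, e.g. when the chat is closed (isClosed=true) or a
    // configurable idle timeout has elapsed.
    public void clear() {
        messages.clear();
    }

    public int size() {
        return messages.size();
    }
}
```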
Leveraging Apache Flink in a Retrieval-Augmented Generation (RAG) system offers several key benefits:
- Granular performance optimization: By decoupling the retrieval and generation processes, Flink allows for more precise control over each stage.
- Real-time data querying: Flink can manage continuous data ingestion with a dedicated job, ensuring up-to-date data availability.
- Data transformation support: Flink's built-in functions enable efficient data transformation within the pipeline.
Here's a simplified diagram of the system:
Here, we see the system split into 3 main parts:
This stage processes data from multiple services, documents, or other sources via Kafka topics.
It then converts the data into embeddings using the embedding model, and stores the embeddings in the embedding store.
This component is responsible for taking the user's prompt and finding relevant embeddings.
Using these embeddings, it builds the context and publishes both the context and the prompt to a Kafka topic.
This stage consumes the user's prompt along with the context and sends it to the LLM (Large Language Model) to generate a response.
The generated answer is then sent to a Kafka topic.
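The retrieval stage above can be illustrated with a toy similarity search: given a prompt embedding, find the most similar stored embedding and use its source text as context. The vectors and cosine metric here are simplified stand-ins for the real embedding model and store:

```java
import java.util.Map;

// Toy illustration of the retrieval step in the RAG pipeline: score each
// stored embedding against the prompt embedding and return the text whose
// embedding is most similar, to be used as context for the LLM.
public class Retriever {
    static double cosine(double[] a, double[] b) {
        double dot = 0, na = 0, nb = 0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            na += a[i] * a[i];
            nb += b[i] * b[i];
        }
        return dot / (Math.sqrt(na) * Math.sqrt(nb));
    }

    // Return the stored text whose embedding is closest to the prompt's.
    static String retrieveContext(double[] promptEmbedding, Map<String, double[]> store) {
        String best = null;
        double bestScore = -1;
        for (Map.Entry<String, double[]> e : store.entrySet()) {
            double score = cosine(promptEmbedding, e.getValue());
            if (score > bestScore) {
                bestScore = score;
                best = e.getKey();
            }
        }
        return best;
    }
}
```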
Interacting with an LLM in Flink SQL can be done by implementing a UDF (User-Defined Function) based on `AsyncTableFunction`, which is similar to a Table Function but executed asynchronously.
The implementation mirrors what we did in the Flink DataStream API; see `LangChainAsyncFunction.java` for details.
Note: The `AsyncTableFunction` can only be used in a Lookup Connector.
Thus, a new connector that extends the `LookupTableSource` class is required.
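The asynchronous lookup pattern behind `AsyncTableFunction` can be illustrated with a plain `CompletableFuture`; this is a simplified stand-in, not the Flink API itself, and the real UDF would complete a result future handed to it by the runtime:

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for an asynchronous lookup: the "model call" runs on
// another thread and completes a future, so the calling thread never blocks
// while waiting for the LLM response.
public class AsyncLookup {
    // Pretend LLM call; a real implementation would invoke the model client.
    static String callModel(String prompt) {
        return "response to: " + prompt;
    }

    static CompletableFuture<String> lookup(String prompt) {
        return CompletableFuture.supplyAsync(() -> callModel(prompt));
    }
}
```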
To embed data and save it in an embedding store, we first need to define a table using the langchain-embedding connector, as shown below:
```sql
CREATE TABLE langchain_embedding (
    input_data STRING,
    response STRING
) WITH (
    'connector' = 'langchain-embedding',
    'langchain-embedding.model' = 'OLLAMA',
    'langchain-embedding.model.base.url' = 'http://localhost:11434',
    'langchain-embedding.model.name' = 'nomic-embed-text:latest',
    'langchain-embedding.store' = 'QDRANT',
    'langchain-embedding.store.host' = '<qdrant_host>',
    'langchain-embedding.store.port' = '<qdrant_port>',
    'langchain-embedding.store.api.key' = '<qdrant_api_key>',
    'langchain-embedding.store.collection.name' = 'ververica'
);
```

Then, we need to define a source for the input data:
```sql
CREATE TEMPORARY VIEW input_table AS
SELECT * FROM (
    VALUES
    ('Ewok language is called Ewokese.', PROCTIME()),
    ('Ewokese was created by Ben Burtt.', PROCTIME()),
    ('Ewok language is Tibetan mixed with Kalmyk languages.', PROCTIME())
) AS input_table(input_data, `timestamp`);
```

Finally, to process the input data, we perform a lookup on the `langchain_embedding` table using the input data records:
```sql
SELECT e.*
FROM
    input_table AS i
JOIN
    langchain_embedding FOR SYSTEM_TIME AS OF i.`timestamp` AS e
ON
    i.input_data = e.input_data;
```

To interact with the LLM, we need to define a table using the langchain connector as shown below:
```sql
CREATE TABLE langchain_llm (
    prompt STRING,
    response STRING
) WITH (
    'connector' = 'langchain',
    'langchain.model' = 'OLLAMA',
    'langchain.model.base.url' = 'http://localhost:11434',
    'langchain.model.name' = 'llama3.1:latest',
    'langchain.embedding.model' = 'OLLAMA',
    'langchain.embedding.model.base.url' = 'http://localhost:11434',
    'langchain.embedding.model.name' = 'nomic-embed-text:latest',
    'langchain.embedding.store' = 'QDRANT',
    'langchain.embedding.store.host' = '<qdrant_host>',
    'langchain.embedding.store.port' = '<qdrant_port>',
    'langchain.embedding.store.api.key' = '<qdrant_api_key>',
    'langchain.embedding.store.collection.name' = 'ververica'
);
```

Then, we need a source for the user messages:
```sql
CREATE TEMPORARY VIEW user_messages AS
SELECT * FROM (
    VALUES
    ('How is the Ewok language called?', PROCTIME()),
    ('Who created the Ewok language?', PROCTIME()),
    ('What are the languages behind Ewokese?', PROCTIME())
) AS predefined_messages(user_message, `timestamp`);
```

Finally, to query the LLM, we perform a lookup on the `langchain_llm` table using the user's messages:
```sql
SELECT llm.*
FROM
    user_messages AS chat
JOIN
    langchain_llm FOR SYSTEM_TIME AS OF chat.`timestamp` AS llm
ON
    chat.user_message = llm.prompt;
```

| Config entry | Explanation | Default value | Required |
|---|---|---|---|
| langchain.model | AI model, possible values: DEFAULT, OPENAI, OLLAMA | None | Required |
| langchain.model.base.url | Base URL of the AI model | None | Optional |
| langchain.model.api.key | API key to access the model | None | Optional |
| langchain.model.name | Name of the AI model | None | Optional |
| langchain.system.message | Sets the context or guidelines for the model’s behavior | None | Optional |
| langchain.prompt.template | Template for structuring prompts | See Prompt Template | Optional |
| langchain.embedding.model | Embedding model, possible values: DEFAULT, OLLAMA | DEFAULT | Optional |
| langchain.embedding.model.base.url | Base URL of the embedding model | None | Optional |
| langchain.embedding.model.name | Name of the embedding model | None | Optional |
| langchain.embedding.store | Embedding store, possible values: DEFAULT, QDRANT | DEFAULT | Optional |
| langchain.embedding.store.host | Host address of the embedding store | None | Optional |
| langchain.embedding.store.port | Port for the embedding store connection | None | Optional |
| langchain.embedding.store.api.key | API key for authenticating with the embedding store | None | Optional |
| langchain.embedding.store.collection.name | Collection name in the embedding store | None | Optional |
| langchain.embedding.store.min.score | Minimum score threshold for results in the embedding store | 0.7 | Optional |
| langchain.embedding.store.max.results | Maximum number of results to return from the embedding store | 1000 | Optional |
The prompt template is a structured guide for formatting input queries, helping the model retrieve and generate responses based on relevant external information. The following variables can be used to insert data into the template:
- `message`, representing the user message
- `information`, representing the relevant information from the embedding store
By default, it has the following value:
```
Answer the following message:
Message:
{{message}}
Base your answer only on the following information:
{{information}}
Do not include any extra information.
```
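Filling the template amounts to substituting the two variables into the text; the following is a minimal sketch, and the connector's actual templating mechanism may differ:

```java
// Minimal sketch of rendering the default prompt template by substituting
// the {{message}} and {{information}} variables. The connector's real
// templating implementation may differ.
public class PromptTemplate {
    static final String DEFAULT_TEMPLATE =
        "Answer the following message:\n"
        + "Message:\n{{message}}\n"
        + "Base your answer only on the following information:\n{{information}}\n"
        + "Do not include any extra information.";

    static String render(String message, String information) {
        return DEFAULT_TEMPLATE
            .replace("{{message}}", message)
            .replace("{{information}}", information);
    }
}
```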
| Config entry | Explanation | Default value | Required |
|---|---|---|---|
| langchain-embedding.model | Embedding AI model, possible values: DEFAULT, OLLAMA | None | Required |
| langchain-embedding.model.base.url | Base URL for the embedding model | None | Optional |
| langchain-embedding.model.name | Name of the embedding model | None | Optional |
| langchain-embedding.store | Embedding store, possible values: DEFAULT, QDRANT | None | Required |
| langchain-embedding.store.host | Host address for the embedding store | None | Optional |
| langchain-embedding.store.port | Port for the embedding store | None | Optional |
| langchain-embedding.store.api.key | API key for the embedding store | None | Optional |
| langchain-embedding.store.collection.name | Collection name in the embedding store | None | Optional |
Maintaining the lookup join syntax is required for Flink to query the model through the lookup connector.
Keep in mind that built-in support for ML models will be available in Flink soon; more details about this change can be found in FLIP-437.
There is also FLIP-440, which will add support for stateful user-defined SQL operators and might be a better option.
However, that FLIP is at a very early stage and there has been no official discussion around it yet.
The Flink SQL POC with the Langchain lookup connector can be executed by running the main method from the `VervericaLangchain4jSqlApplication` class.
It initializes a Flink Table Execution environment, executes the Flink SQL statements, and prints the chat to the console.
The Langchain connector is configured to connect to OLLAMA, which can be deployed using the docker-compose file.