diff --git a/simple-agentic-flow-flink-sql/1-create-connections-with-confluent-cli.md b/simple-agentic-flow-flink-sql/1-create-connections-with-confluent-cli.md
new file mode 100644
index 00000000..c00f040e
--- /dev/null
+++ b/simple-agentic-flow-flink-sql/1-create-connections-with-confluent-cli.md
@@ -0,0 +1,39 @@
+# Setting up connections to OpenAI and Pinecone
+
+Documentation reference: https://docs.confluent.io/confluent-cli/current/command-reference/flink/connection/confluent_flink_connection_create.html
+
+
+## Pinecone
+
+```bash
+confluent flink connection create pinecone-connection --environment your-confluent-environment-id \
+--cloud AWS \
+--region us-east-1 \
+--type pinecone \
+--endpoint https://change-with-your-pinecone-endpoint.pinecone.io/query \
+--api-key change-with-your-pinecone-key
+```
+
+## OpenAI embedding
+
+```bash
+confluent flink connection create openai-connection-vector-embeddings \
+--environment your-confluent-environment-id \
+--cloud AWS \
+--region us-east-1 \
+--type openai \
+--endpoint https://api.openai.com/v1/embeddings \
+--api-key change-with-your-open-ai-key
+```
+
+## OpenAI completions
+
+```bash
+confluent flink connection create openai-connection-completions \
+--cloud AWS \
+--region us-east-1 \
+--environment your-confluent-environment-id \
+--type openai \
+--endpoint https://api.openai.com/v1/chat/completions \
+--api-key change-with-your-open-ai-key
+```
diff --git a/simple-agentic-flow-flink-sql/2-create-tables-and-models.sql b/simple-agentic-flow-flink-sql/2-create-tables-and-models.sql
new file mode 100644
index 00000000..74b26250
--- /dev/null
+++ b/simple-agentic-flow-flink-sql/2-create-tables-and-models.sql
@@ -0,0 +1,315 @@
+------ ##### SETTING UP TABLES ##### -------
+
+---------------------------------- INPUT TOPIC -------------------------------------------------------------------------------------------------------------
+
+-------- Input table for customer messages from the chat
+-- NOTE: the message column is spelled `cusomer_message` (sic) in every table and query of this demo;
+-- the spelling is kept as-is so that all statements stay consistent with each other.
+
+CREATE TABLE customer_message
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL
+);
+
+---------------------------------- EMBEDDINGS MODEL and TOPICS --------------------------------------------------------------------------------------------------
+
+-------- Add embedding model
+-- The embedding element type is declared explicitly (ARRAY<FLOAT>), like the `embedding` columns of the tables below.
+
+CREATE
+MODEL openai_embeddings
+INPUT (input STRING)
+OUTPUT (embedding ARRAY<FLOAT>)
+WITH(
+    'task' = 'embedding',
+    'provider'= 'openai',
+    'openai.input_format'='OPENAI-EMBED',
+    'openai.model_version'='text-embedding-3-small',
+    'openai.connection' = 'openai-connection-vector-embeddings' );
+
+-- External Pinecone table that is searched with FEDERATED_SEARCH in 3-setup-pipeline-flow.sql.
+-- 'pinecone.connection' names the connection created in 1-create-connections-with-confluent-cli.md.
+CREATE TABLE vector_store_docs
+(
+    text STRING,
+    embedding ARRAY<FLOAT>
+) WITH (
+    'connector' = 'pinecone',
+    'pinecone.connection' = 'pinecone-connection',
+    'pinecone.embedding_column' = 'embedding'
+    );
+
+-- Customer messages together with the embedding returned by openai_embeddings.
+CREATE TABLE customer_message_and_embedding
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING,
+    `timestamp` BIGINT NOT NULL,
+    embedding ARRAY<FLOAT>
+);
+
+-- NOTE(review): 3-setup-pipeline-flow.sql writes the FEDERATED_SEARCH output to `vector_store_result`,
+-- which is not created in this file; that table has to exist before the semantic-search step is run.
+
+-- UDF applied to the `search_results` column in 3-setup-pipeline-flow.sql.
+CREATE FUNCTION CLEAN_PINCONE_VECTOR_RESULT AS 'com.example.my.CleanPineConeVectorResult'
+    USING JAR 'confluent-artifact://cfa-0x3jq6';
+
+-- Customer messages together with the cleaned-up search result (`relevant_documentation`).
+CREATE TABLE customer_message_and_resources
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL
+);
+
+
+---------------------------------- CUSTOMER INFO & MONGODB --------------------------------------------------------------------------------------------------
+
+-------- Get info from MongoDB
+SELECT *
+from `mongo.pet-care.customer_info`;
+
+SELECT customer_id, pet_name
+from `mongo.pet-care.customer_info`;
+
+-- Drop any earlier version of the table before recreating it; IF EXISTS keeps a first run from failing.
+DROP TABLE IF EXISTS customer_message_and_customer_info;
+
+CREATE TABLE customer_message_and_customer_info
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING
+);
+
+---------------------------------- LLM ------------------------------
+
+-- Text-generation model that answers the customer using pet details and the retrieved documentation.
+CREATE
+MODEL helpful_chatbot
+INPUT(text STRING)
+OUTPUT(chatbot_response STRING)
+COMMENT 'chatbot based on openai gpt 3.5 turbo'
+WITH (
+    'provider' = 'openai',
+    'task' = 'text_generation',
+    'openai.connection' = 'openai-connection-completions',
+    'openai.model_version' = 'gpt-3.5-turbo',
+    'openai.system_prompt' =
+    'You are a helpful agent that has deep knowledge of pet care. Answer the question from the customer based on the information about customer pet and provided piece of documentation. Give your extended advice on the issue. Talk about the pet using its name. Be friendly and respectful');
+
+-- Answers produced by helpful_chatbot.
+CREATE TABLE chatbot_output
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    chatbot_response STRING
+);
+
+------ ##### Adding triage step ##### ------
+
+-- Text-generation model that maps a request to one of GET_INFO, SCHEDULE_VET, BOOK_GROOMING or RAISE_ISSUE.
+CREATE
+MODEL initial_triage
+INPUT(text STRING)
+OUTPUT(triage_result STRING)
+COMMENT 'chatbot based on openai gpt 3.5 turbo'
+WITH (
+    'provider' = 'openai',
+    'task' = 'text_generation',
+    'openai.connection' = 'openai-connection-completions',
+    'openai.model_version' = 'gpt-3.5-turbo',
+    'openai.system_prompt' =
+    'You are an agent that triages customer requests. Customers can do the following actions:
+- GET_INFO,
+- SCHEDULE_VET,
+- BOOK_GROOMING,
+- RAISE_ISSUE.
+
+- GET_INFO is for any questions related to pet management and health that does not require vet involvement;
+- SCHEDULE_VET - when there is a need based on your analysis;
+- BOOK_GROOMING - when customer asks for a grooming appointment;
+- RAISE_ISSUE - for issues related to services provided by the company.
+
+YOUR TASK: based on content of customer message return ONE action type that fits best: GET_INFO, SCHEDULE_VET, BOOK_GROOMING or RAISE_ISSUE. DO NOT GIVE ANY OTHER INFORMATION, just type of the action.');
+
+-- Enriched customer messages plus the action returned by initial_triage.
+CREATE TABLE triage_output
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING,
+    triage_result STRING
+);
+
+
+-- Classify every enriched message; the prompt parts are separated with '. ' so they do not run together.
+INSERT INTO triage_output
+SELECT conversation_id,
+       customer_id,
+       cusomer_message,
+       `timestamp`,
+       relevant_documentation,
+       pet_name,
+       customer_email,
+       pet_birthdate,
+       pet_gender,
+       allergies,
+       pet_type,
+       triage_result
+FROM `customer_message_and_customer_info`,
+     LATERAL TABLE(ML_PREDICT('initial_triage',
+        CONCAT(
+            'This is customer information: they has a pet who is ', pet_type,
+            '. It is ', pet_gender,
+            '. It was born on ', pet_birthdate,
+            '. It has these allergies: ', allergies,
+            '. This is customer request: ',
+            cusomer_message,
+            '. This is relevant documentation: ',
+            relevant_documentation,
+            '. YOUR TASK: return one of the following actions that best correspond to customer request: GET_INFO, SCHEDULE_VET, BOOK_GROOMING or RAISE_ISSUE'
+        )));
+
+--------- Tables per task
+-- One table per triage action; all four declare the same enriched-message columns (no triage_result).
+
+CREATE TABLE recommendation_requests
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING
+);
+
+CREATE TABLE schedule_vet_requests
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING
+);
+
+CREATE TABLE book_grooming_requests
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING
+);
+
+CREATE TABLE raise_issue_requests
+(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    `timestamp` BIGINT NOT NULL,
+    relevant_documentation STRING NOT NULL,
+    pet_name STRING,
+    customer_email STRING,
+    pet_birthdate STRING,
+    pet_gender STRING,
+    allergies STRING,
+    pet_type STRING
+);
+
+--------- Recommendations
+
+CREATE MODEL recommendation
+INPUT(text STRING)
+OUTPUT(recommendation STRING)
+COMMENT 'chatbot based on openai gpt 3.5 turbo'
+WITH (
+    'provider' = 'openai',
+    'task' = 'text_generation',
+    'openai.connection' = 'openai-connection-completions',
+    'openai.model_version' = 'gpt-3.5-turbo',
+    'openai.system_prompt' =
+    'You are a helpful agent that has deep knowledge of pet care. Answer the question or concern from the customer based on the information about customer pet and provided piece of documentation. Give your extended advice on the issue. Talk about the pet using its name. Be friendly and respectful');
+
+
+CREATE TABLE recommendations_output(
+    conversation_id STRING NOT NULL,
+    customer_id STRING NOT NULL,
+    cusomer_message STRING NOT NULL,
+    recommendation STRING
+);
+
+INSERT INTO `recommendations_output`
+SELECT
+    conversation_id,
+    customer_id,
+    cusomer_message,
+    recommendation
+FROM `customer_message_and_customer_info`,
+     LATERAL TABLE(ML_PREDICT('recommendation',
+        CONCAT(
+            'This is customer information. He has a pet who is ', pet_type,
+            '. It is ', pet_gender,
+            '. It was born on ', pet_birthdate,
+            '. It has these allergies: ', allergies,
+            '. This is customer question: ',
+            cusomer_message,
+            '. This is relevant documentation: ',
+            relevant_documentation
+        )));
+
+
+--------- Schedule
+
+CREATE MODEL schedule
+INPUT(text STRING)
+OUTPUT(recommendation STRING)
+COMMENT 'chatbot based on openai gpt 3.5 turbo'
+WITH (
+    'provider' = 'openai',
+    'task' = 'text_generation',
+    'openai.connection' = 'openai-connection-completions',
+    'openai.model_version' = 'gpt-3.5-turbo',
+    'openai.system_prompt' =
+    'You are a helpful agent that has deep knowledge of pet care. Answer the question or concern from the customer based on the information about customer pet and provided piece of documentation. Give your extended advice on the issue. Talk about the pet using its name. Be friendly and respectful');
diff --git a/simple-agentic-flow-flink-sql/3-setup-pipeline-flow.sql b/simple-agentic-flow-flink-sql/3-setup-pipeline-flow.sql
new file mode 100644
index 00000000..566ffd0c
--- /dev/null
+++ b/simple-agentic-flow-flink-sql/3-setup-pipeline-flow.sql
@@ -0,0 +1,90 @@
+---------------------------------- ADD TO INPUT TOPIC ------------------------------------------
+
+INSERT INTO customer_message (conversation_id, customer_id, cusomer_message, `timestamp`) values
+    ('conversation_6',
+     'customer_3',
+     'My cat does not want to eat, what can I do?'
+    , UNIX_TIMESTAMP());
+
+
+---------------------------------- CHECK INPUT TOPIC ------------------------------------------
+SELECT * FROM customer_message WHERE customer_id = 'customer_3';
+
+---------------------------------- CALL TO EMBEDDING API ------------------------------------------
+INSERT INTO customer_message_and_embedding
+SELECT conversation_id, customer_id, cusomer_message, `timestamp`, embedding FROM customer_message, lateral table(ml_predict('openai_embeddings', cusomer_message));
+
+---------------------------------- CHECK DATA WITH EMBEDDING ------------------------------------------
+SELECT * FROM customer_message_and_embedding WHERE customer_id = 'customer_3';
+
+
+---------------------------------- SEMANTIC SEARCH ------------------------------------------
+-- NOTE(review): `vector_store_result` is not created in 2-create-tables-and-models.sql;
+-- it has to exist (including the `search_results` column read below) before this statement is run.
+INSERT INTO vector_store_result
+SELECT * FROM customer_message_and_embedding,
+    LATERAL TABLE(FEDERATED_SEARCH('vector_store_docs', 3, embedding));
+
+---------------------------------- CHECK RESULTS FROM SEMANTIC SEARCH ------------------------------------------
+SELECT * FROM vector_store_result WHERE customer_id = 'customer_3';
+
+---------------------------------- CLEAN SEMANTIC SEARCH RESULTS WITH UDF ------------------------------------------
+
+INSERT INTO customer_message_and_resources
+SELECT conversation_id, customer_id, cusomer_message, `timestamp`,
+    CLEAN_PINCONE_VECTOR_RESULT(search_results) AS relevant_documentation
+FROM vector_store_result;
+
+---------------------------------- CHECK CLEANED DATA ------------------------------------------
+SELECT * FROM customer_message_and_resources WHERE customer_id = 'customer_3';
+
+
+---------------------------------- GET CUSTOMER AND ITS PET INFO FROM MONGODB ------------------------------------------
+
+INSERT INTO customer_message_and_customer_info
+SELECT
+    customer_message_and_resources.conversation_id,
+    customer_message_and_resources.customer_id,
+    customer_message_and_resources.cusomer_message,
+    customer_message_and_resources.`timestamp`,
+    customer_message_and_resources.relevant_documentation,
+    customers.pet_name,
+    customers.customer_email,
+    customers.pet_birthdate,
+    customers.pet_gender,
+    customers.allergies,
+    customers.pet_type
+FROM customer_message_and_resources
+    INNER JOIN `mongo.pet-care.customer_info` customers
+    ON customer_message_and_resources.customer_id=customers.customer_id;
+
+---------------------------------- CHECK CUSTOMER AND ITS PET INFO ------------------------------------------
+
+SELECT * FROM customer_message_and_customer_info WHERE customer_id = 'customer_3';
+
+
+---------------------------------- SEND INFERENCE REQUEST TO LLM ------------------------------------------
+
+INSERT INTO `chatbot_output`
+SELECT
+    conversation_id,
+    customer_id,
+    cusomer_message,
+    chatbot_response
+FROM `customer_message_and_customer_info`,
+     LATERAL TABLE(ML_PREDICT('helpful_chatbot',
+        CONCAT(
+            'This is customer information. He has a pet who is ', pet_type,
+            '. It is ', pet_gender,
+            '. It was born on ', pet_birthdate,
+            '. It has these allergies: ', allergies,
+            '. This is customer question: ',
+            cusomer_message,
+            '. This is relevant documentation: ',
+            relevant_documentation
+        )));
+
+
+---------------------------------- SEE QUESTION AND ANSWER ------------------------------------------
+
+SELECT cusomer_message, chatbot_response FROM chatbot_output WHERE customer_id = 'customer_3';
\ No newline at end of file
diff --git a/simple-agentic-flow-flink-sql/README.md b/simple-agentic-flow-flink-sql/README.md
new file mode 100644
index 00000000..b24503c4
--- /dev/null
+++ b/simple-agentic-flow-flink-sql/README.md
@@ -0,0 +1,77 @@
+# Building Agentic Systems with Apache Kafka and Apache Flink
+
+This repository demonstrates how to create a simple agentic workflow using **Apache Kafka, Apache Flink, OpenAI models, Pinecone, and MongoDB**.
+
+You can watch the recording of the talk **"Building Agentic Systems with Apache Kafka and Apache Flink"** by **Olena Kutsenko** at *Current Bengaluru 2025* for further insights.
+ +## Use Case: Agentic System for Pet Owners + +This system is designed as a **one-stop solution** for pet owners, enabling them to: + +- Retrieve reliable pet care information +- Book veterinary appointments +- Schedule grooming sessions +- Submit support tickets for pet-related concerns + +## Technology Stack + +We use the following technologies: + +- **Apache Kafka** – For event-driven architecture +- **Apache Flink (SQL)** – To interact with models and external data sources +- **OpenAI models** – For embedding and reasoning +- **Pinecone** – For storing vector data +- **MongoDB** – For managing customer data + +In this example, we leverage **Confluent services**, using a **Confluent compute pool** with **30 Confluent Flink Units**. + + + +## Setting Up Connections Using Confluent CLI + +Once external sources like **Pinecone** and **OpenAI** are configured, use the **Confluent CLI** to establish secure connections. Refer to [`1-create-connections-with-confluent-cli.md`](1-create-connections-with-confluent-cli.md) for examples. + +For detailed documentation, see: +[Confluent CLI - Creating Flink Connections](https://docs.confluent.io/confluent-cli/current/command-reference/flink/connection/confluent_flink_connection_create.html) + + + +## Accessing External Customer Data in MongoDB + +In this demo, **MongoDB** is used to store customer data. We stream data from **MongoDB** into a **Kafka topic** using a **Kafka connector**. You can use a similar approach for other data storage solutions. + +For a more detailed MongoDB setup in a **RAG scenario**, check the example in this repository: +[Agentic RAG Example](https://github.com/confluentinc/demo-scene/tree/master/agentic-rag) + +Make sure to define the **MongoDB record schema** (see `mongodb-schema.json`). +Example records can be found in `customer-data-example.json`. + + + +## Populating Pinecone with Vector Data + +Example values used in this demo can be found in `documentation-sample.json`. 
Check [this article](https://www.confluent.io/blog/flink-ai-rag-with-federated-search/) for an end-to-end example of semantic search with Pinecone and OpenAI. + +## Creating Tables and Models + +The SQL queries for setting up tables and models used in this demo are available in [`2-create-tables-and-models.sql`](2-create-tables-and-models.sql). + + + +## Setting Up the Pipeline + +Follow the step-by-step pipeline setup in [`3-setup-pipeline-flow.sql`](3-setup-pipeline-flow.sql). + + +## Additional resources + +- [Flink AI: Hands-On FEDERATED_SEARCH()—Search a Vector Database with Confluent Cloud for Apache Flink®](https://www.confluent.io/blog/flink-ai-rag-with-federated-search/) +- [Flink AI: Real-Time ML and GenAI Enrichment of Streaming Data with Flink SQL on Confluent Cloud](https://www.confluent.io/blog/flinkai-realtime-ml-and-genai-confluent-cloud/) +- [Using Apache Flink® for Model Inference: A Guide for Real-Time AI Applications](https://www.confluent.io/blog/using-flink-for-model-inference-a-guide-for-realtime-ai-applications/) +- [What are AI agents?](https://www.ibm.com/think/topics/ai-agents) +- [Building effective agents](https://www.anthropic.com/research/building-effective-agents) +- [Understanding the planning of LLM agents: A survey](https://arxiv.org/pdf/2402.02716) +- [MCP](https://www.anthropic.com/news/model-context-protocol) +- [MCP Confluent GitHub repo](https://github.com/confluentinc/mcp-confluent) + + diff --git a/simple-agentic-flow-flink-sql/customer-data-example.json b/simple-agentic-flow-flink-sql/customer-data-example.json new file mode 100644 index 00000000..d1e700cd --- /dev/null +++ b/simple-agentic-flow-flink-sql/customer-data-example.json @@ -0,0 +1,46 @@ +[ + { + "_id": { "$oid": "66c6469f00a6ce3528112034" }, + "customer_id": "customer_0", + "pet_id": "pet_0", + "pet_name": "Whiskers", + "customer_email": "jane.doe@example.com", + "pet_birthdate": "05.05.2017", + "pet_gender": "female", + "allergies": "none", + "pet_type": "cat" + 
}, + { + "_id": { "$oid": "66c6469f00a6ce3528112035" }, + "customer_id": "customer_2", + "pet_id": "pet_2", + "pet_name": "Buddy", + "customer_email": "bob.smith@example.com", + "pet_birthdate": "10.10.2016", + "pet_gender": "male", + "allergies": "beef", + "pet_type": "dog" + }, + { + "_id": { "$oid": "66c6469f00a6ce3528112036" }, + "customer_id": "customer_3", + "pet_id": "pet_3", + "pet_name": "Luna", + "customer_email": "alice.jones@example.com", + "pet_birthdate": "20.02.2018", + "pet_gender": "female", + "allergies": "fish", + "pet_type": "cat" + }, + { + "_id": { "$oid": "66c6469f00a6ce3528112037" }, + "customer_id": "customer_4", + "pet_id": "pet_4", + "pet_name": "Rocky", + "customer_email": "mike.brown@example.com", + "pet_birthdate": "15.08.2015", + "pet_gender": "male", + "allergies": "none", + "pet_type": "dog" + } +] diff --git a/simple-agentic-flow-flink-sql/documentation-sample.json b/simple-agentic-flow-flink-sql/documentation-sample.json new file mode 100644 index 00000000..1e78e8c1 --- /dev/null +++ b/simple-agentic-flow-flink-sql/documentation-sample.json @@ -0,0 +1,17 @@ +{ + "textBlocks": [ + "Cat Health and Medical Advice: If your cat is experiencing excessive grooming, loss of appetite, or unusual lethargy, it may be a sign of underlying issues such as dental pain, kidney problems, or stress. Monitor changes closely and consult your vet if symptoms persist.", + "Nutrition and Diet Recommendations for Cats: Cats require a high-protein diet with balanced fats and minimal carbohydrates. Choose premium cat food that caters to your cat’s age, weight, and health needs, and consult your vet about any specific dietary supplements or restrictions.", + "Cat Behavioral and Training Guidance: Cats can be sensitive to environmental changes. 
For issues like scratching furniture or litter box aversion, use positive reinforcement, provide stimulating toys, and ensure a quiet, safe space to help reduce stress and encourage positive behaviors.", + "Preventive Care and Routine Maintenance for Cats: Regular veterinary check-ups, vaccinations, and dental care are essential. A yearly exam helps detect common issues like obesity, feline diabetes, or dental disease early, ensuring your cat stays healthy and active.", + "Grooming and Daily Care for Cats: Regular brushing is key, especially for long-haired cats, to prevent matting and hairballs. Trim nails as needed, and check ears and eyes for signs of infection. Keeping grooming sessions positive helps your cat feel comfortable and relaxed.", + "Emergency and First-Aid Information for Cats: In emergencies such as poisoning, severe injury, or difficulty breathing, act quickly. Familiarize yourself with cat-specific first-aid measures and keep your vet’s emergency contact information readily available.", + "Customized Advice Based on Cat Profiles: With detailed customer data, personalized tips can be provided for cats. For example, if your cat is an indoor breed prone to obesity, advice might include interactive play sessions and portion control recommendations.", + "Accessing Expert Articles and Resources for Cats: Our semantic search tool can fetch up-to-date articles on topics like feline nutrition, behavior modification, and preventive care. Get concise summaries of expert insights to help guide your cat care decisions.", + "Seasonal Cat Care Tips: In summer, ensure your cat stays cool and hydrated, and in winter, provide a warm, cozy spot to retreat. Seasonal changes can affect your cat’s behavior and health, so adjust their care routine accordingly.", + "Mental Health and Enrichment for Cats: Cats benefit from mental stimulation and play. 
Offer puzzle feeders, interactive toys, or even window perches to keep them engaged and reduce stress, particularly if they are indoor-only cats.", + "Travel and Outing Preparations for Cats: Traveling with a cat requires careful planning. Secure a comfortable carrier, familiarize your cat with travel conditions gradually, and consult your vet for tips on reducing travel anxiety.", + "Holistic and Alternative Care Options for Cats: Explore complementary therapies such as acupuncture, herbal supplements, or massage for cats experiencing chronic pain or stress. Always discuss these alternatives with your vet to ensure they suit your cat's needs.", + "Post-Appointment Follow-Up for Cats: After a vet visit, monitor your cat's recovery and behavior closely. Our system can send reminders for follow-up appointments and offer tips for at-home care to support your cat's healing process." + ] +} diff --git a/simple-agentic-flow-flink-sql/mongodb-schema.json b/simple-agentic-flow-flink-sql/mongodb-schema.json new file mode 100644 index 00000000..fb5c101a --- /dev/null +++ b/simple-agentic-flow-flink-sql/mongodb-schema.json @@ -0,0 +1,44 @@ +{ + "type": "record", + "name": "PetRecord", + "fields": [ + { + "name": "_id", + "type": "string", + "doc": "MongoDB ObjectId as a string" + }, + { + "name": "customer_id", + "type": "string" + }, + { + "name": "pet_id", + "type": "string" + }, + { + "name": "pet_name", + "type": "string" + }, + { + "name": "customer_email", + "type": "string" + }, + { + "name": "pet_birthdate", + "type": "string", + "doc": "Date in dd.mm.yyyy format" + }, + { + "name": "pet_gender", + "type": "string" + }, + { + "name": "allergies", + "type": "string" + }, + { + "name": "pet_type", + "type": "string" + } + ] +}