Skip to content

Conversation

@Leomrlin
Copy link
Contributor

We're excited to introduce initial support for context-aware memory operations in Apache GeaFlow (incubating) through the integration of two key retrieval operators: Lucene-powered keyword search and embedding-based semantic search. This enhancement lays the foundational layer for building dynamic, AI-driven graph memory systems — enabling real-time, hybrid querying over structured graph data and unstructured semantic intent.

✅ Key Features Implemented

  • KeywordVector + Lucene Indexing: Enables fast, full-text retrieval of entities using BM25-style keyword matching. Ideal for surfacing exact or near-exact matches from entity attributes (e.g., names, emails, titles).
  • EmbeddingVector + Vector Index Store: Supports semantic search via high-dimensional embeddings. Queries are encoded using a configured embedding model and matched against pre-indexed node representations.
  • Hybrid VectorSearch Interface: Combines multiple vector types (keyword, embedding, traversal hints) into a single search context, paving the way for multimodal retrieval.
  • End-to-End Query Pipeline: From query ingestion → hybrid indexing → graph retrieval → context verbalization, demonstrated with LDBC-scale data.

🧪 Validated Use Cases

Our GraphMemoryTest suite demonstrates:

  • Resolving ambiguous queries like "Chaim Azriel" into multiple candidate persons using keyword + embedding fusion.
  • Traversing relationships (e.g., Comment_hasCreator_Person) in follow-up rounds via contextual refinement.
  • Iterative context expansion across multiple search cycles — mimicking agent memory evolution.

🔮 Why This Matters

This work represents the first step toward Graphiti-inspired, relationship-aware AI memory within GeaFlow:

Instead of treating context as static text, we model it as a dynamic, evolving subgraph, enriched by both semantic similarity and topological structure.

By leveraging GeaFlow’s native streaming graph engine, we aim to go beyond batch RAG — supporting incremental updates, temporal reasoning, and multi-hop inference at low latency.


Next Steps:
We propose incubating this as the GeaFlow Memory Engine, with upcoming support for:

  • Graph traversal-guided re-ranking
  • Agent session management with episodic memory
  • Integration with LLM agents for autonomous reasoning

This PR sets the stage: from graph analytics to graph-native AI memory.

Let’s build the future of contextual intelligence — on streaming graphs. 🚀

@yaozhongq yaozhongq requested a review from cbqiao December 29, 2025 07:52
Copy link

@Appointat Appointat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test

Copy link

@Appointat Appointat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your PR. Left some comments.

context.userSay(sentence);
return model.chat(context);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name singleSentence() is not accurate; it actually sends a single message and retrieves a reply.
Suggestion: rename to chat() or sendMessage()

Copy link
Contributor Author

@Leomrlin Leomrlin Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

chat() has been replaced here

builder.append(json);
}
return builder.toString();
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ChatRobot is responsible for both chat and embedding, with a mixed set of responsibilities.

Suggestion: consider splitting into ChatService and EmbeddingService? May be better?

Copy link
Contributor Author

@Leomrlin Leomrlin Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two functions have been separate into ChatService and EmbeddingService.

public EmbeddingResult(String input, double[] embedding) {
this.input = input;
this.embedding = embedding;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about merge it to "EmbeddingResponse"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EmbeddingResponse and EmbeddingResult have no common fields and serve different purposes, so they are not suitable for merging


package org.apache.geaflow.ai.common.model;

public class ModelInfo {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to ModelConfig may be better. This is the mainstream naming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's now renamed to ModelConfig.

import java.util.List;


public class Response {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you need to integrate an API response compatible with the OpenAI/Gemini API, please import these variables. Viewing the confusing point in this code, why is the choice needed? If the Response class is a generic class (not only for LLM responses), then I think usage and choice may be not necessary; these information could be stored as meta attributes of the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

choice is part of the interface and must be retained. Currently, Response is not yet a generic class, so it only converts the model's response output.

@Override
public String toString() {

StringBuilder builder = new StringBuilder();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public String toString() {
    StringBuilder sb = new StringBuilder("TraversalVector{vec=");
    
    for (int i = 0; i < vec.length; i++) {
        if (i > 0) {
            sb.append(i % 3 == 0 ? "; " : "-");
        }
        sb.append(vec[i]);
        if (i % 3 == 2) {
            sb.append(">");
        }
    }
    
    return sb.append('}').toString();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been modified accordingly.

ChatRobot chatRobot = new ChatRobot();
chatRobot.setModelInfo(modelInfo);

final int BATCH_SIZE = 32;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Magic numbers should be extracted as constants or configured or environment variables. Other magic numbers should also be changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, model.Constants and GraphMemoryConfigKeys have been added, and all necessary literal values have been moved and declared within them.

@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not reviewed the io folder yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's okay.


private GraphSearchStore initSearchStore(Map<GraphEntity, List<IVector>> entityIndexMap) {
GraphSearchStore searchStore = new GraphSearchStore();
for (Map.Entry<GraphEntity, List<IVector>> entry : entityIndexMap.entrySet()) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we rebuild the in-memory index on every search (GraphSearchStore includes Lucene)? Is there a better optimization method? For example, clearing/initializing the GraphSearchStore instead of rebuilding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, there is no better solution at the moment. The algorithm requires that during each iteration, the Operator searches within the potential traversal area. Using the previous GraphSearchStore—whether based on the previous iteration or the entire graph—would inevitably lead to an expanding search scope. Fortunately, though, the retrieval area is not particularly large in the graph.

public EmbeddingOperator(GraphAccessor accessor, IndexStore store) {
this.graphAccessor = Objects.requireNonNull(accessor);
this.indexStore = Objects.requireNonNull(store);
this.threshold = 0.50;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be placed in the configuration, or, another optiona -- as a configurable parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, already modified.

@Leomrlin Leomrlin changed the title feat(dsl): Adding Lucene & Embedding-Based Search Operators to Apache GeaFlow (incubating) for Lightweight Context Memory feat(ai): Adding Lucene & Embedding-Based Search Operators to Apache GeaFlow (incubating) for Lightweight Context Memory Jan 6, 2026
}

public void setSubGraph(String sessionId, List<SubGraph> subGraphs) {
this.session2Graphs.put(sessionId, subGraphs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SessionManagement.createSession(String) only writes the time to session2ActiveTime but does not create the corresponding empty list in session2Graphs. GraphMemoryServer.verbalize directly calls sessionManagement.getSubGraph(sessionId); if subGraphList is null when calling new ArrayList<>(subGraphList.size()), it will throw a NullPointerException.

It is recommended to call session2Graphs.put(sessionId, new ArrayList<>()) when createSession(String) and createSession() are successful.
Also, change getSubGraph to return a non-null value.

Copy link
Contributor

@kitalkuyo-gita kitalkuyo-gita Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

samples:

// 将 Map 改为并发实现
private final ConcurrentMap<String, Long> session2ActiveTime = new ConcurrentHashMap<>();
private final ConcurrentMap<String, List<SubGraph>> session2Graphs = new ConcurrentHashMap<>();

public boolean createSession(String sessionId) {
    if (sessionId == null) {
        return false;
    }
    Long prev = session2ActiveTime.putIfAbsent(sessionId, System.nanoTime());
    if (prev != null) {
        return false;
    }
    // 初始化 subgraphs 为可变空列表,避免 NPE
    session2Graphs.putIfAbsent(sessionId, new ArrayList<>());
    return true;
}

public String createSession() {
    String sessionId = Constants.PREFIX_TMP_SESSION + System.nanoTime()
            + UUID.randomUUID().toString().replace("-", "").substring(0, 8);
    return createSession(sessionId) ? sessionId : null;
}

// 返回不可为 null 的 List(防止调用者 NPE)
public List<SubGraph> getSubGraph(String sessionId) {
    List<SubGraph> l = this.session2Graphs.get(sessionId);
    return l == null ? new ArrayList<>() : l;
}

public void setSubGraph(String sessionId, List<SubGraph> subGraphs) {
    // 安全性:确保 map 存在 key
    this.session2Graphs.put(sessionId, subGraphs == null ? new ArrayList<>() : subGraphs);
}

* @return an unmodifiable set of ignored characters
*/
private static Set<Character> buildIgnoredChars() {
Set<Character> ignored = new HashSet<>(EXCLUDED_CHARS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended not to include EXCLUDED_CHARS in IGNORE_CHARS, as this may cause errors in SearchStore query string construction and semantic filtering.

For example, SubgraphSemanticPromptFunction.verbalize filters strings using .filter(str -> !SearchUtils.isAllAllowedChars(str)). An incorrect set of allowed characters will lead to incorrect filtering behavior (strings that should be kept are removed, and strings that should be removed are kept).

samples:

// SearchUtils.java: 修复 buildIgnoredChars()
private static final Set<Character> EXCLUDED_CHARS = new HashSet<>(Arrays.asList(
        '*', '#', '-', '?', '`', '{', '}', '[', ']', '(', ')', '>', '<', ':', '/', '.'
));

private static final Set<Character> IGNORE_CHARS = buildIgnoredChars();

private static Set<Character> buildIgnoredChars() {
    Set<Character> allowed = new HashSet<>();
    // 加入英文字母(大小写)
    for (char c = 'a'; c <= 'z'; c++) allowed.add(c);
    for (char c = 'A'; c <= 'Z'; c++) allowed.add(c);
    // 加入数字
    for (char c = '0'; c <= '9'; c++) allowed.add(c);
    // 加入常用安全字符(空格、下划线等)
    allowed.add(' ');
    allowed.add('_');
    allowed.add('-');
    allowed.add('@');
    allowed.add('+');
    allowed.add('!');
    allowed.add('$');
    allowed.add('%');
    allowed.add('&');
    allowed.add('=');
    allowed.add('~');
    // 不要加入 EXCLUDED_CHARS !
    return Collections.unmodifiableSet(allowed);
}

}
}
//recall compute
GraphSearchStore searchStore = initSearchStore(extendEntityIndexMap);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be better to explicitly call writer.commit() (or close()) in initSearchStore after all the addDoc operations are completed?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants