Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.configuration.ReadableConfiguration;
import org.apache.flink.agents.api.memory.BaseLongTermMemory;
import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceType;
Expand Down Expand Up @@ -56,6 +57,13 @@ public interface RunnerContext {
*/
MemoryObject getShortTermMemory() throws Exception;

/**
* Gets the long-term memory.
*
* @return The long-term memory instance
*/
BaseLongTermMemory getLongTermMemory() throws Exception;

/**
* Gets the metric group for Flink Agents.
*
Expand Down Expand Up @@ -100,4 +108,7 @@ public interface RunnerContext {
* @return the option value of the action config.
*/
Object getActionConfigValue(String key);

/** Clean up the resource. */
void close() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.memory;

import org.apache.flink.agents.api.memory.compaction.CompactionStrategy;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Map;

/**
* Base interface for long-term memory management. It provides operations to create, retrieve,
* delete, and search memory sets, which are collections of memory items. A memory set can store
* items of a specific type (e.g., String or ChatMessage) and has a capacity limit. When the
* capacity is exceeded, a compaction strategy is applied to manage the memory set size.
*/
public interface BaseLongTermMemory extends AutoCloseable {

/**
* Gets an existing memory set or creates a new one if it doesn't exist.
*
* @param name the name of the memory set
* @param itemType the type of items stored in the memory set
* @param capacity the maximum number of items the memory set can hold
* @param strategy the compaction strategy to use when the capacity is exceeded
* @return the existing or newly created memory set
* @throws Exception if the memory set cannot be created or retrieved
*/
MemorySet getOrCreateMemorySet(
String name, Class<?> itemType, int capacity, CompactionStrategy strategy)
throws Exception;

/**
* Gets an existing memory set by name.
*
* @param name the name of the memory set to retrieve
* @return the memory set with the given name
* @throws Exception if the memory set does not exist or cannot be retrieved
*/
MemorySet getMemorySet(String name) throws Exception;

/**
* Deletes a memory set by name.
*
* @param name the name of the memory set to delete
* @return true if the memory set was successfully deleted, false if it didn't exist
* @throws Exception if the deletion operation fails
*/
boolean deleteMemorySet(String name) throws Exception;

/**
* Gets the number of items in the memory set.
*
* @param memorySet the memory set to count items in
* @return the number of items in the memory set
* @throws Exception if the size cannot be determined
*/
long size(MemorySet memorySet) throws Exception;

/**
* Adds items to the memory set. If IDs are not provided, they will be automatically generated.
* This method may trigger compaction if the memory set capacity is exceeded.
*
* @param memorySet the memory set to add items to
* @param memoryItems the items to be added to the memory set
* @param ids optional list of IDs for the items. If null or shorter than memoryItems, IDs will
* be auto-generated for missing items
* @param metadatas optional list of metadata maps for the items. Each metadata map corresponds
* to an item at the same index
* @return list of IDs of the added items
* @throws Exception if items cannot be added to the memory set
*/
List<String> add(
MemorySet memorySet,
List<?> memoryItems,
@Nullable List<String> ids,
@Nullable List<Map<String, Object>> metadatas)
throws Exception;

/**
* Retrieves memory items from the memory set. If no IDs are provided, all items in the memory
* set are returned.
*
* @param memorySet the memory set to retrieve items from
* @param ids optional list of item IDs to retrieve. If null, all items are returned
* @return list of memory set items. If ids is provided, returns items matching those IDs. If
* ids is null, returns all items in the memory set
* @throws Exception if items cannot be retrieved from the memory set
*/
List<MemorySetItem> get(MemorySet memorySet, @Nullable List<String> ids) throws Exception;

/**
* Deletes memory items from the memory set. If no IDs are provided, all items in the memory set
* are deleted.
*
* @param memorySet the memory set to delete items from
* @param ids optional list of item IDs to delete. If null, all items in the memory set are
* deleted
* @throws Exception if items cannot be deleted from the memory set
*/
void delete(MemorySet memorySet, @Nullable List<String> ids) throws Exception;

/**
* Performs semantic search on the memory set to find items related to the query string.
*
* @param memorySet the memory set to search in
* @param query the query string for semantic search
* @param limit the maximum number of items to return
* @param extraArgs additional arguments for the search operation (e.g., filters, distance
* metrics)
* @return list of memory set items that are most relevant to the query, ordered by relevance
* @throws Exception if the search operation fails
*/
List<MemorySetItem> search(
MemorySet memorySet, String query, int limit, Map<String, Object> extraArgs)
throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.memory;

import org.apache.flink.agents.api.configuration.ConfigOption;

public class LongTermMemoryOptions {
public enum LongTermMemoryBackend {
EXTERNAL_VECTOR_STORE("external_vector_store");

private final String value;

LongTermMemoryBackend(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}

/** The backend for long-term memory. */
public static final ConfigOption<LongTermMemoryBackend> BACKEND =
new ConfigOption<>("long-term-memory.backend", LongTermMemoryBackend.class, null);

/** The name of the vector store to server as the backend for long-term memory. */
public static final ConfigOption<String> EXTERNAL_VECTOR_STORE_NAME =
new ConfigOption<>("long-term-memory.external-vector-store-name", String.class, null);

/** Whether execute compaction asynchronously . */
public static final ConfigOption<Boolean> ASYNC_COMPACTION =
new ConfigOption<>("long-term-memory.async-compaction", Boolean.class, false);

/** The thread count of executor for async compaction. */
public static final ConfigOption<Integer> THREAD_COUNT =
new ConfigOption<>("long-term-memory.async-compaction.thread-count", Integer.class, 16);
}
159 changes: 159 additions & 0 deletions api/src/main/java/org/apache/flink/agents/api/memory/MemorySet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.agents.api.memory;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.agents.api.memory.compaction.CompactionStrategy;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MemorySet {
private final String name;
private final Class<?> itemType;
private final int capacity;
private final CompactionStrategy strategy;
private @JsonIgnore BaseLongTermMemory ltm;

@JsonCreator
public MemorySet(
@JsonProperty("name") String name,
@JsonProperty("itemType") Class<?> itemType,
@JsonProperty("capacity") int capacity,
@JsonProperty("strategy") CompactionStrategy strategy) {
this.name = name;
this.itemType = itemType;
this.capacity = capacity;
this.strategy = strategy;
}

/**
* Gets the number of items in this memory set.
*
* @return the number of items in the memory set
* @throws Exception if the size cannot be determined
*/
public long size() throws Exception {
return this.ltm.size(this);
}

/**
* Adds items to this memory set. If IDs are not provided, they will be automatically generated.
* This method may trigger compaction if the memory set capacity is exceeded.
*
* @param memoryItems the items to be added to the memory set
* @param ids optional list of IDs for the items. If null or shorter than memoryItems, IDs will
* be auto-generated for missing items
* @param metadatas optional list of metadata maps for the items. Each metadata map corresponds
* to an item at the same index
* @return list of IDs of the added items
* @throws Exception if items cannot be added to the memory set
*/
public List<String> add(
List<?> memoryItems,
@Nullable List<String> ids,
@Nullable List<Map<String, Object>> metadatas)
throws Exception {
return this.ltm.add(this, memoryItems, ids, metadatas);
}

/**
* Retrieves memory items from this memory set. If no IDs are provided, all items in the memory
* set are returned.
*
* @param ids optional list of item IDs to retrieve. If null, all items are returned
* @return list of memory set items. If ids is provided, returns items matching those IDs. If
* ids is null, returns all items in the memory set
* @throws Exception if items cannot be retrieved from the memory set
*/
public List<MemorySetItem> get(@Nullable List<String> ids) throws Exception {
return this.ltm.get(this, ids);
}

/**
* Performs semantic search on this memory set to find items related to the query string.
*
* @param query the query string for semantic search
* @param limit the maximum number of items to return
* @param extraArgs optional additional arguments for the search operation (e.g., filters,
* distance metrics). If null, an empty map is used
* @return list of memory set items that are most relevant to the query, ordered by relevance
* @throws Exception if the search operation fails
*/
public List<MemorySetItem> search(
String query, int limit, @Nullable Map<String, Object> extraArgs) throws Exception {
return this.ltm.search(
this, query, limit, extraArgs == null ? Collections.emptyMap() : extraArgs);
}

public void setLtm(BaseLongTermMemory ltm) {
this.ltm = ltm;
}

public String getName() {
return name;
}

public Class<?> getItemType() {
return itemType;
}

public int getCapacity() {
return capacity;
}

public CompactionStrategy getStrategy() {
return strategy;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
MemorySet memorySet = (MemorySet) o;
return capacity == memorySet.capacity
&& Objects.equals(name, memorySet.name)
&& Objects.equals(itemType, memorySet.itemType)
&& Objects.equals(strategy, memorySet.strategy);
}

@Override
public int hashCode() {
return Objects.hash(name, itemType, capacity, strategy);
}

@Override
public String toString() {
return "MemorySet{"
+ "name='"
+ name
+ '\''
+ ", itemType="
+ itemType
+ ", capacity="
+ capacity
+ ", strategy="
+ strategy
+ '}';
}
}
Loading