Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs/content/90.cli/2.commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ veadk deploy

## 本地调试

可以通过`adk web`或`veadk studio`来启动Web页面,运行智能体:
可以通过`adk web`或`veadk web`来启动Web页面,运行智能体:

```bash
# basic usage:
adk web

# if you need to use long-term memory, you should use `veadk web`.
# if the `session_service_uri` is not set, it will use `opensearch` as your long-term memory backend
veadk web --session_service_uri="mysql+pymysql://{user}:{password}@{host}/{database}"
# or
veadk web
```

它们能够自动读取执行命令目录中的`agent.py`文件,并加载`root_agent`全局变量。
Expand Down
211 changes: 60 additions & 151 deletions veadk/cli/cli_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,99 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional
from functools import wraps

import click

from veadk.memory.long_term_memory import LongTermMemory
from veadk.memory.short_term_memory import ShortTermMemory
from veadk.utils.logger import get_logger


def _get_stm_from_module(module) -> ShortTermMemory:
return module.agent_run_config.short_term_memory


def _get_stm_from_env() -> ShortTermMemory:
import os

from veadk.utils.logger import get_logger

logger = get_logger(__name__)

short_term_memory_backend = os.getenv("SHORT_TERM_MEMORY_BACKEND")
if not short_term_memory_backend: # prevent None or empty string
short_term_memory_backend = "local"
logger.info(f"Short term memory: backend={short_term_memory_backend}")

return ShortTermMemory(backend=short_term_memory_backend) # type: ignore


def _get_ltm_from_module(module) -> LongTermMemory | None:
agent = module.agent_run_config.agent

if not hasattr(agent, "long_term_memory"):
return None
else:
return agent.long_term_memory


def _get_ltm_from_env() -> LongTermMemory | None:
import os

from veadk.utils.logger import get_logger

logger = get_logger(__name__)

long_term_memory_backend = os.getenv("LONG_TERM_MEMORY_BACKEND")
app_name = os.getenv("VEADK_WEB_APP_NAME", "")
user_id = os.getenv("VEADK_WEB_USER_ID", "")

if long_term_memory_backend:
logger.info(f"Long term memory: backend={long_term_memory_backend}")
return LongTermMemory(
backend=long_term_memory_backend, app_name=app_name, user_id=user_id
) # type: ignore
else:
logger.warning("No long term memory backend settings detected.")
return None


def _get_memory(
module_path: str,
) -> tuple[ShortTermMemory, LongTermMemory | None]:
from veadk.utils.logger import get_logger
from veadk.utils.misc import load_module_from_file

logger = get_logger(__name__)

# 1. load user module
try:
module_file_path = module_path
module = load_module_from_file(
module_name="agent_and_mem", file_path=f"{module_file_path}/agent.py"
)
except Exception as e:
logger.error(
f"Failed to get memory config from `agent.py`: {e}. Fallback to get memory from environment variables."
)
return _get_stm_from_env(), _get_ltm_from_env()

if not hasattr(module, "agent_run_config"):
logger.error(
"You must export `agent_run_config` as a global variable in `agent.py`. Fallback to get memory from environment variables."
)
return _get_stm_from_env(), _get_ltm_from_env()

# 2. try to get short term memory
# short term memory must exist in user code, as we use `default_factory` to init it
short_term_memory = _get_stm_from_module(module)

# 3. try to get long term memory
long_term_memory = _get_ltm_from_module(module)
if not long_term_memory:
long_term_memory = _get_ltm_from_env()

return short_term_memory, long_term_memory
logger = get_logger(__name__)


def patch_adkwebserver_disable_openapi():
Expand Down Expand Up @@ -133,71 +47,66 @@ def wrapped_get_fast_api(self, *args, **kwargs):
google.adk.cli.adk_web_server.AdkWebServer.get_fast_api_app = wrapped_get_fast_api


@click.command()
@click.option("--host", default="127.0.0.1", help="Host to run the web server on")
@click.option(
"--app_name", default="", help="The `app_name` for initializing long term memory"
)
@click.option(
"--user_id", default="", help="The `user_id` for initializing long term memory"
@click.command(
context_settings=dict(ignore_unknown_options=True, allow_extra_args=True)
)
def web(host: str, app_name: str, user_id: str) -> None:
@click.pass_context
def web(ctx, *args, **kwargs) -> None:
"""Launch web with long term and short term memory."""
import os
from typing import Any

from google.adk.cli.utils.shared_value import SharedValue

from veadk.utils.logger import get_logger

logger = get_logger(__name__)

def init_for_veadk(
self,
*,
agent_loader: Any,
session_service: Any,
memory_service: Any,
artifact_service: Any,
credential_service: Any,
eval_sets_manager: Any,
eval_set_results_manager: Any,
agents_dir: str,
extra_plugins: Optional[list[str]] = None,
**kwargs: Any,
):
self.agent_loader = agent_loader
self.artifact_service = artifact_service
self.credential_service = credential_service
self.eval_sets_manager = eval_sets_manager
self.eval_set_results_manager = eval_set_results_manager
self.agents_dir = agents_dir
self.runners_to_clean = set()
self.current_app_name_ref = SharedValue(value="")
self.runner_dict = {}
self.extra_plugins = extra_plugins or []

for key, value in kwargs.items():
setattr(self, key, value)

# parse VeADK memories
short_term_memory, long_term_memory = _get_memory(module_path=agents_dir)
self.session_service = short_term_memory.session_service
self.memory_service = long_term_memory

os.environ["VEADK_WEB_APP_NAME"] = app_name
os.environ["VEADK_WEB_USER_ID"] = user_id

import google.adk.cli.adk_web_server
from google.adk.cli import adk_web_server
from google.adk.runners import Runner as ADKRunner

from veadk import Agent
from veadk.agents.loop_agent import LoopAgent
from veadk.agents.parallel_agent import ParallelAgent
from veadk.agents.sequential_agent import SequentialAgent

def before_get_runner_async(func):
logger.info("Hook before `get_runner_async`")

@wraps(func)
async def wrapper(*args, **kwargs) -> ADKRunner:
self: adk_web_server.AdkWebServer = args[0]
app_name: str = args[1]
"""Returns the cached runner for the given app."""
agent_or_app = self.agent_loader.load_agent(app_name)

if isinstance(agent_or_app, (SequentialAgent, LoopAgent, ParallelAgent)):
logger.warning(
"Detect VeADK workflow agent, the short-term memory and long-term memory of each sub agent are useless."
)

if isinstance(agent_or_app, Agent):
logger.info("Detect VeADK Agent.")

if agent_or_app.short_term_memory:
self.session_service = (
agent_or_app.short_term_memory.session_service
)

if agent_or_app.long_term_memory:
self.memory_service = agent_or_app.long_term_memory
logger.info(
f"Long term memory backend is {self.memory_service.backend}"
)

logger.info(
f"Current session_service={self.session_service.__class__.__name__}, memory_service={self.memory_service.__class__.__name__}"
)

runner = await func(*args, **kwargs)
return runner

return wrapper

adk_web_server.AdkWebServer.get_runner_async = before_get_runner_async(
adk_web_server.AdkWebServer.get_runner_async
)

google.adk.cli.adk_web_server.AdkWebServer.__init__ = init_for_veadk
patch_adkwebserver_disable_openapi()

import google.adk.cli.cli_tools_click as cli_tools_click

agents_dir = os.getcwd()
logger.info(f"Load agents from {agents_dir}")
from google.adk.cli.cli_tools_click import cli_web

cli_tools_click.cli_web.main(
args=[agents_dir, "--host", host, "--log_level", "ERROR"]
)
extra_args = ctx.args
logger.debug(f"User args: {ctx.args}")
cli_web.main(args=extra_args, standalone_mode=False)