diff --git a/README.md b/README.md
index 638ddd4f3..c719f7a02 100644
--- a/README.md
+++ b/README.md
@@ -3,9 +3,8 @@
[](https://www.apache.org/licenses/LICENSE-2.0.html)
[](https://deepwiki.com/apache/incubator-hugegraph-ai)
-`hugegraph-ai` aims to explore the integration of [HugeGraph](https://github.com/apache/hugegraph) with artificial
-intelligence (AI) and provide comprehensive support for developers to leverage HugeGraph's AI capabilities
-in their projects.
+`hugegraph-ai` aims to explore the integration of HugeGraph with artificial intelligence (AI) and provide comprehensive support for
+developers to leverage HugeGraph's AI capabilities in their projects.
## Modules
@@ -22,28 +21,11 @@ to seamlessly connect with third-party graph-related ML frameworks.
It is used to define graph structures and perform CRUD operations on graph data. Both the `hugegraph-llm` and
`hugegraph-ml` modules will depend on this foundational library.
-## Learn More
-
-The [project homepage](https://hugegraph.apache.org/docs/quickstart/hugegraph-ai/) contains more information about
-hugegraph-ai.
-
-And here are links of other repositories:
-1. [hugegraph](https://github.com/apache/hugegraph) (graph's core component - Graph server + PD + Store)
-2. [hugegraph-toolchain](https://github.com/apache/hugegraph-toolchain) (graph tools **[loader](https://github.com/apache/incubator-hugegraph-toolchain/tree/master/hugegraph-loader)/[dashboard](https://github.com/apache/incubator-hugegraph-toolchain/tree/master/hugegraph-hubble)/[tool](https://github.com/apache/incubator-hugegraph-toolchain/tree/master/hugegraph-tools)/[client](https://github.com/apache/incubator-hugegraph-toolchain/tree/master/hugegraph-client)**)
-3. [hugegraph-computer](https://github.com/apache/hugegraph-computer) (integrated **graph computing** system)
-4. [hugegraph-website](https://github.com/apache/hugegraph-doc) (**doc & website** code)
-
-
## Contributing
-- Welcome to contribute to HugeGraph, please see [Guidelines](https://hugegraph.apache.org/docs/contribution-guidelines/) for more information.
-- Note: It's recommended to use [GitHub Desktop](https://desktop.github.com/) to greatly simplify the PR and commit process.
- Code format: Please run [`./style/code_format_and_analysis.sh`](style/code_format_and_analysis.sh) to format your code before submitting a PR. (Use `pylint` to check code style)
- Thank you to all the people who already contributed to HugeGraph!
-[](https://github.com/apache/incubator-hugegraph-ai/graphs/contributors)
-
-
## License
hugegraph-ai is licensed under [Apache 2.0 License](./LICENSE).
@@ -51,8 +33,5 @@ hugegraph-ai is licensed under [Apache 2.0 License](./LICENSE).
## Contact Us
- - [GitHub Issues](https://github.com/apache/incubator-hugegraph-ai/issues): Feedback on usage issues and functional requirements (quick response)
- - Feedback Email: [dev@hugegraph.apache.org](mailto:dev@hugegraph.apache.org) ([subscriber](https://hugegraph.apache.org/docs/contribution-guidelines/subscribe/) only)
- - WeChat public account: Apache HugeGraph, welcome to scan this QR code to follow us.
-
-
+ - 如流 (Infoflow) HugeGraph Team/Group
+ - GraphRAG DevOps Team (🚧)
diff --git a/hugegraph-llm/README.md b/hugegraph-llm/README.md
index 4c2a2010c..9e4014ab2 100644
--- a/hugegraph-llm/README.md
+++ b/hugegraph-llm/README.md
@@ -38,7 +38,7 @@ graph systems and large language models.
3. Clone this project
```bash
- git clone https://github.com/apache/incubator-hugegraph-ai.git
+ git clone https://{username}@icode.baidu.com/baidu/starhugegraph/hugegraph-ai
```
4. Configure the dependency environment
```bash
@@ -85,8 +85,8 @@ graph systems and large language models.
- Docs:
- text: Build rag index from plain text
- file: Upload file(s) which should be TXT or .docx (Multiple files can be selected together)
-- [Schema](https://hugegraph.apache.org/docs/clients/restful-api/schema/): (Except **2 types**)
- - User-defined Schema (JSON format, follow the [template](https://github.com/apache/incubator-hugegraph-ai/blob/aff3bbe25fa91c3414947a196131be812c20ef11/hugegraph-llm/src/hugegraph_llm/config/config_data.py#L125)
+- [Schema](https://starhugegraph.github.io/hugegraph-doc/clients/restful-api-v3/schema.html): (Accept **2 types**)
+ - User-defined Schema (JSON format, follow the [template](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/starhugegraph/hugegraph-ai/blob/master/hugegraph-llm/src/hugegraph_llm/config/config_data.py#L173)
to modify it)
  - Specify the name of the HugeGraph graph instance; the schema will be fetched from it automatically (like
**"hugegraph"**)
diff --git a/hugegraph-llm/src/hugegraph_llm/api/models/rag_requests.py b/hugegraph-llm/src/hugegraph_llm/api/models/rag_requests.py
index 3170e702e..bdf171f26 100644
--- a/hugegraph-llm/src/hugegraph_llm/api/models/rag_requests.py
+++ b/hugegraph-llm/src/hugegraph_llm/api/models/rag_requests.py
@@ -15,20 +15,21 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional, Literal
-
+from typing import Optional, Literal, List
+from enum import Enum
from fastapi import Query
from pydantic import BaseModel
from hugegraph_llm.config import prompt
-
+from hugegraph_llm.config import huge_settings
class GraphConfigRequest(BaseModel):
url: str = Query('127.0.0.1:8080', description="hugegraph client url.")
- name: str = Query('hugegraph', description="hugegraph client name.")
- user: str = Query('', description="hugegraph client user.")
- pwd: str = Query('', description="hugegraph client pwd.")
+ graph: str = Query('hugegraph', description="hugegraph client name.")
+ user: Optional[str] = Query('', description="hugegraph client user.")
+ pwd: Optional[str] = Query('', description="hugegraph client pwd.")
gs: str = None
+ token: Optional[str] = Query('', description="hugegraph client token.")
class RAGRequest(BaseModel):
@@ -116,3 +117,31 @@ class RerankerConfigRequest(BaseModel):
class LogStreamRequest(BaseModel):
admin_token: Optional[str] = None
log_file: Optional[str] = "llm-server.log"
+
+class GremlinOutputType(str, Enum):
+ MATCH_RESULT = "match_result"
+ TEMPLATE_GREMLIN = "template_gremlin"
+ RAW_GREMLIN = "raw_gremlin"
+ TEMPLATE_EXECUTION_RESULT = "template_execution_result"
+ RAW_EXECUTION_RESULT = "raw_execution_result"
+
+class GremlinGenerateRequest(BaseModel):
+ query: str
+ example_num: Optional[int] = Query(
+ 0,
+ description="Number of Gremlin templates to use.(0 means no templates)"
+ )
+ gremlin_prompt: Optional[str] = Query(
+ prompt.gremlin_generate_prompt,
+ description="Prompt for the Text2Gremlin query.",
+ )
+ client_config: Optional[GraphConfigRequest] = Query(None, description="hugegraph server config.")
+ output_types: Optional[List[GremlinOutputType]] = Query(
+ default=[GremlinOutputType.TEMPLATE_GREMLIN],
+ description="""
+ a list can contain "match_result","template_gremlin",
+ "raw_gremlin","template_execution_result","raw_execution_result"
+ You can specify which type of result do you need. Empty means all types.
+ """
+ )
+
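+# Illustrative request body for GremlinGenerateRequest (values are examples only):
+# {
+#     "query": "Who does Tom know?",
+#     "example_num": 2,
+#     "output_types": ["raw_gremlin", "raw_execution_result"]
+# }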
diff --git a/hugegraph-llm/src/hugegraph_llm/api/rag_api.py b/hugegraph-llm/src/hugegraph_llm/api/rag_api.py
index 2621220c9..fdd06ebea 100644
--- a/hugegraph-llm/src/hugegraph_llm/api/rag_api.py
+++ b/hugegraph-llm/src/hugegraph_llm/api/rag_api.py
@@ -26,6 +26,7 @@
LLMConfigRequest,
RerankerConfigRequest,
GraphRAGRequest,
+ GremlinGenerateRequest,
)
from hugegraph_llm.config import huge_settings
from hugegraph_llm.api.models.rag_response import RAGResponse
@@ -41,6 +42,7 @@ def rag_http_api(
apply_llm_conf,
apply_embedding_conf,
apply_reranker_conf,
+ gremlin_generate_selective_func,
):
@router.post("/rag", status_code=status.HTTP_200_OK)
def rag_answer_api(req: RAGRequest):
@@ -79,10 +81,11 @@ def rag_answer_api(req: RAGRequest):
def set_graph_config(req):
if req.client_config:
huge_settings.graph_url = req.client_config.url
- huge_settings.graph_name = req.client_config.name
+ huge_settings.graph_name = req.client_config.graph
huge_settings.graph_user = req.client_config.user
huge_settings.graph_pwd = req.client_config.pwd
huge_settings.graph_space = req.client_config.gs
+ huge_settings.graph_token = req.client_config.token
@router.post("/rag/graph", status_code=status.HTTP_200_OK)
def graph_rag_recall_api(req: GraphRAGRequest):
@@ -139,7 +142,7 @@ def graph_rag_recall_api(req: GraphRAGRequest):
@router.post("/config/graph", status_code=status.HTTP_201_CREATED)
def graph_config_api(req: GraphConfigRequest):
# Accept status code
- res = apply_graph_conf(req.url, req.name, req.user, req.pwd, req.gs, origin_call="http")
+ res = apply_graph_conf(req.url, req.graph, req.user, req.pwd, req.gs, origin_call="http")
return generate_response(RAGResponse(status_code=res, message="Missing Value"))
# TODO: restructure the implement of llm to three types, like "/config/chat_llm"
@@ -178,3 +181,29 @@ def rerank_config_api(req: RerankerConfigRequest):
else:
res = status.HTTP_501_NOT_IMPLEMENTED
return generate_response(RAGResponse(status_code=res, message="Missing Value"))
+
+ @router.post("/text2gremlin", status_code=status.HTTP_200_OK)
+ def text2gremlin_api(req: GremlinGenerateRequest):
+ try:
+ set_graph_config(req)
+
+ output_types_str_list = None
+ if req.output_types:
+ output_types_str_list = [ot.value for ot in req.output_types]
+
+ response_dict = gremlin_generate_selective_func(
+ inp=req.query,
+ example_num=req.example_num,
+ schema_input=huge_settings.graph_name,
+ gremlin_prompt_input=req.gremlin_prompt,
+ requested_outputs=output_types_str_list,
+ )
+ return response_dict
+ except HTTPException as e:
+ raise e
+ except Exception as e:
+ log.error(f"Error in text2gremlin_api: {e}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="An unexpected error occurred during Gremlin generation.",
+ ) from e
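+
+    # Illustrative call; host, port and router prefix depend on your deployment:
+    # curl -X POST "http://localhost:8001/api/text2gremlin" \
+    #      -H "Content-Type: application/json" \
+    #      -d '{"query": "Who does Tom know?", "output_types": ["raw_gremlin"]}'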
diff --git a/hugegraph-llm/src/hugegraph_llm/api/vector_api.py b/hugegraph-llm/src/hugegraph_llm/api/vector_api.py
new file mode 100644
index 000000000..f7e438d8c
--- /dev/null
+++ b/hugegraph-llm/src/hugegraph_llm/api/vector_api.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from datetime import date
+from typing import Optional
+
+from fastapi import status, APIRouter, HTTPException, Body
+
+from hugegraph_llm.utils.log import log
+from hugegraph_llm.api.models.rag_requests import GraphConfigRequest
+from hugegraph_llm.config import huge_settings
+
+API_CALL_TRACKER = {}
+
+
+# pylint: disable=too-many-statements
+def vector_http_api(router: APIRouter, update_embedding_func):
+ @router.post("/vector/embedding", status_code=status.HTTP_200_OK)
+ def update_embedding_api(
+ daily_limit: int = 50,
+ graph_config: Optional[GraphConfigRequest] = Body(None)
+ ):
+ """
+ Updates the vector embedding.
+ This endpoint is rate-limited. By default, it allows 2 calls per day. (Note: Not Thread-Safe!)
+ The rate limit is tracked per day and resets at midnight.
+ """
+ today = date.today()
+ for call_date in list(API_CALL_TRACKER.keys()):
+ if call_date != today:
+ del API_CALL_TRACKER[call_date]
+ call_count = API_CALL_TRACKER.get(today, 0)
+ if call_count >= daily_limit:
+ log.error("Rate limit exceeded for update_vid_embedding. Maximum %d calls per day.", daily_limit)
+ raise HTTPException(
+ status_code=status.HTTP_429_TOO_MANY_REQUESTS,
+ detail=f"API call limit of {daily_limit} per day exceeded. Please try again tomorrow."
+ )
+ API_CALL_TRACKER[today] = call_count + 1
+ if graph_config:
+ huge_settings.graph_url = graph_config.url
+ huge_settings.graph_name = graph_config.graph
+ huge_settings.graph_user = graph_config.user
+ huge_settings.graph_pwd = graph_config.pwd
+ huge_settings.graph_space = graph_config.gs
+ huge_settings.graph_token = graph_config.token
+
+ result = update_embedding_func()
+ result = {"detail": result}
+ return result
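+
+    # Illustrative call (host, port and router prefix are assumptions):
+    # curl -X POST "http://localhost:8001/api/vector/embedding?daily_limit=10"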
diff --git a/hugegraph-llm/src/hugegraph_llm/config/hugegraph_config.py b/hugegraph-llm/src/hugegraph_llm/config/hugegraph_config.py
index e51008d96..3924ea502 100644
--- a/hugegraph-llm/src/hugegraph_llm/config/hugegraph_config.py
+++ b/hugegraph-llm/src/hugegraph_llm/config/hugegraph_config.py
@@ -27,6 +27,7 @@ class HugeGraphConfig(BaseConfig):
graph_user: Optional[str] = "admin"
graph_pwd: Optional[str] = "xxx"
graph_space: Optional[str] = None
+ graph_token: Optional[str] = None
# graph query config
limit_property: Optional[str] = "False"
diff --git a/hugegraph-llm/src/hugegraph_llm/config/models/base_prompt_config.py b/hugegraph-llm/src/hugegraph_llm/config/models/base_prompt_config.py
index 5341fd801..ab4168805 100644
--- a/hugegraph-llm/src/hugegraph_llm/config/models/base_prompt_config.py
+++ b/hugegraph-llm/src/hugegraph_llm/config/models/base_prompt_config.py
@@ -22,9 +22,9 @@
from hugegraph_llm.utils.log import log
dir_name = os.path.dirname
+package_path = dir_name(dir_name(dir_name(dir_name(dir_name(os.path.abspath(__file__))))))
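+# Five dirname() calls walk up from .../src/hugegraph_llm/config/models/base_prompt_config.py
+# to the package root, so the YAML path below no longer depends on the current working directory.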
F_NAME = "config_prompt.yaml"
-yaml_file_path = os.path.join(os.getcwd(), "src/hugegraph_llm/resources/demo", F_NAME)
-
+yaml_file_path = os.path.join(package_path, f"src/hugegraph_llm/resources/demo/{F_NAME}")
class BasePromptConfig:
graph_schema: str = ''
diff --git a/hugegraph-llm/src/hugegraph_llm/config/prompt_config.py b/hugegraph-llm/src/hugegraph_llm/config/prompt_config.py
index 01b92b7ef..2cc16c3c1 100644
--- a/hugegraph-llm/src/hugegraph_llm/config/prompt_config.py
+++ b/hugegraph-llm/src/hugegraph_llm/config/prompt_config.py
@@ -218,6 +218,8 @@ class PromptConfig(BasePromptConfig):
- You may use the vertex ID directly if it’s provided in the context.
- If the provided question contains entity names that are very similar to the Vertices IDs, then in the generated Gremlin statement, replace the approximate entities from the original question.
For example, if the question includes the name ABC, and the provided VerticesIDs do not contain ABC but only abC, then use abC instead of ABC from the original question when generating the gremlin.
+- Similarly, if the user's query refers to specific property names or values that appear in the 'Referenced Extracted Properties', use those properties in the generated Gremlin.
+For instance, use them to filter vertices or edges (e.g. `has('propertyName', 'propertyValue')`) or to project specific values.
The output format must be as follows:
```gremlin
@@ -231,6 +233,9 @@ class PromptConfig(BasePromptConfig):
Referenced Extracted Vertex IDs Related to the Query:
{vertices}
+Referenced Extracted Properties Related to the Query (Format: [('property_name', 'property_value'), ...]):
+{properties}
+
Generate Gremlin from the Following User Query:
{query}
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
index 880ac4066..7c00a6b2d 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/app.py
@@ -23,6 +23,7 @@
from hugegraph_llm.api.admin_api import admin_http_api
from hugegraph_llm.api.rag_api import rag_http_api
+from hugegraph_llm.api.vector_api import vector_http_api
from hugegraph_llm.config import admin_settings, huge_settings, prompt
from hugegraph_llm.demo.rag_demo.admin_block import create_admin_block, log_stream
from hugegraph_llm.demo.rag_demo.configs_block import (
@@ -32,10 +33,15 @@
apply_reranker_config,
apply_graph_config,
)
+from hugegraph_llm.utils.graph_index_utils import update_vid_embedding
from hugegraph_llm.demo.rag_demo.other_block import create_other_block
from hugegraph_llm.demo.rag_demo.other_block import lifespan
from hugegraph_llm.demo.rag_demo.rag_block import create_rag_block, rag_answer
-from hugegraph_llm.demo.rag_demo.text2gremlin_block import create_text2gremlin_block, graph_rag_recall
+from hugegraph_llm.demo.rag_demo.text2gremlin_block import (
+ create_text2gremlin_block,
+ graph_rag_recall,
+ gremlin_generate_selective,
+)
from hugegraph_llm.demo.rag_demo.vector_graph_block import create_vector_graph_block
from hugegraph_llm.resources.demo.css import CSS
from hugegraph_llm.utils.log import log
@@ -171,9 +177,10 @@ def create_app():
apply_llm_config,
apply_embedding_config,
apply_reranker_config,
+ gremlin_generate_selective,
)
admin_http_api(api_auth, log_stream)
-
+ vector_http_api(api_auth, update_vid_embedding)
app.include_router(api_auth)
# Mount Gradio inside FastAPI
# TODO: support multi-user login when need
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
index cb3677709..acc2f120d 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/configs_block.py
@@ -219,7 +219,7 @@ def apply_llm_config(current_llm_config, arg1, arg2, arg3, arg4, origin_call=Non
data = {
"model": arg3,
"temperature": 0.01,
- "messages": [{"role": "user", "content": "test"}],
+ "messages": [{"role": "user", "content": "hello"}],
}
headers = {"Authorization": f"Bearer {arg1}"}
status_code = test_api_connection(test_url, method="POST", headers=headers, body=data, origin_call=origin_call)
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py
index 0fcc7f7ce..b2822a1b1 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/text2gremlin_block.py
@@ -17,7 +17,7 @@
import json
import os
-from typing import Any, Tuple, Dict, Union, Literal
+from typing import Any, Tuple, Dict, Union, Literal, Optional, List, Set
import gradio as gr
import pandas as pd
@@ -214,3 +214,62 @@ def graph_rag_recall(
)
context = rag.run(verbose=True, query=query, graph_search=True)
return context
+
+def gremlin_generate_selective(
+ inp: str,
+ example_num: int,
+ schema_input: str,
+ gremlin_prompt_input: str,
+ requested_outputs: Optional[List[str]] = None,
+) -> Dict[str, Any]:
+ """
+ Wraps the original gremlin_generate function and filters its output
+ based on the requested_outputs list of strings.
+ """
+
+ output_keys = [
+ "match_result",
+ "template_gremlin",
+ "raw_gremlin",
+ "template_execution_result",
+ "raw_execution_result",
+ ]
+ original_results = gremlin_generate(inp, example_num, schema_input, gremlin_prompt_input)
+
+ outputs_dict: Dict[str, Any] = {}
+ requested_outputs_set: Set[str]
+
+ if not requested_outputs: # None or empty list
+ requested_outputs_set = set(output_keys)
+ else:
+ requested_outputs_set = set(requested_outputs)
+
+    # gremlin_generate may return a 2-tuple error message (e.g. "Invalid JSON schema...")
+    # instead of the full 5-tuple of results.
+    if isinstance(original_results, tuple) and len(original_results) == 2 and isinstance(original_results[0], str):
+        # Surface the error through "match_result" (when requested) plus "error_detail";
+        # other requested fields are simply absent.
+        # TODO: refine how errors are structured in the selective output.
+        if "match_result" in requested_outputs_set:
+            outputs_dict["match_result"] = original_results[0]
+            outputs_dict["error_detail"] = original_results[1]  # usually an empty string
+        return outputs_dict  # early exit on error tuple
+
+ match_res_orig, template_gremlin_orig, raw_gremlin_orig, template_exec_res_orig, raw_exec_res_orig = original_results
+
+ if "match_result" in requested_outputs_set:
+ outputs_dict["match_result"] = match_res_orig
+ if "template_gremlin" in requested_outputs_set:
+ outputs_dict["template_gremlin"] = template_gremlin_orig
+ if "raw_gremlin" in requested_outputs_set:
+ outputs_dict["raw_gremlin"] = raw_gremlin_orig
+ if "template_execution_result" in requested_outputs_set:
+ outputs_dict["template_execution_result"] = template_exec_res_orig
+ if "raw_execution_result" in requested_outputs_set:
+ outputs_dict["raw_execution_result"] = raw_exec_res_orig
+
+ return outputs_dict
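+
+# Illustrative use (assuming `prompt` is importable from hugegraph_llm.config):
+# result = gremlin_generate_selective("Who does Tom know?", 0, "hugegraph",
+#                                     prompt.gremlin_generate_prompt, ["raw_gremlin"])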
diff --git a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
index 51af045d7..b1934332f 100644
--- a/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
+++ b/hugegraph-llm/src/hugegraph_llm/demo/rag_demo/vector_graph_block.py
@@ -54,8 +54,8 @@ def create_vector_graph_block():
- Docs:
- text: Build rag index from plain text
- file: Upload file(s) which should be TXT or .docx (Multiple files can be selected together)
-- [Schema](https://hugegraph.apache.org/docs/clients/restful-api/schema/): (Accept **2 types**)
- - User-defined Schema (JSON format, follow the [template](https://github.com/apache/incubator-hugegraph-ai/blob/aff3bbe25fa91c3414947a196131be812c20ef11/hugegraph-llm/src/hugegraph_llm/config/config_data.py#L125)
+- [Schema](https://starhugegraph.github.io/hugegraph-doc/clients/restful-api-v3/schema.html): (Accept **2 types**)
+ - User-defined Schema (JSON format, follow the [template](https://console.cloud.baidu-int.com/devops/icode/repos/baidu/starhugegraph/hugegraph-ai/blob/master/hugegraph-llm/src/hugegraph_llm/config/config_data.py#L173)
to modify it)
  - Specify the name of the HugeGraph graph instance; the schema will be fetched from it automatically (like
**"hugegraph"**)
diff --git a/hugegraph-llm/src/hugegraph_llm/indices/graph_index.py b/hugegraph-llm/src/hugegraph_llm/indices/graph_index.py
index e78aa6d58..1b9affd17 100644
--- a/hugegraph-llm/src/hugegraph_llm/indices/graph_index.py
+++ b/hugegraph-llm/src/hugegraph_llm/indices/graph_index.py
@@ -29,10 +29,11 @@ def __init__(
graph_name: Optional[str] = huge_settings.graph_name,
graph_user: Optional[str] = huge_settings.graph_user,
graph_pwd: Optional[str] = huge_settings.graph_pwd,
+ graph_token: Optional[str] = huge_settings.graph_token,
graph_space: Optional[str] = huge_settings.graph_space,
):
self.client = PyHugeClient(url=graph_url, graph=graph_name, user=graph_user, pwd=graph_pwd,
- graphspace=graph_space)
+                                    token=graph_token, graphspace=graph_space)
def clear_graph(self):
self.client.gremlin().exec("g.V().drop()")
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
index 70f3d27d2..2f4e0d201 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/gremlin_generate_task.py
@@ -59,9 +59,13 @@ def example_index_query(self, num_examples):
return self
def gremlin_generate_synthesize(
- self, schema, gremlin_prompt: Optional[str] = None, vertices: Optional[List[str]] = None
+ self,
+ schema,
+ gremlin_prompt: Optional[str] = None,
+ vertices: Optional[List[str]] = None,
+ properties: Optional[List[tuple]] = None
):
- self.operators.append(GremlinGenerateSynthesize(self.llm, schema, vertices, gremlin_prompt))
+ self.operators.append(GremlinGenerateSynthesize(self.llm, schema, vertices, gremlin_prompt, properties))
return self
def print_result(self):
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/commit_to_hugegraph.py b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/commit_to_hugegraph.py
index 5cc846d21..748f505a9 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/commit_to_hugegraph.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/commit_to_hugegraph.py
@@ -32,6 +32,7 @@ def __init__(self):
graph=huge_settings.graph_name,
user=huge_settings.graph_user,
pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
graphspace=huge_settings.graph_space,
)
self.schema = self.client.schema()
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/fetch_graph_data.py b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/fetch_graph_data.py
index e93d916b3..313b56659 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/fetch_graph_data.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/fetch_graph_data.py
@@ -25,6 +25,7 @@ class FetchGraphData:
def __init__(self, graph: PyHugeClient):
self.graph = graph
+ self.schema = self.graph.schema()
def run(self, graph_summary: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if graph_summary is None:
@@ -49,4 +50,19 @@ def res = [:];
if isinstance(result, list) and len(result) > 0:
graph_summary.update({key: result[i].get(key) for i, key in enumerate(keys)})
+
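+        # Attach secondary-index metadata so downstream steps (e.g. the property vector
+        # index builder) know which labels and fields are indexed. Illustrative entry shape:
+        #   {"id": 1, "base_type": "VERTEX_LABEL", "base_value": "person",
+        #    "name": "personByName", "fields": ["name"], "index_type": "SECONDARY"}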
+ index_labels = self.schema.getIndexLabels()
+ if index_labels:
+ graph_summary["index_labels"] = [
+ {
+ "id": label.id,
+ "base_type": label.baseType,
+ "base_value": label.baseValue,
+ "name": label.name,
+ "fields": label.fields,
+ "index_type": label.indexType
+ } for label in index_labels
+ ]
+ else:
+ graph_summary["index_labels"] = []
return graph_summary
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/graph_rag_query.py b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/graph_rag_query.py
index 6012b7534..75bd54be0 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/graph_rag_query.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/graph_rag_query.py
@@ -53,7 +53,7 @@
"""
PROPERTY_QUERY_NEIGHBOR_TPL = """\
-g.V().has('{prop}', within({keywords}))
+g.V().has('{current_prop_name}', '{current_prop_value}')
.repeat(
bothE({edge_labels}).limit({edge_limit}).otherV().dedup()
).times({max_deep}).emit()
@@ -65,8 +65,8 @@
)
.by(project('label', 'inV', 'outV', 'props')
.by(label())
- .by(inV().values('{prop}'))
- .by(outV().values('{prop}'))
+ .by(inV().values('{current_prop_name}'))
+ .by(outV().values('{current_prop_name}'))
.by(valueMap().by(unfold()))
)
.limit({max_items})
@@ -92,6 +92,7 @@ def __init__(
graph=huge_settings.graph_name,
user=huge_settings.graph_user,
pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
graphspace=huge_settings.graph_space,
)
self._max_deep = max_deep
@@ -129,12 +130,13 @@ def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
def _gremlin_generate_query(self, context: Dict[str, Any]) -> Dict[str, Any]:
query = context["query"]
vertices = context.get("match_vids")
+ properties = context.get("match_props")
query_embedding = context.get("query_embedding")
self._gremlin_generator.clear()
self._gremlin_generator.example_index_query(num_examples=self._num_gremlin_generate_example)
gremlin_response = self._gremlin_generator.gremlin_generate_synthesize(
- context["simple_schema"], vertices=vertices, gremlin_prompt=self._gremlin_prompt
+ context["simple_schema"], vertices=vertices, gremlin_prompt=self._gremlin_prompt, properties=properties
).run(query=query, query_embedding=query_embedding)
if self._num_gremlin_generate_example > 0:
gremlin = gremlin_response["result"]
@@ -160,12 +162,14 @@ def _gremlin_generate_query(self, context: Dict[str, Any]) -> Dict[str, Any]:
def _subgraph_query(self, context: Dict[str, Any]) -> Dict[str, Any]:
# 1. Extract params from context
matched_vids = context.get("match_vids")
+ matched_props = context.get("match_props")
if isinstance(context.get("max_deep"), int):
self._max_deep = context["max_deep"]
if isinstance(context.get("max_items"), int):
self._max_items = context["max_items"]
- if isinstance(context.get("prop_to_match"), str):
- self._prop_to_match = context["prop_to_match"]
+ if isinstance(context.get("match_props"), list):
+ self._prop_to_match = matched_props[0][0] if matched_props else None
+ log.debug("Prop to match: %s", self._prop_to_match)
# 2. Extract edge_labels from graph schema
_, edge_labels = self._extract_labels_from_schema()
@@ -207,31 +211,34 @@ def _subgraph_query(self, context: Dict[str, Any]) -> Dict[str, Any]:
vertex_degree_list[0].update(vertex_knowledge)
else:
vertex_degree_list.append(vertex_knowledge)
- else:
+ elif matched_props:
# WARN: When will the query enter here?
- keywords = context.get("keywords")
- assert keywords, "No related property(keywords) for graph query."
- keywords_str = ",".join("'" + kw + "'" for kw in keywords)
- gremlin_query = PROPERTY_QUERY_NEIGHBOR_TPL.format(
- prop=self._prop_to_match,
- keywords=keywords_str,
- edge_labels=edge_labels_str,
- edge_limit=edge_limit_amount,
- max_deep=self._max_deep,
- max_items=self._max_items,
- )
- log.warning("Unable to find vid, downgraded to property query, please confirm if it meets expectation.")
+ graph_chain_knowledge = set()
+ for prop_name, prop_value in matched_props:
+ self._prop_to_match = prop_name
+ gremlin_query = PROPERTY_QUERY_NEIGHBOR_TPL.format(
+ current_prop_name=prop_name,
+ current_prop_value=prop_value,
+ edge_labels=edge_labels_str,
+ edge_limit=edge_limit_amount,
+ max_deep=self._max_deep,
+ max_items=self._max_items
+ )
+ log.warning("Unable to find vid, downgraded to property query, please confirm if it meets expectation.")
+ log.debug("property gremlin: %s", gremlin_query)
- paths: List[Any] = self._client.gremlin().exec(gremlin=gremlin_query)["data"]
- graph_chain_knowledge, vertex_degree_list, knowledge_with_degree = self._format_graph_query_result(
- query_paths=paths
- )
+ paths: List[Any] = self._client.gremlin().exec(gremlin=gremlin_query)["data"]
+ log.debug("paths: %s", paths)
+ temp_graph_chain_knowledge, vertex_degree_list, knowledge_with_degree = self._format_graph_query_result(
+ query_paths=paths
+ )
+ graph_chain_knowledge.update(temp_graph_chain_knowledge)
context["graph_result"] = list(graph_chain_knowledge)
if context["graph_result"]:
context["graph_result_flag"] = 0
context["vertex_degree_list"] = [list(vertex_degree) for vertex_degree in vertex_degree_list]
- context["knowledge_with_degree"] = knowledge_with_degree
+ context["knowledge_with_degree"] = knowledge_with_degree # pylint: disable=possibly-used-before-assignment
context["graph_context_head"] = (
f"The following are graph knowledge in {self._max_deep} depth, e.g:\n"
"`vertexA--[links]-->vertexB<--[links]--vertexC ...`"
@@ -340,7 +347,7 @@ def _process_vertex(
node_str = matched_str
else:
v_cache.add(matched_str)
- node_str = f"{item['id']}{{{props_str}}}"
+ node_str = f"{item['id']}{{{props_str}}}" if use_id_to_match else f"{item['props']}{{{props_str}}}"
flat_rel += node_str
nodes_with_degree.append(node_str)
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py
index 2f50bb818..5cc89f38e 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/hugegraph_op/schema_manager.py
@@ -28,6 +28,7 @@ def __init__(self, graph_name: str):
graph=self.graph_name,
user=huge_settings.graph_user,
pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
graphspace=huge_settings.graph_space,
)
self.schema = self.client.schema()
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
index e6b4080ad..778ad4883 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/index_op/build_semantic_index.py
@@ -18,7 +18,8 @@
import asyncio
import os
-from typing import Any, Dict
+from collections import defaultdict
+from typing import Any
from tqdm import tqdm
@@ -27,14 +28,33 @@
from hugegraph_llm.models.embeddings.base import BaseEmbedding
from hugegraph_llm.operators.hugegraph_op.schema_manager import SchemaManager
from hugegraph_llm.utils.log import log
+from pyhugegraph.client import PyHugeClient
+INDEX_PROPERTY_GREMLIN = """
+g.V().hasLabel('{label}')
+ .limit(100000)
+ .project('vid', 'properties')
+ .by(id())
+ .by(valueMap({fields}))
+ .toList()
+"""
class BuildSemanticIndex:
def __init__(self, embedding: BaseEmbedding):
self.index_dir = str(os.path.join(resource_path, huge_settings.graph_name, "graph_vids"))
+ self.index_dir_prop = str(os.path.join(resource_path, huge_settings.graph_name, "graph_props"))
self.vid_index = VectorIndex.from_index_file(self.index_dir)
+ self.prop_index = VectorIndex.from_index_file(self.index_dir_prop)
self.embedding = embedding
self.sm = SchemaManager(huge_settings.graph_name)
+ self.client = PyHugeClient(
+ url=huge_settings.graph_url,
+ graph=huge_settings.graph_name,
+ user=huge_settings.graph_user,
+ pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
+ graphspace=huge_settings.graph_space,
+ )
def _extract_names(self, vertices: list[str]) -> list[str]:
return [v.split(":")[1] for v in vertices]
@@ -67,15 +87,72 @@ async def get_embeddings_with_semaphore(vid_list: list[str]) -> Any:
pbar.update(1)
return embeddings
- def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
- vertexlabels = self.sm.schema.getSchema()["vertexlabels"]
- all_pk_flag = all(data.get('id_strategy') == 'PRIMARY_KEY' for data in vertexlabels)
+ def diff_property_sets(
+ self,
+ present_prop_value_to_propset: dict,
+ past_prop_value_to_propset: dict
+ ):
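+        # Diffs the current {prop_value -> propset} mapping against the previously indexed one.
+        # Illustrative: present = {"Tom": {("name", "Tom")}} and
+        # past = {"Tom": {("nick", "Tom")}, "29": {("age", "29")}} yield
+        # to_add=[], to_update=[("Tom", {("name", "Tom")})],
+        # to_remove=[{("age", "29")}], to_update_remove=[("Tom", {("nick", "Tom")})].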
+ to_add = []
+ to_update = []
+ to_update_remove = []
+ to_remove_keys = set(past_prop_value_to_propset) - set(present_prop_value_to_propset)
+ to_remove = [past_prop_value_to_propset[k] for k in to_remove_keys]
+ for prop_value, present_propset in present_prop_value_to_propset.items():
+ past_propset = past_prop_value_to_propset.get(prop_value)
+ if past_propset is None:
+ to_add.append((prop_value, present_propset))
+ elif present_propset != past_propset:
+ to_update.append((prop_value, present_propset))
+ to_update_remove.append((prop_value, past_propset))
+ return to_add, to_update, to_remove, to_update_remove
+
+ def get_present_props(self, context: dict[str, Any]) -> dict[str, frozenset[tuple[str, str]]]:
+ results = []
+ for item in context["index_labels"]:
+ label = item["base_value"]
+ fields = item["fields"]
+ fields_str_list = [f"'{field}'" for field in fields]
+ fields_for_query = ", ".join(fields_str_list)
+ gremlin_query = INDEX_PROPERTY_GREMLIN.format(
+ label=label,
+ fields=fields_for_query
+ )
+ log.debug("gremlin_query: %s", gremlin_query)
+ result = self.client.gremlin().exec(gremlin=gremlin_query)["data"]
+ results.extend(result)
+ orig_present_prop_value_to_propset = defaultdict(set)
+ for item in results:
+ properties = item["properties"]
+ for prop_key, values in properties.items():
+ if not values:
+ continue
+ prop_value = str(values[0])
+ orig_present_prop_value_to_propset[prop_value].add((prop_key, prop_value))
+ present_prop_value_to_propset = {
+ k: frozenset(v)
+ for k, v in orig_present_prop_value_to_propset.items()
+ }
+ return present_prop_value_to_propset
+
+ def get_past_props(self) -> dict[str, frozenset[tuple[str, str]]]:
+ orig_past_prop_value_to_propset = defaultdict(set)
+ for propset in self.prop_index.properties:
+ for _, prop_value in propset:
+ orig_past_prop_value_to_propset[prop_value].update(propset)
+ past_prop_value_to_propset = {
+ k: frozenset(v)
+ for k, v in orig_past_prop_value_to_propset.items()
+ }
+ return past_prop_value_to_propset
+ def _update_vid_index(self, context: dict[str, Any], all_pk_flag: bool) -> tuple[int, int]:
past_vids = self.vid_index.properties
# TODO: We should build vid vector index separately, especially when the vertices may be very large
present_vids = context["vertices"] # Warning: data truncated by fetch_graph_data.py
removed_vids = set(past_vids) - set(present_vids)
removed_num = self.vid_index.remove(removed_vids)
+ if removed_num:
+ self.vid_index.to_index_file(self.index_dir)
added_vids = list(set(present_vids) - set(past_vids))
if added_vids:
@@ -86,8 +163,69 @@ def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
self.vid_index.to_index_file(self.index_dir)
else:
log.debug("No update vertices to build vector index.")
+ return removed_num, len(added_vids)
+
+ def _update_property_index(self, context: dict[str, Any]) -> tuple[int, int | str]:
+ removed_props_num = 0
+ added_props_vector_num: int | str = 0
+
+ present_prop_value_to_propset = self.get_present_props(context)
+ # log.debug("present_prop_value_to_propset: %s", present_prop_value_to_propset)
+ past_prop_value_to_propset = self.get_past_props()
+ # log.debug("past_prop_value_to_propset: %s", past_prop_value_to_propset)
+ to_add, to_update, to_remove, to_update_remove = self.diff_property_sets(
+ present_prop_value_to_propset,
+ past_prop_value_to_propset
+ )
+ log.debug("to_add: %s", to_add)
+ log.debug("to_update: %s", to_update)
+ log.debug("to_remove: %s", to_remove)
+ log.debug("to_update_remove: %s", to_update_remove)
+ log.info("Removing %s outdated property value", len(to_remove))
+ removed_props_num = self.prop_index.remove(to_remove)
+ if removed_props_num:
+ self.prop_index.to_index_file(self.index_dir_prop)
+ all_to_add = to_add + to_update
+ add_propsets = []
+ add_prop_values = []
+ for prop_value, propset in all_to_add:
+ add_propsets.append(propset)
+ add_prop_values.append(prop_value)
+ if add_prop_values:
+ if len(add_prop_values) > 100000:
+ log.warning("The number of props > 100000, please select which properties to vectorize.")
+ return removed_props_num, "0 (because of exceeding limit)"
+ if to_update_remove:
+ update_remove_prop_values = [prop_set for _, prop_set in to_update_remove]
+ inner_removed_num = self.prop_index.remove(update_remove_prop_values)
+ self.prop_index.to_index_file(self.index_dir_prop)
+ log.info("In to_update: Removed %s outdated property set", inner_removed_num)
+ added_props_embeddings = asyncio.run(self._get_embeddings_parallel(add_prop_values))
+ self.prop_index.add(added_props_embeddings, add_propsets)
+ log.info("Added %s new or updated property embeddings", len(added_props_embeddings))
+ self.prop_index.to_index_file(self.index_dir_prop)
+ added_props_vector_num = len(to_add)
+ else:
+ log.debug("No update props to build vector index.")
+ added_props_vector_num = 0
+
+ return removed_props_num, added_props_vector_num
+
+ def run(self, context: dict[str, Any]) -> dict[str, Any]: # pylint: disable=too-many-statements, too-many-branches
+ vertexlabels = self.sm.schema.getSchema()["vertexlabels"]
+ all_pk_flag = all(data.get('id_strategy') == 'PRIMARY_KEY' for data in vertexlabels)
+
+ removed_vid_num, added_vid_num = self._update_vid_index(context, all_pk_flag)
context.update({
- "removed_vid_vector_num": removed_num,
- "added_vid_vector_num": len(added_vids)
+ "removed_vid_vector_num": removed_vid_num,
+ "added_vid_vector_num": added_vid_num,
})
+
+ if context["index_labels"]:
+ removed_prop_num, added_prop_num = self._update_property_index(context)
+ context.update({
+ "removed_props_num": removed_prop_num,
+ "added_props_vector_num": added_prop_num,
+ })
+
return context
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/index_op/semantic_id_query.py b/hugegraph-llm/src/hugegraph_llm/operators/index_op/semantic_id_query.py
index e3375ef02..7102731cf 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/index_op/semantic_id_query.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/index_op/semantic_id_query.py
@@ -17,7 +17,7 @@
import os
-from typing import Dict, Any, Literal, List, Tuple
+from typing import Dict, Any, Literal, List, Tuple, Union, FrozenSet
from hugegraph_llm.config import resource_path, huge_settings
from hugegraph_llm.indices.vector_index import VectorIndex
@@ -38,7 +38,9 @@ def __init__(
vector_dis_threshold: float = huge_settings.vector_dis_threshold,
):
self.index_dir = str(os.path.join(resource_path, huge_settings.graph_name, "graph_vids"))
+ self.index_dir_prop = str(os.path.join(resource_path, huge_settings.graph_name, "graph_props"))
self.vector_index = VectorIndex.from_index_file(self.index_dir)
+ self.prop_index = VectorIndex.from_index_file(self.index_dir_prop)
self.embedding = embedding
self.by = by
self.topk_per_query = topk_per_query
@@ -49,8 +51,10 @@ def __init__(
graph=huge_settings.graph_name,
user=huge_settings.graph_user,
pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
graphspace=huge_settings.graph_space,
)
+ self.schema = self._client.schema()
def _exact_match_vids(self, keywords: List[str]) -> Tuple[List[str], List[str]]:
assert keywords, "keywords can't be empty, please check the logic"
@@ -75,18 +79,57 @@ def _exact_match_vids(self, keywords: List[str]) -> Tuple[List[str], List[str]]:
def _fuzzy_match_vids(self, keywords: List[str]) -> List[str]:
fuzzy_match_result = []
for keyword in keywords:
- keyword_vector = self.embedding.get_text_embedding(keyword)
- results = self.vector_index.search(keyword_vector, top_k=self.topk_per_keyword,
+ keyword_vector = self.embedding.get_texts_embeddings([keyword])
+ results = self.vector_index.search(keyword_vector[0], top_k=self.topk_per_keyword,
dis_threshold=float(self.vector_dis_threshold))
if results:
fuzzy_match_result.extend(results[:self.topk_per_keyword])
return fuzzy_match_result
+    def _exact_match_properties(self, keywords: List[str]) -> Tuple[List[Tuple[str, str]], List[str]]:
+ property_keys = self.schema.getPropertyKeys()
+ log.debug("property_keys: %s", property_keys)
+ matched_properties = set()
+ unmatched_keywords = set(keywords)
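+        # Note: this probes the graph with one limit(1) Gremlin query per (property key, keyword)
+        # pair until a keyword matches, so cost grows with len(property_keys) * len(keywords).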
+ for key in property_keys:
+ for keyword in list(unmatched_keywords):
+ gremlin_query = f"g.V().has('{key.name}', '{keyword}').limit(1)"
+ log.debug("prop Gremlin query: %s", gremlin_query)
+ resp = self._client.gremlin().exec(gremlin_query)
+ if resp.get("data"):
+ matched_properties.add((key.name, keyword))
+ unmatched_keywords.remove(keyword)
+ return list(matched_properties), list(unmatched_keywords)
+
+    def _fuzzy_match_props(self, keywords: List[str]) -> List[FrozenSet[Tuple[str, str]]]:
+ fuzzy_match_result = []
+ for keyword in keywords:
+ keyword_vector = self.embedding.get_texts_embeddings([keyword])
+ results = self.prop_index.search(keyword_vector[0], top_k=self.topk_per_keyword,
+ dis_threshold=float(self.vector_dis_threshold))
+ if results:
+ fuzzy_match_result.extend(results[:self.topk_per_keyword])
+ return fuzzy_match_result
+
+ def _reformat_mixed_list_to_unique_tuples(
+ self, mixed_data_list: List[Union[FrozenSet[Tuple[str, str]], Tuple[str, str]]]
+ ) -> List[Tuple[str, str]]:
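+        # Flattens a mix of frozensets of (name, value) tuples (fuzzy prop-index hits) and
+        # bare (name, value) tuples (exact matches) into a de-duplicated list, e.g.
+        # [frozenset({("name", "Tom")}), ("age", "29")] -> [("name", "Tom"), ("age", "29")].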
+ unique_tuples = set()
+ for item in mixed_data_list:
+ if isinstance(item, (frozenset, set)):
+ for prop_tuple in item:
+ if isinstance(prop_tuple, tuple) and len(prop_tuple) == 2:
+ unique_tuples.add(prop_tuple)
+ elif isinstance(item, tuple):
+ if len(item) == 2:
+ unique_tuples.add(item)
+ return list(unique_tuples)
+
def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
graph_query_list = set()
if self.by == "query":
query = context["query"]
- query_vector = self.embedding.get_text_embedding(query)
+            query_vector = self.embedding.get_texts_embeddings([query])[0]  # search() expects a single vector
results = self.vector_index.search(query_vector, top_k=self.topk_per_query)
if results:
graph_query_list.update(results[:self.topk_per_query])
@@ -95,11 +138,22 @@ def run(self, context: Dict[str, Any]) -> Dict[str, Any]:
if not keywords:
context["match_vids"] = []
return context
-
exact_match_vids, unmatched_vids = self._exact_match_vids(keywords)
graph_query_list.update(exact_match_vids)
fuzzy_match_vids = self._fuzzy_match_vids(unmatched_vids)
log.debug("Fuzzy match vids: %s", fuzzy_match_vids)
graph_query_list.update(fuzzy_match_vids)
+ index_labels = self.schema.getIndexLabels()
+ if index_labels:
+ props_list = set()
+ exact_match_props, unmatched_props = self._exact_match_properties(keywords)
+ log.debug("Exact match props: %s", exact_match_props)
+ props_list.update(exact_match_props)
+ fuzzy_match_props = self._fuzzy_match_props(unmatched_props)
+ log.debug("Fuzzy match props: %s", fuzzy_match_props)
+ props_list.update(fuzzy_match_props)
+ props_list = self._reformat_mixed_list_to_unique_tuples(props_list)
+ context["match_props"] = list(props_list)
+ log.debug("Match props: %s", context["match_props"])
context["match_vids"] = list(graph_query_list)
return context
diff --git a/hugegraph-llm/src/hugegraph_llm/operators/llm_op/gremlin_generate.py b/hugegraph-llm/src/hugegraph_llm/operators/llm_op/gremlin_generate.py
index 09e01e5ee..57cdafa7d 100644
--- a/hugegraph-llm/src/hugegraph_llm/operators/llm_op/gremlin_generate.py
+++ b/hugegraph-llm/src/hugegraph_llm/operators/llm_op/gremlin_generate.py
@@ -33,6 +33,7 @@ def __init__(
schema: Optional[Union[dict, str]] = None,
vertices: Optional[List[str]] = None,
gremlin_prompt: Optional[str] = None,
+ properties: Optional[List[tuple]] = None
) -> None:
self.llm = llm or LLMs().get_text2gql_llm()
if isinstance(schema, dict):
@@ -40,11 +41,14 @@ def __init__(
self.schema = schema
self.vertices = vertices
self.gremlin_prompt = gremlin_prompt or prompt.gremlin_generate_prompt
+ self.properties = properties
def _extract_response(self, response: str, label: str = "gremlin") -> str:
match = re.search(f"```{label}(.*?)```", response, re.DOTALL)
- assert match is not None, f"No {label} found in response: {response}"
- return match.group(1).strip()
+        if match:
+            return match.group(1).strip()
+        # Fall back to the raw response when no fenced code block is found
+        return response.strip()
def _format_examples(self, examples: Optional[List[Dict[str, str]]]) -> Optional[str]:
if not examples:
@@ -70,6 +74,7 @@ async def async_generate(self, context: Dict[str, Any]):
schema=self.schema,
example=self._format_examples(examples=raw_example),
vertices=self._format_vertices(vertices=self.vertices),
+ properties=self.properties
)
async_tasks["raw_answer"] = asyncio.create_task(self.llm.agenerate(prompt=raw_prompt))
@@ -79,6 +84,7 @@ async def async_generate(self, context: Dict[str, Any]):
schema=self.schema,
example=self._format_examples(examples=examples),
vertices=self._format_vertices(vertices=self.vertices),
+ properties=self.properties
)
async_tasks["initialized_answer"] = asyncio.create_task(self.llm.agenerate(prompt=init_prompt))
@@ -100,6 +106,7 @@ def sync_generate(self, context: Dict[str, Any]):
schema=self.schema,
example=self._format_examples(examples=raw_example),
vertices=self._format_vertices(vertices=self.vertices),
+ properties=self.properties
)
raw_response = self.llm.generate(prompt=raw_prompt)
@@ -109,6 +116,7 @@ def sync_generate(self, context: Dict[str, Any]):
schema=self.schema,
example=self._format_examples(examples=examples),
vertices=self._format_vertices(vertices=self.vertices),
+ properties=self.properties
)
initialized_response = self.llm.generate(prompt=init_prompt)
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py b/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
index bb9e3c886..f3a22bde2 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/graph_index_utils.py
@@ -47,9 +47,10 @@ def get_graph_index_info():
def clean_all_graph_index():
VectorIndex.clean(str(os.path.join(resource_path, huge_settings.graph_name, "graph_vids")))
+ VectorIndex.clean(str(os.path.join(resource_path, huge_settings.graph_name, "graph_props")))
VectorIndex.clean(str(os.path.join(resource_path, "gremlin_examples")))
- log.warning("Clear graph index and text2gql index successfully!")
- gr.Info("Clear graph index and text2gql index successfully!")
+ log.warning("Clear graph index, property index and text2gql index successfully!")
+ gr.Info("Clear graph index, property index and text2gql index successfully!")
def clean_all_graph_data():
@@ -111,7 +112,12 @@ def update_vid_embedding():
context = builder.run()
removed_num = context["removed_vid_vector_num"]
added_num = context["added_vid_vector_num"]
- return f"Removed {removed_num} vectors, added {added_num} vectors."
+ if context["index_labels"]:
+ removed_prop_num = context["removed_props_num"]
+ added_prop_num = context["added_props_vector_num"]
+ return (f"Removed {removed_num} vid vectors, added {added_num} vid vectors.\n"
+ f"Removed {removed_prop_num} prop vectors, added {added_prop_num} prop vectors.")
+ return f"Removed {removed_num} vid vectors, added {added_num} vid vectors."
except Exception as e: # pylint: disable=broad-exception-caught
log.error(e)
raise gr.Error(str(e))
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
index 1d02b45d3..ea39fc913 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/hugegraph_utils.py
@@ -43,6 +43,7 @@ def get_hg_client():
graph=huge_settings.graph_name,
user=huge_settings.graph_user,
pwd=huge_settings.graph_pwd,
+ token=huge_settings.graph_token,
graphspace=huge_settings.graph_space,
)
diff --git a/hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py b/hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py
index ef2b5e9b8..b13646b85 100644
--- a/hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py
+++ b/hugegraph-llm/src/hugegraph_llm/utils/vector_index_utils.py
@@ -59,12 +59,13 @@ def read_documents(input_file, input_text):
def get_vector_index_info():
chunk_vector_index = VectorIndex.from_index_file(str(os.path.join(resource_path, huge_settings.graph_name, "chunks")))
graph_vid_vector_index = VectorIndex.from_index_file(str(os.path.join(resource_path, huge_settings.graph_name, "graph_vids")))
+ graph_prop_vector_index = VectorIndex.from_index_file(str(os.path.join(resource_path, huge_settings.graph_name, "graph_props")))
return json.dumps({
"embed_dim": chunk_vector_index.index.d,
"vector_info": {
"chunk_vector_num": chunk_vector_index.index.ntotal,
"graph_vid_vector_num": graph_vid_vector_index.index.ntotal,
- "graph_properties_vector_num": len(chunk_vector_index.properties)
+ "graph_properties_vector_num": graph_prop_vector_index.index.ntotal,
}
}, ensure_ascii=False, indent=2)
diff --git a/hugegraph-python-client/README.md b/hugegraph-python-client/README.md
index d2820164a..3adee4570 100644
--- a/hugegraph-python-client/README.md
+++ b/hugegraph-python-client/README.md
@@ -172,12 +172,6 @@ Other info are under 🚧 (Welcome to add more docs for it, users could refer [j
## Contributing
-* Welcome to contribute to `hugegraph-python-client`. Please see the [Guidelines](https://hugegraph.apache.org/docs/contribution-guidelines/) for more information.
* Code format: Please run `./style/code_format_and_analysis.sh` to format your code before submitting a PR.
Thank you to all the people who already contributed to `hugegraph-python-client`!
-
-## Contact Us
-
-* [GitHub Issues](https://github.com/apache/incubator-hugegraph-ai/issues): Feedback on usage issues and functional requirements (quick response)
-* Feedback Email: [dev@hugegraph.apache.org]() (subscriber only)
diff --git a/hugegraph-python-client/src/pyhugegraph/client.py b/hugegraph-python-client/src/pyhugegraph/client.py
index 3b0301321..1c10a8e55 100644
--- a/hugegraph-python-client/src/pyhugegraph/client.py
+++ b/hugegraph-python-client/src/pyhugegraph/client.py
@@ -29,6 +29,7 @@
from pyhugegraph.api.version import VersionManager
from pyhugegraph.utils.huge_config import HGraphConfig
from pyhugegraph.utils.huge_requests import HGraphSession
+from pyhugegraph.utils.log import log
T = TypeVar("T")
@@ -50,12 +51,30 @@ def __init__(
self,
url: str,
graph: str,
- user: str,
- pwd: str,
+ user: Optional[str] = None,
+ pwd: Optional[str] = None,
+ token: Optional[str] = None,
graphspace: Optional[str] = None,
timeout: Optional[tuple[float, float]] = None
):
- self.cfg = HGraphConfig(url, user, pwd, graph, graphspace, timeout or (0.5, 15.0))
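+ # Accept either token-based or basic (user/pwd) credentials; token takes precedence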
+ if token is None and (user is None or pwd is None):
+ raise ValueError("Either a token or both username and password must be provided.")
+
+ if token is not None and (user is not None or pwd is not None):
+ log.warning(
+ "Both token and username/password are provided. "
+ "Token will be used for authentication."
+ )
+
+ self.cfg = HGraphConfig(
+ url=url,
+ graph_name=graph,
+ token=token,
+ username=user,
+ password=pwd,
+ graphspace=graphspace,
+ timeout=timeout or (0.5, 15.0)
+ )
@manager_builder
def schema(self) -> "SchemaManager":
diff --git a/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py b/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
index 3f6d78b95..6e76dd25a 100644
--- a/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
+++ b/hugegraph-python-client/src/pyhugegraph/utils/huge_config.py
@@ -29,9 +29,10 @@
@dataclass
class HGraphConfig:
url: str
- username: str
- password: str
graph_name: str
+ token: Optional[str] = None
+ username: Optional[str] = None
+ password: Optional[str] = None
graphspace: Optional[str] = None
timeout: tuple[float, float] = (0.5, 15.0)
gs_supported: bool = field(default=False, init=False)
diff --git a/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py b/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
index 4d99a0e45..4f6afa14b 100644
--- a/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
+++ b/hugegraph-python-client/src/pyhugegraph/utils/huge_requests.py
@@ -50,6 +50,10 @@ def __init__(
self._backoff_factor = backoff_factor
self._status_forcelist = status_forcelist
self._auth = (cfg.username, cfg.password)
+ self._token = cfg.token or None
self._headers = {"Content-Type": Constants.HEADER_CONTENT_TYPE}
self._timeout = cfg.timeout
self._session = session if session else requests.Session()
@@ -140,10 +144,15 @@ def request(
**kwargs: Any,
) -> dict:
url = self.resolve(path)
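+ # Prefer bearer-token authentication over basic auth when a token is configured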
+ request_headers = self._headers.copy()
+ auth = self._auth
+ if self._token:
+ request_headers['Authorization'] = f"Bearer {self._token}"
+ auth = None
response: requests.Response = getattr(self._session, method.lower())(
url,
- auth=self._auth,
- headers=self._headers,
+ auth=auth,
+ headers=request_headers,
timeout=self._timeout,
**kwargs,
)
diff --git a/vermeer-python-client/README.md b/vermeer-python-client/README.md
new file mode 100644
index 000000000..7eec5a866
--- /dev/null
+++ b/vermeer-python-client/README.md
@@ -0,0 +1,26 @@
+# vermeer-python-client
+
+The `vermeer-python-client` is a Python client (SDK) for Vermeer.
+
+
+## Installation
+
+To install the `vermeer-python-client`, you can use pip, poetry, or build from source:
+
+```bash
+#todo
+```
+
+### Install from Source (Latest Code)
+
+To install from the source, clone the repository and install the required dependencies:
+
+```bash
+#todo
+```
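+
+Until the official steps land, here is one plausible sketch, assuming the `setup.py` and `requirements.txt` shipped in the `vermeer-python-client` directory:
+
+```bash
+cd vermeer-python-client
+pip install -r requirements.txt
+pip install .
+```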
+
+## Usage
+
+```bash
+#todo
+```
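+
+As a placeholder, a minimal usage sketch based on the client and task APIs in this repo (`PyVermeerClient`, `client.tasks`); the address and token below are illustrative only:
+
+```python
+from pyvermeer.client.client import PyVermeerClient
+
+client = PyVermeerClient(ip="127.0.0.1", port=8688, token="<your-token>")
+print(client.tasks.get_tasks().to_dict())
+```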
\ No newline at end of file
diff --git a/vermeer-python-client/requirements.txt b/vermeer-python-client/requirements.txt
new file mode 100644
index 000000000..75f1d244b
--- /dev/null
+++ b/vermeer-python-client/requirements.txt
@@ -0,0 +1,6 @@
+decorator~=5.1.1
+requests~=2.32.0
+setuptools~=70.0.0
+urllib3~=2.2.2
+rich~=13.9.4
+python-dateutil~=2.9.0
\ No newline at end of file
diff --git a/vermeer-python-client/setup.py b/vermeer-python-client/setup.py
new file mode 100644
index 000000000..ad53fbb17
--- /dev/null
+++ b/vermeer-python-client/setup.py
@@ -0,0 +1,29 @@
+# !/usr/bin/env python3
+"""
+file: setup.py
+author: wenyuxuan@baidu.com
+"""
+import setuptools
+from pkg_resources import parse_requirements
+
+with open("README.md", "r", encoding="utf-8") as fh:
+ long_description = fh.read()
+
+with open("requirements.txt", encoding="utf-8") as fp:
+ install_requires = [str(requirement) for requirement in parse_requirements(fp)]
+
+setuptools.setup(
+ name="vermeer-python",
+ version="0.1.0",
+ install_requires=install_requires,
+ long_description=long_description,
+ long_description_content_type="text/markdown",
+ packages=setuptools.find_packages(where="src", exclude=["tests"]),
+ package_dir={"": "src"},
+ classifiers=[
+ "Programming Language :: Python :: 3",
+ "License :: OSI Approved :: Apache Software License",
+ "Operating System :: OS Independent",
+ ],
+ python_requires=">=3.9",
+)
diff --git a/vermeer-python-client/src/pyvermeer/__init__.py b/vermeer-python-client/src/pyvermeer/__init__.py
new file mode 100644
index 000000000..374f52b34
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/__init__.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file:__init__.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/api/base.py b/vermeer-python-client/src/pyvermeer/api/base.py
new file mode 100644
index 000000000..0c53983ad
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/api/base.py
@@ -0,0 +1,29 @@
+# !/usr/bin/env python3
+"""
+file: base.py
+author: wenyuxuan@baidu.com
+"""
+
+from pyvermeer.utils.log import log
+
+
+class BaseModule:
+ """基类"""
+
+ def __init__(self, client):
+ self._client = client
+ self.log = log.getChild(__name__)
+
+ @property
+ def session(self):
+ """返回客户端的session对象"""
+ return self._client.session
+
+ def _send_request(self, method: str, endpoint: str, params: dict = None):
+ """统一请求入口"""
+ self.log.debug(f"Sending {method} to {endpoint}")
+ return self._client.send_request(
+ method=method,
+ endpoint=endpoint,
+ params=params
+ )
diff --git a/vermeer-python-client/src/pyvermeer/api/graph.py b/vermeer-python-client/src/pyvermeer/api/graph.py
new file mode 100644
index 000000000..173f5d275
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/api/graph.py
@@ -0,0 +1,28 @@
+# !/usr/bin/env python3
+"""
+file: graph.py
+author: wenyuxuan@baidu.com
+"""
+
+from pyvermeer.structure.graph_data import GraphsResponse, GraphResponse
+from .base import BaseModule
+
+
+class GraphModule(BaseModule):
+ """Graph"""
+
+ def get_graph(self, graph_name: str) -> GraphResponse:
+ """获取任务列表"""
+ response = self._send_request(
+ "GET",
+ f"/graphs/{graph_name}"
+ )
+ return GraphResponse(response)
+
+ def get_graphs(self) -> GraphsResponse:
+ """获取任务列表"""
+ response = self._send_request(
+ "GET",
+ "/graphs",
+ )
+ return GraphsResponse(response)
diff --git a/vermeer-python-client/src/pyvermeer/api/master.py b/vermeer-python-client/src/pyvermeer/api/master.py
new file mode 100644
index 000000000..e211a48ac
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/api/master.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file: master.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/api/task.py b/vermeer-python-client/src/pyvermeer/api/task.py
new file mode 100644
index 000000000..b57c4a46f
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/api/task.py
@@ -0,0 +1,38 @@
+# !/usr/bin/env python3
+"""
+file: task.py
+author: wenyuxuan@baidu.com
+"""
+
+from pyvermeer.api.base import BaseModule
+
+from pyvermeer.structure.task_data import TasksResponse, TaskCreateRequest, TaskCreateResponse, TaskResponse
+
+
+class TaskModule(BaseModule):
+ """Task"""
+
+ def get_tasks(self) -> TasksResponse:
+ """获取任务列表"""
+ response = self._send_request(
+ "GET",
+ "/tasks"
+ )
+ return TasksResponse(response)
+
+ def get_task(self, task_id: int) -> TaskResponse:
+ """获取单个任务信息"""
+ response = self._send_request(
+ "GET",
+ f"/task/{task_id}"
+ )
+ return TaskResponse(response)
+
+ def create_task(self, create_task: TaskCreateRequest) -> TaskCreateResponse:
+ """创建新任务"""
+ response = self._send_request(
+ method="POST",
+ endpoint="/tasks/create",
+ params=create_task.to_dict()
+ )
+ return TaskCreateResponse(response)
diff --git a/vermeer-python-client/src/pyvermeer/api/worker.py b/vermeer-python-client/src/pyvermeer/api/worker.py
new file mode 100644
index 000000000..90e4abe98
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/api/worker.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file: worker.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/client/__init__.py b/vermeer-python-client/src/pyvermeer/client/__init__.py
new file mode 100644
index 000000000..7c21a6a16
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/client/__init__.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file: __init__.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/client/client.py b/vermeer-python-client/src/pyvermeer/client/client.py
new file mode 100644
index 000000000..1cac3d08e
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/client/client.py
@@ -0,0 +1,52 @@
+# !/usr/bin/env python3
+"""
+file: client.py
+author: wenyuxuan@baidu.com
+"""
+
+from typing import Dict
+from typing import Optional
+
+from pyvermeer.api.base import BaseModule
+from pyvermeer.api.graph import GraphModule
+from pyvermeer.api.task import TaskModule
+from pyvermeer.utils.log import log
+from pyvermeer.utils.vermeer_config import VermeerConfig
+from pyvermeer.utils.vermeer_requests import VermeerSession
+
+
+class PyVermeerClient:
+ """Vermeer API Client"""
+
+ def __init__(
+ self,
+ ip: str,
+ port: int,
+ token: str,
+ timeout: Optional[tuple[float, float]] = None,
+ log_level: str = "INFO",
+ ):
+ """初始化客户端,包括配置和会话管理
+ :param ip:
+ :param port:
+ :param token:
+ :param timeout:
+ :param log_level:
+ """
+ self.cfg = VermeerConfig(ip, port, token, timeout)
+ self.session = VermeerSession(self.cfg)
+ self._modules: Dict[str, BaseModule] = {
+ "graph": GraphModule(self),
+ "tasks": TaskModule(self)
+ }
+ log.setLevel(log_level)
+
+ def __getattr__(self, name):
+ """通过属性访问模块"""
+ if name in self._modules:
+ return self._modules[name]
+ raise AttributeError(f"Module {name} not found")
+
+ def send_request(self, method: str, endpoint: str, params: dict = None):
+ """统一请求方法"""
+ return self.session.request(method, endpoint, params)
diff --git a/vermeer-python-client/src/pyvermeer/demo/task_demo.py b/vermeer-python-client/src/pyvermeer/demo/task_demo.py
new file mode 100644
index 000000000..637eb3614
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/demo/task_demo.py
@@ -0,0 +1,42 @@
+# !/usr/bin/env python3
+"""
+file: task_demo.py
+author: wenyuxuan@baidu.com
+"""
+
+from pyvermeer.client.client import PyVermeerClient
+from pyvermeer.structure.task_data import TaskCreateRequest
+
+
+def main():
+ """main"""
+ client = PyVermeerClient(
+ ip="10.41.57.139",
+ port=8688,
+ token="Q7svB13nYvREB4bDCj7kQnwJEMvLgrgfDimu4h1Fp7CUzQLk758ya1EYwycn1kjbgskiHiKzDni9jEkJcssgTy7rZJdt4gYEkfvjeowZGzSebgiSEU86dgFPXzUUtwrA81vWKm1xfioBcS9GmXjGQoM6C",
+ log_level="DEBUG",
+ )
+ task = client.tasks.get_tasks()
+
+ print(task.to_dict())
+
+ create_response = client.tasks.create_task(
+ create_task=TaskCreateRequest(
+ task_type='load',
+ graph_name='DEFAULT-example',
+ params={
+ "load.hg_pd_peers": "[\"10.41.57.87:8686\"]",
+ "load.hugegraph_name": "DEFAULT/example/g",
+ "load.hugegraph_password": "xxx",
+ "load.hugegraph_username": "xxx",
+ "load.parallel": "10",
+ "load.type": "hugegraph"
+ },
+ )
+ )
+
+ print(create_response.to_dict())
+
+
+if __name__ == "__main__":
+ main()
diff --git a/vermeer-python-client/src/pyvermeer/structure/__init__.py b/vermeer-python-client/src/pyvermeer/structure/__init__.py
new file mode 100644
index 000000000..7c21a6a16
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/__init__.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file: __init__.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/structure/base_data.py b/vermeer-python-client/src/pyvermeer/structure/base_data.py
new file mode 100644
index 000000000..525f2f53d
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/base_data.py
@@ -0,0 +1,39 @@
+# !/usr/bin/env python3
+"""
+file: base_data.py
+author: wenyuxuan@baidu.com
+"""
+
+RESPONSE_ERR = 1
+RESPONSE_OK = 0
+RESPONSE_NONE = -1
+
+
+class BaseResponse(object):
+ """
+ Base response class
+ """
+
+ def __init__(self, dic: dict):
+ """
+ init
+ :param dic:
+ """
+ self.__errcode = dic.get('errcode', RESPONSE_NONE)
+ self.__message = dic.get('message', "")
+
+ @property
+ def errcode(self) -> int:
+ """
+ get error code
+ :return:
+ """
+ return self.__errcode
+
+ @property
+ def message(self) -> str:
+ """
+ get message
+ :return:
+ """
+ return self.__message
diff --git a/vermeer-python-client/src/pyvermeer/structure/graph_data.py b/vermeer-python-client/src/pyvermeer/structure/graph_data.py
new file mode 100644
index 000000000..8c2af4db8
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/graph_data.py
@@ -0,0 +1,244 @@
+# !/usr/bin/env python3
+"""
+file: graph_data.py
+author: wenyuxuan@baidu.com
+"""
+
+import datetime
+
+from pyvermeer.structure.base_data import BaseResponse
+from pyvermeer.utils.vermeer_datetime import parse_vermeer_time
+
+
+class BackendOpt:
+ """BackendOpt class"""
+
+ def __init__(self, dic: dict):
+ """init"""
+ self.__vertex_data_backend = dic.get('vertex_data_backend', None)
+
+ @property
+ def vertex_data_backend(self):
+ """vertex data backend"""
+ return self.__vertex_data_backend
+
+ def to_dict(self):
+ """to dict"""
+ return {
+ 'vertex_data_backend': self.vertex_data_backend
+ }
+
+
+class GraphWorker:
+ """GraphWorker"""
+
+ def __init__(self, dic: dict):
+ """init"""
+ self.__name = dic.get('Name', '')
+ self.__vertex_count = dic.get('VertexCount', -1)
+ self.__vert_id_start = dic.get('VertIdStart', -1)
+ self.__edge_count = dic.get('EdgeCount', -1)
+ self.__is_self = dic.get('IsSelf', False)
+ self.__scatter_offset = dic.get('ScatterOffset', -1)
+
+ @property
+ def name(self) -> str:
+ """graph worker name"""
+ return self.__name
+
+ @property
+ def vertex_count(self) -> int:
+ """vertex count"""
+ return self.__vertex_count
+
+ @property
+ def vert_id_start(self) -> int:
+ """vertex id start"""
+ return self.__vert_id_start
+
+ @property
+ def edge_count(self) -> int:
+ """edge count"""
+ return self.__edge_count
+
+ @property
+ def is_self(self) -> bool:
+ """is self worker. Nonsense """
+ return self.__is_self
+
+ @property
+ def scatter_offset(self) -> int:
+ """scatter offset"""
+ return self.__scatter_offset
+
+ def to_dict(self):
+ """to dict"""
+ return {
+ 'name': self.name,
+ 'vertex_count': self.vertex_count,
+ 'vert_id_start': self.vert_id_start,
+ 'edge_count': self.edge_count,
+ 'is_self': self.is_self,
+ 'scatter_offset': self.scatter_offset
+ }
+
+
+class VermeerGraph:
+ """VermeerGraph"""
+
+ def __init__(self, dic: dict):
+ """init"""
+ self.__name = dic.get('name', '')
+ self.__space_name = dic.get('space_name', '')
+ self.__status = dic.get('status', '')
+ self.__create_time = parse_vermeer_time(dic.get('create_time', ''))
+ self.__update_time = parse_vermeer_time(dic.get('update_time', ''))
+ self.__vertex_count = dic.get('vertex_count', 0)
+ self.__edge_count = dic.get('edge_count', 0)
+ self.__workers = [GraphWorker(w) for w in dic.get('workers', [])]
+ self.__worker_group = dic.get('worker_group', '')
+ self.__use_out_edges = dic.get('use_out_edges', False)
+ self.__use_property = dic.get('use_property', False)
+ self.__use_out_degree = dic.get('use_out_degree', False)
+ self.__use_undirected = dic.get('use_undirected', False)
+ self.__on_disk = dic.get('on_disk', False)
+ self.__backend_option = BackendOpt(dic.get('backend_option', {}))
+
+ @property
+ def name(self) -> str:
+ """graph name"""
+ return self.__name
+
+ @property
+ def space_name(self) -> str:
+ """space name"""
+ return self.__space_name
+
+ @property
+ def status(self) -> str:
+ """graph status"""
+ return self.__status
+
+ @property
+ def create_time(self) -> datetime.datetime:
+ """create time"""
+ return self.__create_time
+
+ @property
+ def update_time(self) -> datetime.datetime:
+ """update time"""
+ return self.__update_time
+
+ @property
+ def vertex_count(self) -> int:
+ """vertex count"""
+ return self.__vertex_count
+
+ @property
+ def edge_count(self) -> int:
+ """edge count"""
+ return self.__edge_count
+
+ @property
+ def workers(self) -> list[GraphWorker]:
+ """graph workers"""
+ return self.__workers
+
+ @property
+ def worker_group(self) -> str:
+ """worker group"""
+ return self.__worker_group
+
+ @property
+ def use_out_edges(self) -> bool:
+ """whether graph has out edges"""
+ return self.__use_out_edges
+
+ @property
+ def use_property(self) -> bool:
+ """whether graph has property"""
+ return self.__use_property
+
+ @property
+ def use_out_degree(self) -> bool:
+ """whether graph has out degree"""
+ return self.__use_out_degree
+
+ @property
+ def use_undirected(self) -> bool:
+ """whether graph is undirected"""
+ return self.__use_undirected
+
+ @property
+ def on_disk(self) -> bool:
+ """whether graph is on disk"""
+ return self.__on_disk
+
+ @property
+ def backend_option(self) -> BackendOpt:
+ """backend option"""
+ return self.__backend_option
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ 'name': self.__name,
+ 'space_name': self.__space_name,
+ 'status': self.__status,
+ 'create_time': self.__create_time.strftime("%Y-%m-%d %H:%M:%S") if self.__create_time else '',
+ 'update_time': self.__update_time.strftime("%Y-%m-%d %H:%M:%S") if self.__update_time else '',
+ 'vertex_count': self.__vertex_count,
+ 'edge_count': self.__edge_count,
+ 'workers': [w.to_dict() for w in self.__workers],
+ 'worker_group': self.__worker_group,
+ 'use_out_edges': self.__use_out_edges,
+ 'use_property': self.__use_property,
+ 'use_out_degree': self.__use_out_degree,
+ 'use_undirected': self.__use_undirected,
+ 'on_disk': self.__on_disk,
+ 'backend_option': self.__backend_option.to_dict(),
+ }
+
+
+class GraphsResponse(BaseResponse):
+ """GraphsResponse"""
+
+ def __init__(self, dic: dict):
+ """init"""
+ super().__init__(dic)
+ self.__graphs = [VermeerGraph(g) for g in dic.get('graphs', [])]
+
+ @property
+ def graphs(self) -> list[VermeerGraph]:
+ """graphs"""
+ return self.__graphs
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ 'errcode': self.errcode,
+ 'message': self.message,
+ 'graphs': [g.to_dict() for g in self.graphs]
+ }
+
+
+class GraphResponse(BaseResponse):
+ """GraphResponse"""
+
+ def __init__(self, dic: dict):
+ """init"""
+ super().__init__(dic)
+ self.__graph = VermeerGraph(dic.get('graph', {}))
+
+ @property
+ def graph(self) -> VermeerGraph:
+ """graph"""
+ return self.__graph
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ 'errcode': self.errcode,
+ 'message': self.message,
+ 'graph': self.graph.to_dict()
+ }
diff --git a/vermeer-python-client/src/pyvermeer/structure/master_data.py b/vermeer-python-client/src/pyvermeer/structure/master_data.py
new file mode 100644
index 000000000..27b0d96a4
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/master_data.py
@@ -0,0 +1,71 @@
+# !/usr/bin/env python3
+"""
+file: master_data.py
+author: wenyuxuan@baidu.com
+"""
+
+import datetime
+
+from pyvermeer.structure.base_data import BaseResponse
+from pyvermeer.utils.vermeer_datetime import parse_vermeer_time
+
+
+class MasterInfo:
+ """Master 信息"""
+
+ def __init__(self, dic: dict):
+ """初始化函数"""
+ self.__grpc_peer = dic.get('grpc_peer', '')
+ self.__ip_addr = dic.get('ip_addr', '')
+ self.__debug_mod = dic.get('debug_mod', False)
+ self.__version = dic.get('version', '')
+ self.__launch_time = parse_vermeer_time(dic.get('launch_time', ''))
+
+ @property
+ def grpc_peer(self) -> str:
+ """gRPC地址"""
+ return self.__grpc_peer
+
+ @property
+ def ip_addr(self) -> str:
+ """IP地址"""
+ return self.__ip_addr
+
+ @property
+ def debug_mod(self) -> bool:
+ """是否为调试模式"""
+ return self.__debug_mod
+
+ @property
+ def version(self) -> str:
+ """Master 版本号"""
+ return self.__version
+
+ @property
+ def launch_time(self) -> datetime.datetime:
+ """Master launch time"""
+ return self.__launch_time
+
+ def to_dict(self):
+ """返回字典格式数据"""
+ return {
+ "grpc_peer": self.__grpc_peer,
+ "ip_addr": self.__ip_addr,
+ "debug_mod": self.__debug_mod,
+ "version": self.__version,
+ "launch_time": self.__launch_time.strftime("%Y-%m-%d %H:%M:%S") if self.__launch_time else ''
+ }
+
+
+class MasterResponse(BaseResponse):
+ """Master 响应"""
+
+ def __init__(self, dic: dict):
+ """初始化函数"""
+ super().__init__(dic)
+ self.__master_info = MasterInfo(dic['master_info'])
+
+ @property
+ def master_info(self) -> MasterInfo:
+ """获取主节点信息"""
+ return self.__master_info
diff --git a/vermeer-python-client/src/pyvermeer/structure/task_data.py b/vermeer-python-client/src/pyvermeer/structure/task_data.py
new file mode 100644
index 000000000..d93f756c2
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/task_data.py
@@ -0,0 +1,215 @@
+# !/usr/bin/env python3
+"""
+file: task_data.py
+author: wenyuxuan@baidu.com
+"""
+
+import datetime
+
+from pyvermeer.structure.base_data import BaseResponse
+from pyvermeer.utils.vermeer_datetime import parse_vermeer_time
+
+
+class TaskWorker:
+ """task worker info"""
+
+ def __init__(self, dic):
+ """init"""
+ self.__name = dic.get('name', None)
+ self.__status = dic.get('status', None)
+
+ @property
+ def name(self) -> str:
+ """worker name"""
+ return self.__name
+
+ @property
+ def status(self) -> str:
+ """worker status"""
+ return self.__status
+
+ def to_dict(self):
+ """to dict"""
+ return {'name': self.name, 'status': self.status}
+
+
+class TaskInfo:
+ """task info"""
+
+ def __init__(self, dic):
+ """init"""
+ self.__id = dic.get('id', 0)
+ self.__status = dic.get('status', '')
+ self.__state = dic.get('state', '')
+ self.__create_user = dic.get('create_user', '')
+ self.__create_type = dic.get('create_type', '')
+ self.__create_time = parse_vermeer_time(dic.get('create_time', ''))
+ self.__start_time = parse_vermeer_time(dic.get('start_time', ''))
+ self.__update_time = parse_vermeer_time(dic.get('update_time', ''))
+ self.__graph_name = dic.get('graph_name', '')
+ self.__space_name = dic.get('space_name', '')
+ self.__type = dic.get('type', '')
+ self.__params = dic.get('params', {})
+ self.__workers = [TaskWorker(w) for w in dic.get('workers', [])]
+
+ @property
+ def id(self) -> int:
+ """task id"""
+ return self.__id
+
+ @property
+ def state(self) -> str:
+ """task state"""
+ return self.__state
+
+ @property
+ def create_user(self) -> str:
+ """task creator"""
+ return self.__create_user
+
+ @property
+ def create_type(self) -> str:
+ """task create type"""
+ return self.__create_type
+
+ @property
+ def create_time(self) -> datetime.datetime:
+ """task create time"""
+ return self.__create_time
+
+ @property
+ def start_time(self) -> datetime.datetime:
+ """task start time"""
+ return self.__start_time
+
+ @property
+ def update_time(self) -> datetime.datetime:
+ """task update time"""
+ return self.__update_time
+
+ @property
+ def graph_name(self) -> str:
+ """task graph"""
+ return self.__graph_name
+
+ @property
+ def space_name(self) -> str:
+ """task space"""
+ return self.__space_name
+
+ @property
+ def type(self) -> str:
+ """task type"""
+ return self.__type
+
+ @property
+ def params(self) -> dict:
+ """task params"""
+ return self.__params
+
+ @property
+ def workers(self) -> list[TaskWorker]:
+ """task workers"""
+ return self.__workers
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ 'id': self.__id,
+ 'status': self.__status,
+ 'state': self.__state,
+ 'create_user': self.__create_user,
+ 'create_type': self.__create_type,
+ 'create_time': self.__create_time.strftime("%Y-%m-%d %H:%M:%S") if self.__create_time else '',
+ 'start_time': self.__start_time.strftime("%Y-%m-%d %H:%M:%S") if self.__start_time else '',
+ 'update_time': self.__update_time.strftime("%Y-%m-%d %H:%M:%S") if self.__update_time else '',
+ 'graph_name': self.__graph_name,
+ 'space_name': self.__space_name,
+ 'type': self.__type,
+ 'params': self.__params,
+ 'workers': [w.to_dict() for w in self.__workers],
+ }
+
+
+class TaskCreateRequest:
+ """task create request"""
+
+ def __init__(self, task_type, graph_name, params):
+ """init"""
+ self.task_type = task_type
+ self.graph_name = graph_name
+ self.params = params
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ 'task_type': self.task_type,
+ 'graph': self.graph_name,
+ 'params': self.params
+ }
+
+
+class TaskCreateResponse(BaseResponse):
+ """task create response"""
+
+ def __init__(self, dic):
+ """init"""
+ super().__init__(dic)
+ self.__task = TaskInfo(dic.get('task', {}))
+
+ @property
+ def task(self) -> TaskInfo:
+ """task info"""
+ return self.__task
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ "errcode": self.errcode,
+ "message": self.message,
+ "task": self.task.to_dict(),
+ }
+
+
+class TasksResponse(BaseResponse):
+ """tasks response"""
+
+ def __init__(self, dic):
+ """init"""
+ super().__init__(dic)
+ self.__tasks = [TaskInfo(t) for t in dic.get('tasks', [])]
+
+ @property
+ def tasks(self) -> list[TaskInfo]:
+ """task infos"""
+ return self.__tasks
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ "errcode": self.errcode,
+ "message": self.message,
+ "tasks": [t.to_dict() for t in self.tasks]
+ }
+
+
+class TaskResponse(BaseResponse):
+ """task response"""
+
+ def __init__(self, dic):
+ """init"""
+ super().__init__(dic)
+ self.__task = TaskInfo(dic.get('task', {}))
+
+ @property
+ def task(self) -> TaskInfo:
+ """task info"""
+ return self.__task
+
+ def to_dict(self) -> dict:
+ """to dict"""
+ return {
+ "errcode": self.errcode,
+ "message": self.message,
+ "task": self.task.to_dict(),
+ }
diff --git a/vermeer-python-client/src/pyvermeer/structure/worker_data.py b/vermeer-python-client/src/pyvermeer/structure/worker_data.py
new file mode 100644
index 000000000..3b7aa0e59
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/structure/worker_data.py
@@ -0,0 +1,107 @@
+# !/usr/bin/env python3
+"""
+file: worker_data.py
+author: wenyuxuan@baidu.com
+"""
+
+import datetime
+
+from pyvermeer.structure.base_data import BaseResponse
+from pyvermeer.utils.vermeer_datetime import parse_vermeer_time
+
+
+class Worker:
+ """worker data"""
+
+ def __init__(self, dic):
+ """init"""
+ self.__id = dic.get('id', 0)
+ self.__name = dic.get('name', '')
+ self.__grpc_addr = dic.get('grpc_addr', '')
+ self.__ip_addr = dic.get('ip_addr', '')
+ self.__state = dic.get('state', '')
+ self.__version = dic.get('version', '')
+ self.__group = dic.get('group', '')
+ self.__init_time = parse_vermeer_time(dic.get('init_time', ''))
+ self.__launch_time = parse_vermeer_time(dic.get('launch_time', ''))
+
+ @property
+ def id(self) -> int:
+ """worker id"""
+ return self.__id
+
+ @property
+ def name(self) -> str:
+ """worker name"""
+ return self.__name
+
+ @property
+ def grpc_addr(self) -> str:
+ """gRPC address"""
+ return self.__grpc_addr
+
+ @property
+ def ip_addr(self) -> str:
+ """IP address"""
+ return self.__ip_addr
+
+ @property
+ def state(self) -> str:
+ """worker state"""
+ return self.__state
+
+ @property
+ def version(self) -> str:
+ """worker version"""
+ return self.__version
+
+ @property
+ def group(self) -> str:
+ """worker group"""
+ return self.__group
+
+ @property
+ def init_time(self) -> datetime.datetime:
+ """worker initialization time"""
+ return self.__init_time
+
+ @property
+ def launch_time(self) -> datetime.datetime:
+ """worker launch time"""
+ return self.__launch_time
+
+ def to_dict(self):
+ """convert object to dictionary"""
+ return {
+ "id": self.id,
+ "name": self.name,
+ "grpc_addr": self.grpc_addr,
+ "ip_addr": self.ip_addr,
+ "state": self.state,
+ "version": self.version,
+ "group": self.group,
+ "init_time": self.init_time,
+ "launch_time": self.launch_time,
+ }
+
+
+class WorkersResponse(BaseResponse):
+ """response of workers"""
+
+ def __init__(self, dic):
+ """init"""
+ super().__init__(dic)
+ self.__workers = [Worker(worker) for worker in dic.get('workers', [])]
+
+ @property
+ def workers(self) -> list[Worker]:
+ """list of workers"""
+ return self.__workers
+
+ def to_dict(self):
+ """convert object to dictionary"""
+ return {
+ "errcode": self.errcode,
+ "message": self.message,
+ "workers": [worker.to_dict() for worker in self.workers],
+ }
diff --git a/vermeer-python-client/src/pyvermeer/utils/__init__.py b/vermeer-python-client/src/pyvermeer/utils/__init__.py
new file mode 100644
index 000000000..0b14d2fed
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/__init__.py
@@ -0,0 +1,5 @@
+# !/usr/bin/env python3
+"""
+file: __init__.py
+author: wenyuxuan@baidu.com
+"""
diff --git a/vermeer-python-client/src/pyvermeer/utils/exception.py b/vermeer-python-client/src/pyvermeer/utils/exception.py
new file mode 100644
index 000000000..a4237c922
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/exception.py
@@ -0,0 +1,33 @@
+# !/usr/bin/env python3
+"""
+file: exception.py
+author: wenyuxuan@baidu.com
+"""
+
+
+class ConnectError(Exception):
+ """Raised when there is an issue connecting to the server."""
+
+ def __init__(self, message):
+ super().__init__(f"Connection error: {str(message)}")
+
+
+class TimeOutError(Exception):
+ """Raised when a request times out."""
+
+ def __init__(self, message):
+ super().__init__(f"Request timed out: {str(message)}")
+
+
+class JsonDecodeError(Exception):
+ """Raised when the response from the server cannot be decoded as JSON."""
+
+ def __init__(self, message):
+ super().__init__(f"Failed to decode JSON response: {str(message)}")
+
+
+class UnknownError(Exception):
+ """Raised for any other unknown errors."""
+
+ def __init__(self, message):
+ super().__init__(f"Unknown API error: {str(message)}")
diff --git a/vermeer-python-client/src/pyvermeer/utils/log.py b/vermeer-python-client/src/pyvermeer/utils/log.py
new file mode 100644
index 000000000..a7953c9fe
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/log.py
@@ -0,0 +1,58 @@
+# !/usr/bin/env python3
+"""
+file: log.py
+author: wenyuxuan@baidu.com
+"""
+import logging
+import sys
+
+
+class VermeerLogger:
+ """vermeer API log"""
+ _instance = None
+
+ def __new__(cls, name: str = "VermeerClient"):
+ """new api logger"""
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ cls._instance._initialize(name)
+ return cls._instance
+
+ def _initialize(self, name: str):
+ """初始化日志配置"""
+ self.logger = logging.getLogger(name)
+ self.logger.setLevel(logging.INFO) # 默认级别
+
+ if not self.logger.handlers:
+ # 控制台输出格式
+ console_format = logging.Formatter(
+ '[%(asctime)s] [%(levelname)s] %(name)s - %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S'
+ )
+
+ # 控制台处理器
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_handler.setLevel(logging.INFO) # 控制台默认级别
+ console_handler.setFormatter(console_format)
+
+ # file_handler = logging.FileHandler('api_client.log')
+ # file_handler.setLevel(logging.DEBUG)
+ # file_handler.setFormatter(
+ # logging.Formatter(
+ # '[%(asctime)s] [%(levelname)s] [%(threadName)s] %(name)s - %(message)s'
+ # )
+ # )
+
+ self.logger.addHandler(console_handler)
+ # self.logger.addHandler(file_handler)
+
+ self.logger.propagate = False
+
+ @classmethod
+ def get_logger(cls) -> logging.Logger:
+ """获取配置好的日志记录器"""
+ return cls().logger
+
+
+# global logger instance
+log = VermeerLogger.get_logger()
diff --git a/vermeer-python-client/src/pyvermeer/utils/vermeer_config.py b/vermeer-python-client/src/pyvermeer/utils/vermeer_config.py
new file mode 100644
index 000000000..79bb32628
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/vermeer_config.py
@@ -0,0 +1,26 @@
+# !/usr/bin/env python3
+"""
+file: vermeer_config.py
+author: wenyuxuan@baidu.com
+"""
+
+
+class VermeerConfig:
+ """The configuration of a Vermeer instance."""
+ ip: str
+ port: int
+ token: str
+ timeout: tuple[float, float]
+
+ def __init__(self,
+ ip: str,
+ port: int,
+ token: str,
+ timeout: tuple[float, float] = (0.5, 15.0)):
+ """Initialize the configuration for a Vermeer instance."""
+ self.ip = ip
+ self.port = port
+ self.token = token
+ self.timeout = timeout
diff --git a/vermeer-python-client/src/pyvermeer/utils/vermeer_datetime.py b/vermeer-python-client/src/pyvermeer/utils/vermeer_datetime.py
new file mode 100644
index 000000000..323ea4e2c
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/vermeer_datetime.py
@@ -0,0 +1,21 @@
+# !/usr/bin/env python3
+"""
+file: vermeer_datetime.py
+author: wenyuxuan@baidu.com
+"""
+
+import datetime
+from typing import Optional
+
+from dateutil import parser
+
+
+def parse_vermeer_time(vm_dt: str) -> Optional[datetime.datetime]:
+ """Parse a Vermeer time string into a datetime, or None for an empty value."""
+ if not vm_dt:
+ return None
+ dt = parser.parse(vm_dt)
+ return dt
+
+
+if __name__ == '__main__':
+ print(parse_vermeer_time('2025-02-17T15:45:05.396311145+08:00').strftime("%Y%m%d"))
diff --git a/vermeer-python-client/src/pyvermeer/utils/vermeer_requests.py b/vermeer-python-client/src/pyvermeer/utils/vermeer_requests.py
new file mode 100644
index 000000000..1099d5db2
--- /dev/null
+++ b/vermeer-python-client/src/pyvermeer/utils/vermeer_requests.py
@@ -0,0 +1,104 @@
+# !/usr/bin/env python3
+"""
+file: vermeer_requests.py
+author: wenyuxuan@baidu.com
+"""
+
+import json
+from typing import Optional
+from urllib.parse import urljoin
+
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+
+from pyvermeer.utils.exception import JsonDecodeError, ConnectError, TimeOutError, UnknownError
+from pyvermeer.utils.log import log
+from pyvermeer.utils.vermeer_config import VermeerConfig
+
+
+class VermeerSession:
+ """vermeer session"""
+
+ def __init__(
+ self,
+ cfg: VermeerConfig,
+ retries: int = 3,
+ backoff_factor: float = 0.1,
+ status_forcelist=(500, 502, 504),
+ session: Optional[requests.Session] = None,
+ ):
+ """
+ Initialize the Session.
+ """
+ self._cfg = cfg
+ self._retries = retries
+ self._backoff_factor = backoff_factor
+ self._status_forcelist = status_forcelist
+ if self._cfg.token is None:
+ raise ValueError("Vermeer Token must be provided.")
+ self._auth = self._cfg.token
+ self._headers = {"Content-Type": "application/json", "Authorization": self._auth}
+ self._timeout = cfg.timeout
+ self._session = session if session else requests.Session()
+ self.__configure_session()
+
+ def __configure_session(self):
+ """
+ Configure the retry strategy and connection adapter for the session.
+ """
+ retry_strategy = Retry(
+ total=self._retries,
+ read=self._retries,
+ connect=self._retries,
+ backoff_factor=self._backoff_factor,
+ status_forcelist=self._status_forcelist,
+ )
+ adapter = HTTPAdapter(max_retries=retry_strategy)
+ self._session.mount("http://", adapter)
+ self._session.mount("https://", adapter)
+ self._session.keep_alive = False
+ log.debug(
+ "Session configured with retries=%s and backoff_factor=%s",
+ self._retries,
+ self._backoff_factor,
+ )
+
+ def resolve(self, path: str):
+ """
+ Resolve the path to a full URL.
+ """
+ url = f"http://{self._cfg.ip}:{self._cfg.port}/"
+ return urljoin(url, path).strip("/")
+
+ def close(self):
+ """
+ closes the session.
+ """
+ self._session.close()
+
+ def request(
+ self,
+ method: str,
+ path: str,
+ params: dict = None
+ ) -> dict:
+ """request"""
+ try:
+ log.debug(f"Request made to {path} with params {json.dumps(params)}")
+ response = self._session.request(method,
+ self.resolve(path),
+ headers=self._headers,
+ data=json.dumps(params),
+ timeout=self._timeout)
+ log.debug(f"Response code:{response.status_code}, received: {response.text}")
+ return response.json()
+ except requests.ConnectionError as e:
+ raise ConnectError(e) from e
+ except requests.Timeout as e:
+ raise TimeOutError(e) from e
+ except json.JSONDecodeError as e:
+ raise JsonDecodeError(e) from e
+ except Exception as e:
+ raise UnknownError(e) from e