"""Example: export a small in-memory knowledge graph to Parquet files.

Running this module as a script builds a three-node / one-relationship graph
and hands it to ``ParquetWriter``, which writes its output under
``OUTPUT_FOLDER``.
"""

import asyncio

from neo4j_graphrag.experimental.components.kg_writer import (
    KGWriterModel,
    ParquetWriter,
)
from neo4j_graphrag.experimental.components.types import (
    Neo4jGraph,
    Neo4jNode,
    Neo4jRelationship,
)

OUTPUT_FOLDER = "output"


def build_example_graph() -> Neo4jGraph:
    """Return the sample graph used by this example.

    The graph holds two ``Person`` nodes, one ``Location`` node and a single
    ``KNOWS`` relationship between the two persons. The second person carries
    an extra ``nickName`` property so that rows sharing a label do not all
    have the same set of properties.
    """
    return Neo4jGraph(
        nodes=[
            Neo4jNode(
                id="p-0",
                label="Person",
                properties={
                    "name": "Alice",
                    "eyeColor": "blue",
                },
            ),
            Neo4jNode(
                id="p-1",
                label="Person",
                properties={
                    "name": "Robert",
                    "eyeColor": "brown",
                    "nickName": "Bob",
                },
            ),
            Neo4jNode(
                id="l-0",
                label="Location",
                properties={
                    "name": "Wonderland",
                },
            ),
        ],
        relationships=[
            Neo4jRelationship(
                type="KNOWS",
                start_node_id="p-0",
                end_node_id="p-1",
                properties={"reason": "Cryptography"},
            )
        ],
    )


async def main(graph: Neo4jGraph, output_folder: str = OUTPUT_FOLDER) -> KGWriterModel:
    """Write ``graph`` to Parquet files and return the writer's result.

    Args:
        graph (Neo4jGraph): The knowledge graph to export.
        output_folder (str): Folder handed to ``ParquetWriter``. Defaults to
            ``OUTPUT_FOLDER``.

    Returns:
        KGWriterModel: The value returned by ``ParquetWriter.run``.
    """
    writer = ParquetWriter(output_folder=output_folder)
    return await writer.run(graph=graph)


if __name__ == "__main__":
    res = asyncio.run(main(graph=build_example_graph()))
    print(res)
    # To inspect the generated files, read them back with pandas:
    #
    # import pandas as pd
    #
    # df = pd.read_parquet(f'{OUTPUT_FOLDER}/nodes/Person.parquet')
    # print(df.head(10))
    # df = pd.read_parquet(f'{OUTPUT_FOLDER}/nodes/Location.parquet')
    # print(df.head(10))
    # df = pd.read_parquet(f'{OUTPUT_FOLDER}/relationships/KNOWS.parquet')
    # print(df.head(10))
diff --git a/poetry.lock b/poetry.lock index e48e77846..8cb029782 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -4864,6 +4864,62 @@ files = [ [package.extras] tests = ["pytest"] +[[package]] +name = "pyarrow" +version = "21.0.0" +description = "Python library for Apache Arrow" +optional = false +python-versions = ">=3.9" +groups = ["main"] +files = [ + {file = "pyarrow-21.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:e563271e2c5ff4d4a4cbeb2c83d5cf0d4938b891518e676025f7268c6fe5fe26"}, + {file = "pyarrow-21.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:fee33b0ca46f4c85443d6c450357101e47d53e6c3f008d658c27a2d020d44c79"}, + {file = "pyarrow-21.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:7be45519b830f7c24b21d630a31d48bcebfd5d4d7f9d3bdb49da9cdf6d764edb"}, + {file = "pyarrow-21.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:26bfd95f6bff443ceae63c65dc7e048670b7e98bc892210acba7e4995d3d4b51"}, + {file = "pyarrow-21.0.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:bd04ec08f7f8bd113c55868bd3fc442a9db67c27af098c5f814a3091e71cc61a"}, + {file = "pyarrow-21.0.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:9b0b14b49ac10654332a805aedfc0147fb3469cbf8ea951b3d040dab12372594"}, + {file = "pyarrow-21.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:9d9f8bcb4c3be7738add259738abdeddc363de1b80e3310e04067aa1ca596634"}, + {file = "pyarrow-21.0.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:c077f48aab61738c237802836fc3844f85409a46015635198761b0d6a688f87b"}, + {file = "pyarrow-21.0.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:689f448066781856237eca8d1975b98cace19b8dd2ab6145bf49475478bcaa10"}, + {file = "pyarrow-21.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = 
"sha256:479ee41399fcddc46159a551705b89c05f11e8b8cb8e968f7fec64f62d91985e"}, + {file = "pyarrow-21.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:40ebfcb54a4f11bcde86bc586cbd0272bac0d516cfa539c799c2453768477569"}, + {file = "pyarrow-21.0.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8d58d8497814274d3d20214fbb24abcad2f7e351474357d552a8d53bce70c70e"}, + {file = "pyarrow-21.0.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:585e7224f21124dd57836b1530ac8f2df2afc43c861d7bf3d58a4870c42ae36c"}, + {file = "pyarrow-21.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:555ca6935b2cbca2c0e932bedd853e9bc523098c39636de9ad4693b5b1df86d6"}, + {file = "pyarrow-21.0.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:3a302f0e0963db37e0a24a70c56cf91a4faa0bca51c23812279ca2e23481fccd"}, + {file = "pyarrow-21.0.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:b6b27cf01e243871390474a211a7922bfbe3bda21e39bc9160daf0da3fe48876"}, + {file = "pyarrow-21.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:e72a8ec6b868e258a2cd2672d91f2860ad532d590ce94cdf7d5e7ec674ccf03d"}, + {file = "pyarrow-21.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b7ae0bbdc8c6674259b25bef5d2a1d6af5d39d7200c819cf99e07f7dfef1c51e"}, + {file = "pyarrow-21.0.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:58c30a1729f82d201627c173d91bd431db88ea74dcaa3885855bc6203e433b82"}, + {file = "pyarrow-21.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:072116f65604b822a7f22945a7a6e581cfa28e3454fdcc6939d4ff6090126623"}, + {file = "pyarrow-21.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:cf56ec8b0a5c8c9d7021d6fd754e688104f9ebebf1bf4449613c9531f5346a18"}, + {file = "pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:e99310a4ebd4479bcd1964dff9e14af33746300cb014aa4a3781738ac63baf4a"}, + {file = "pyarrow-21.0.0-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:d2fe8e7f3ce329a71b7ddd7498b3cfac0eeb200c2789bd840234f0dc271a8efe"}, + {file = 
"pyarrow-21.0.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:f522e5709379d72fb3da7785aa489ff0bb87448a9dc5a75f45763a795a089ebd"}, + {file = "pyarrow-21.0.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:69cbbdf0631396e9925e048cfa5bce4e8c3d3b41562bbd70c685a8eb53a91e61"}, + {file = "pyarrow-21.0.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:731c7022587006b755d0bdb27626a1a3bb004bb56b11fb30d98b6c1b4718579d"}, + {file = "pyarrow-21.0.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:dc56bc708f2d8ac71bd1dcb927e458c93cec10b98eb4120206a4091db7b67b99"}, + {file = "pyarrow-21.0.0-cp313-cp313-win_amd64.whl", hash = "sha256:186aa00bca62139f75b7de8420f745f2af12941595bbbfa7ed3870ff63e25636"}, + {file = "pyarrow-21.0.0-cp313-cp313t-macosx_12_0_arm64.whl", hash = "sha256:a7a102574faa3f421141a64c10216e078df467ab9576684d5cd696952546e2da"}, + {file = "pyarrow-21.0.0-cp313-cp313t-macosx_12_0_x86_64.whl", hash = "sha256:1e005378c4a2c6db3ada3ad4c217b381f6c886f0a80d6a316fe586b90f77efd7"}, + {file = "pyarrow-21.0.0-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:65f8e85f79031449ec8706b74504a316805217b35b6099155dd7e227eef0d4b6"}, + {file = "pyarrow-21.0.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:3a81486adc665c7eb1a2bde0224cfca6ceaba344a82a971ef059678417880eb8"}, + {file = "pyarrow-21.0.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:fc0d2f88b81dcf3ccf9a6ae17f89183762c8a94a5bdcfa09e05cfe413acf0503"}, + {file = "pyarrow-21.0.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:6299449adf89df38537837487a4f8d3bd91ec94354fdd2a7d30bc11c48ef6e79"}, + {file = "pyarrow-21.0.0-cp313-cp313t-win_amd64.whl", hash = "sha256:222c39e2c70113543982c6b34f3077962b44fca38c0bd9e68bb6781534425c10"}, + {file = "pyarrow-21.0.0-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:a7f6524e3747e35f80744537c78e7302cd41deee8baa668d56d55f77d9c464b3"}, + {file = "pyarrow-21.0.0-cp39-cp39-macosx_12_0_x86_64.whl", hash = 
"sha256:203003786c9fd253ebcafa44b03c06983c9c8d06c3145e37f1b76a1f317aeae1"}, + {file = "pyarrow-21.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:3b4d97e297741796fead24867a8dabf86c87e4584ccc03167e4a811f50fdf74d"}, + {file = "pyarrow-21.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:898afce396b80fdda05e3086b4256f8677c671f7b1d27a6976fa011d3fd0a86e"}, + {file = "pyarrow-21.0.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:067c66ca29aaedae08218569a114e413b26e742171f526e828e1064fcdec13f4"}, + {file = "pyarrow-21.0.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:0c4e75d13eb76295a49e0ea056eb18dbd87d81450bfeb8afa19a7e5a75ae2ad7"}, + {file = "pyarrow-21.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdc4c17afda4dab2a9c0b79148a43a7f4e1094916b3e18d8975bfd6d6d52241f"}, + {file = "pyarrow-21.0.0.tar.gz", hash = "sha256:5051f2dccf0e283ff56335760cbc8622cf52264d67e359d5569541ac11b6d5bc"}, +] + +[package.extras] +test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] + [[package]] name = "pyasn1" version = "0.6.1" @@ -7852,4 +7908,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.1" python-versions = ">=3.9.0,<3.14" -content-hash = "a49fdff6b12387ac342d57f0f569309587ceecccd659847134621351af288d2e" +content-hash = "d903055324b40138f393acb8edf9286d63d0318720f7d4fe681f6f162cc37eb0" diff --git a/pyproject.toml b/pyproject.toml index aababc8cd..86390351f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ scipy = [ { version = "^1.15.0", python = ">=3.13,<3.14" } ] tenacity = "^9.1.2" +pyarrow = "^21.0.0" [tool.poetry.group.dev.dependencies] urllib3 = "<2" diff --git a/src/neo4j_graphrag/experimental/components/kg_writer.py b/src/neo4j_graphrag/experimental/components/kg_writer.py index 1f46e5ab3..2d39f5070 100644 --- a/src/neo4j_graphrag/experimental/components/kg_writer.py +++ b/src/neo4j_graphrag/experimental/components/kg_writer.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and 
# limitations under the License.
from __future__ import annotations

import logging
from abc import abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import Any, Generator, Literal, Optional

import neo4j
import pandas as pd


class ParquetWriter(KGWriter):
    """Writes a knowledge graph to Parquet files.

    Nodes are written to ``<output_folder>/nodes/<label>.parquet`` (one file
    per node label) and relationships to
    ``<output_folder>/relationships/<type>.parquet`` (one file per
    relationship type).

    Args:
        output_folder (str): Root folder for the generated Parquet files. The
            ``nodes`` and ``relationships`` sub-folders are created by
            :meth:`run` when they do not exist yet.
    """

    def __init__(
        self,
        output_folder: str,
    ) -> None:
        self._output_folder = Path(output_folder)

    @staticmethod
    def _build_row(
        structural: dict[str, Any],
        *property_maps: Optional[dict[str, Any]],
    ) -> dict[str, Any]:
        """Return a copy of ``structural`` extended with the given properties.

        Later property maps override earlier ones. No property may override a
        structural column, though: a user property that happens to be called
        e.g. ``id`` or ``from`` would otherwise replace the node identifier or
        a relationship endpoint in the output. Such properties are skipped and
        reported with a warning.

        Args:
            structural (dict[str, Any]): Columns that identify the row. They
                come first in the resulting column order.
            *property_maps (Optional[dict[str, Any]]): Property dictionaries
                to merge, in order. ``None`` and empty maps are ignored.

        Returns:
            dict[str, Any]: The merged row.
        """
        row = dict(structural)
        for properties in property_maps:
            if not properties:
                continue
            for key, value in properties.items():
                if key in structural:
                    logging.getLogger(__name__).warning(
                        "Property %r collides with a reserved Parquet column "
                        "and was not written",
                        key,
                    )
                    continue
                row[key] = value
        return row

    @staticmethod
    def _nodes_to_rows(
        nodes: list[Neo4jNode],
        label_to_rows: dict[str, list[dict[str, Any]]],
        lexical_graph_config: LexicalGraphConfig,
    ) -> None:
        """Append one row per node to ``label_to_rows``, keyed by node label.

        Each row has an ``id`` column, a ``labels`` column and one column per
        node property / embedding property. ``labels`` holds the node label,
        followed by ``"__Entity__"`` unless the label is one of
        ``lexical_graph_config.lexical_graph_node_labels``.

        Args:
            nodes (list[Neo4jNode]): Nodes to convert.
            label_to_rows (dict[str, list[dict[str, Any]]]): Mapping that is
                updated in place. ``run`` passes a ``defaultdict(list)``, so
                unseen labels need no prior initialisation.
            lexical_graph_config (LexicalGraphConfig): Supplies the lexical
                graph node labels.
        """
        lexical_labels = lexical_graph_config.lexical_graph_node_labels
        for node in nodes:
            labels = [node.label]
            if node.label not in lexical_labels:
                labels.append("__Entity__")

            row = ParquetWriter._build_row(
                {"id": node.id, "labels": labels},
                node.properties,
                node.embedding_properties,
            )
            label_to_rows[node.label].append(row)

    @staticmethod
    def _relationships_to_rows(
        relationships: list[Neo4jRelationship],
        type_to_rows: dict[str, list[dict[str, Any]]],
    ) -> None:
        """Append one row per relationship to ``type_to_rows``, keyed by type.

        Each row has ``from``, ``to`` and ``type`` columns plus one column per
        relationship property / embedding property.

        Args:
            relationships (list[Neo4jRelationship]): Relationships to convert.
            type_to_rows (dict[str, list[dict[str, Any]]]): Mapping that is
                updated in place. ``run`` passes a ``defaultdict(list)``.
        """
        for rel in relationships:
            row = ParquetWriter._build_row(
                {
                    "from": rel.start_node_id,
                    "to": rel.end_node_id,
                    "type": rel.type,
                },
                rel.properties,
                rel.embedding_properties,
            )
            type_to_rows[rel.type].append(row)

    @validate_call
    async def run(
        self,
        graph: Neo4jGraph,
        lexical_graph_config: LexicalGraphConfig = LexicalGraphConfig(),
    ) -> KGWriterModel:
        """Writes a knowledge graph to Parquet files.

        The Parquet files are written with synchronous pandas calls, so this
        coroutine does not yield while writing. Files for labels or types
        that are absent from ``graph`` are neither created nor removed.

        Args:
            graph (Neo4jGraph): The knowledge graph to write to Parquet files.
            lexical_graph_config (LexicalGraphConfig): Node labels and relationship types for the lexical graph.

        Returns:
            KGWriterModel: ``status="SUCCESS"`` with the node count, the
            relationship count and the output folder in ``metadata``.
        """
        nodes_dir = self._output_folder / "nodes"
        relationships_dir = self._output_folder / "relationships"
        nodes_dir.mkdir(parents=True, exist_ok=True)
        relationships_dir.mkdir(parents=True, exist_ok=True)

        label_to_rows: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
        type_to_rows: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)

        # TODO: Parallelize?
        self._nodes_to_rows(graph.nodes, label_to_rows, lexical_graph_config)
        self._relationships_to_rows(graph.relationships, type_to_rows)

        for label, node_rows in label_to_rows.items():
            pd.DataFrame(node_rows).to_parquet(
                nodes_dir / f"{label}.parquet", index=False
            )

        for rtype, rel_rows in type_to_rows.items():
            pd.DataFrame(rel_rows).to_parquet(
                relationships_dir / f"{rtype}.parquet", index=False
            )

        return KGWriterModel(
            status="SUCCESS",
            metadata={
                "node_count": len(graph.nodes),
                "relationship_count": len(graph.relationships),
                "output_folder": str(self._output_folder),
            },
        )