Skip to content

Commit d736763

Browse files
committed
add import_to_interactive scripts
1 parent 4ef6767 commit d736763

File tree

1 file changed

+193
-0
lines changed

1 file changed

+193
-0
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import os
2+
import json
3+
import time
4+
import logging
5+
import sys
6+
7+
from gs_interactive.client.driver import Driver
8+
from gs_interactive.models import *
9+
10+
11+
def get_interactive_schema(path):
12+
graph_path = os.path.join(path, "_graph")
13+
schema_json = os.path.join(graph_path, "schema.json")
14+
with open(schema_json, "r") as f:
15+
return json.load(f)
16+
17+
18+
def build_import_config_from_schema(schema, graph_path):
19+
import_config = {
20+
"loading_config": {
21+
"data_source": {"scheme": "file"},
22+
"import_option": "init",
23+
"format": {"type": "csv"},
24+
},
25+
"vertex_mappings": [],
26+
"edge_mappings": [],
27+
}
28+
for vertex_type in schema["vertex_types"]:
29+
vertex_mapping = {
30+
"type_name": vertex_type["type_name"],
31+
"inputs": [f"@{graph_path}/{vertex_type['type_name']}.csv"],
32+
"column_mappings": [],
33+
}
34+
35+
# Map the vertex properties to CSV columns
36+
for index, prop in enumerate(vertex_type["properties"]):
37+
vertex_mapping["column_mappings"].append(
38+
{
39+
"column": {
40+
"index": index,
41+
"name": prop["property_name"],
42+
},
43+
"property": prop["property_name"],
44+
}
45+
)
46+
47+
import_config["vertex_mappings"].append(vertex_mapping)
48+
49+
# Process edge mappings
50+
for edge_type in schema["edge_types"]:
51+
edge_mapping = {
52+
"type_triplet": {
53+
"edge": edge_type["type_name"],
54+
"source_vertex": edge_type["vertex_type_pair_relations"][0][
55+
"source_vertex"
56+
],
57+
"destination_vertex": edge_type["vertex_type_pair_relations"][0][
58+
"destination_vertex"
59+
],
60+
},
61+
"inputs": [f"@{graph_path}/{edge_type['type_name']}.csv"],
62+
"source_vertex_mappings": [
63+
{
64+
"column": {
65+
"index": 0,
66+
"name": f"{edge_type['vertex_type_pair_relations'][0]['source_vertex']}.id",
67+
},
68+
"property": "id",
69+
}
70+
],
71+
"destination_vertex_mappings": [
72+
{
73+
"column": {
74+
"index": 1,
75+
"name": f"{edge_type['vertex_type_pair_relations'][0]['destination_vertex']}.id",
76+
},
77+
"property": "id",
78+
}
79+
],
80+
"column_mappings": [],
81+
}
82+
83+
# Map the edge properties to CSV columns
84+
for index, prop in enumerate(edge_type["properties"]):
85+
edge_mapping["column_mappings"].append(
86+
{
87+
"column": {
88+
"index": index + 2,
89+
"name": prop["property_name"],
90+
},
91+
"property": prop["property_name"],
92+
}
93+
)
94+
95+
import_config["edge_mappings"].append(edge_mapping)
96+
97+
return import_config
98+
99+
100+
def import_to_interactive(path, name):
101+
driver = Driver()
102+
graph_id = ""
103+
schema = get_interactive_schema(path)
104+
with driver.session() as sess:
105+
graph_def = {
106+
"name": name,
107+
"description": f"This is a graph from graphy you dataset: {name}",
108+
"schema": schema,
109+
}
110+
create_graph_request = CreateGraphRequest.from_dict(graph_def)
111+
resp = sess.list_graphs()
112+
if resp.is_error():
113+
raise Exception(f"Failed to list graphs: {resp.get_status_message()}")
114+
else:
115+
for graph_info in resp.get_value():
116+
if name == graph_info.name:
117+
logging.info(f"Graph `{name}` already exists in Interactive")
118+
return graph_info.id
119+
120+
resp = sess.create_graph(create_graph_request)
121+
if resp.is_error():
122+
raise Exception(f"Failed to create graph: {resp.get_status_message()}")
123+
else:
124+
graph_id = resp.get_value().graph_id
125+
126+
graph_path = os.path.join(path, "_graph")
127+
import_config = build_import_config_from_schema(schema, graph_path)
128+
129+
bulk_load_request = SchemaMapping.from_dict(import_config)
130+
resp = sess.bulk_loading(graph_id, bulk_load_request)
131+
132+
if resp.is_error():
133+
raise Exception(f"Failed to import graph: {resp.get_status_message()}")
134+
else:
135+
job_id = resp.get_value().job_id
136+
137+
while True:
138+
resp = sess.get_job(job_id)
139+
if resp.is_error():
140+
raise Exception(
141+
f"Failed to import graph: {resp.get_status_message()}"
142+
)
143+
status = resp.get_value().status
144+
if status == "SUCCESS":
145+
break
146+
elif status == "FAILED":
147+
raise Exception(f"Import job {job_id} failed")
148+
else:
149+
time.sleep(1)
150+
151+
return graph_id
152+
153+
154+
def start_interactive_service(graph_id):
155+
driver = Driver()
156+
with driver.session() as sess:
157+
resp = sess.start_service(
158+
start_service_request=StartServiceRequest(graph_id=graph_id)
159+
)
160+
if resp.is_error():
161+
raise Exception(
162+
f"Start service for {graph_id} failed: {resp.get_status_message()}"
163+
)
164+
165+
166+
def delete_interactive_service(graph_id):
167+
driver = Driver()
168+
with driver.session() as sess:
169+
resp = sess.stop_service()
170+
if resp.is_error():
171+
raise Exception(
172+
f"Stop service for {graph_id} failed: {resp.get_status_message()}"
173+
)
174+
175+
resp = sess.delete_graph(graph_id)
176+
if resp.is_error():
177+
raise Exception(
178+
f"Delete graph {graph_id} failed: {resp.get_status_message()}"
179+
)
180+
181+
182+
logging.basicConfig(
183+
level=logging.INFO,
184+
format="[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s",
185+
handlers=[logging.StreamHandler()],
186+
)
187+
188+
189+
if __name__ == "__main__":
190+
path = sys.argv[1]
191+
graph_name = sys.argv[2]
192+
graph_id = import_to_interactive(path, graph_name)
193+
start_interactive_service(graph_id)

0 commit comments

Comments
 (0)