Skip to content

Commit 4458e5e

Browse files
wartraxx51fatih-acar
authored andcommitted
add cluster neo4j in infrahub_testcontainers
1 parent c83718e commit 4458e5e

File tree

5 files changed

+580
-67
lines changed

5 files changed

+580
-67
lines changed

python_testcontainers/infrahub_testcontainers/container.py

Lines changed: 239 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,12 @@ class ContainerService:
6767
class InfrahubDockerCompose(DockerCompose):
6868
project_name: str | None = None
6969
env_vars: dict[str, str] = field(default_factory=dict)
70+
deployment_type: str | None = None
7071

7172
@classmethod
72-
def init(cls, directory: Path | None = None, version: str | None = None) -> Self:
73+
def init(
74+
cls, directory: Path | None = None, version: str | None = None, deployment_type: str | None = None
75+
) -> Self:
7376
if not directory:
7477
directory = Path.cwd()
7578

@@ -80,7 +83,7 @@ def init(cls, directory: Path | None = None, version: str | None = None) -> Self
8083
if version == "local" and infrahub_image_version:
8184
version = infrahub_image_version
8285

83-
compose = cls(project_name=cls.generate_project_name(), context=directory)
86+
compose = cls(project_name=cls.generate_project_name(), context=directory, deployment_type=deployment_type)
8487
compose.create_docker_file(directory=directory)
8588
compose.create_env_file(directory=directory, version=version)
8689

@@ -112,7 +115,10 @@ def generate_project_name(cls) -> str:
112115

113116
def create_docker_file(self, directory: Path) -> Path:
114117
current_directory = Path(__file__).resolve().parent
115-
compose_file = current_directory / "docker-compose.test.yml"
118+
compose_file_name = (
119+
"docker-compose-cluster.test.yml" if self.deployment_type == "cluster" else "docker-compose.test.yml"
120+
)
121+
compose_file = current_directory / compose_file_name
116122

117123
test_compose_file = directory / "docker-compose.yml"
118124
test_compose_file.write_bytes(compose_file.read_bytes())
@@ -161,7 +167,7 @@ def restart(self) -> None:
161167
cmd.extend(self.services)
162168
self._run_command(cmd=cmd)
163169

164-
def start_container(self, service_name: str) -> None:
170+
def start_container(self, service_name: str | list[str]) -> None:
165171
"""
166172
Starts a specific service of the docker compose environment.
167173
@@ -171,7 +177,11 @@ def start_container(self, service_name: str) -> None:
171177

172178
# pull means running a separate command before starting
173179
if self.pull:
174-
pull_cmd = [*base_cmd, "pull", service_name]
180+
pull_cmd = [*base_cmd, "pull"]
181+
if isinstance(service_name, list):
182+
pull_cmd.extend(service_name)
183+
else:
184+
pull_cmd.append(service_name)
175185
self._run_command(cmd=pull_cmd)
176186

177187
up_cmd = [*base_cmd, "up"]
@@ -186,7 +196,10 @@ def start_container(self, service_name: str) -> None:
186196
# we run in detached mode instead of blocking
187197
up_cmd.append("--detach")
188198

189-
up_cmd.append(service_name)
199+
if isinstance(service_name, list):
200+
up_cmd.extend(service_name)
201+
else:
202+
up_cmd.append(service_name)
190203
self._run_command(cmd=up_cmd)
191204

192205
# TODO would be good to the support for project_name upstream
@@ -234,7 +247,7 @@ def database_create_backup(self, backup_name: str = "neo4j_database.backup", des
234247
dest_dir / backup_name,
235248
)
236249

237-
def database_restore_backup(self, backup_file: Path) -> None:
250+
def database_restore_backup(self, backup_file: Path) -> None: # noqa: PLR0915
238251
assert self.use_neo4j_enterprise
239252

240253
shutil.copy(
@@ -243,52 +256,35 @@ def database_restore_backup(self, backup_file: Path) -> None:
243256
)
244257
service_name = "database"
245258

246-
# Ensure the database container is running otherwise start it
247-
try:
248-
self.get_container(service_name=service_name)
249-
except ContainerIsNotRunning:
250-
self.start_container(service_name=service_name)
251-
252-
self.exec_in_container(
253-
command=["cypher-shell", "-u", "neo4j", "-p", "admin", "STOP DATABASE neo4j;"],
254-
service_name=service_name,
255-
)
259+
if self.deployment_type != "cluster": # noqa: PLR1702
260+
try:
261+
self.get_container(service_name=service_name)
262+
except ContainerIsNotRunning:
263+
self.start_container(service_name=service_name)
256264

257-
self.exec_in_container(
258-
command=[
259-
"neo4j-admin",
260-
"database",
261-
"restore",
262-
"--overwrite-destination",
263-
"--from-path",
264-
str(self.internal_backup_dir / backup_file.name),
265-
],
266-
service_name=service_name,
267-
)
265+
self.exec_in_container(
266+
command=["cypher-shell", "-u", "neo4j", "-p", "admin", "STOP DATABASE neo4j;"],
267+
service_name=service_name,
268+
)
268269

269-
self.exec_in_container(
270-
command=["chown", "-R", "neo4j:neo4j", "/data"],
271-
service_name=service_name,
272-
)
270+
self.exec_in_container(
271+
command=[
272+
"neo4j-admin",
273+
"database",
274+
"restore",
275+
"--overwrite-destination",
276+
"--from-path",
277+
str(self.internal_backup_dir / backup_file.name),
278+
],
279+
service_name=service_name,
280+
)
273281

274-
(restore_output, _, _) = self.exec_in_container(
275-
command=[
276-
"cypher-shell",
277-
"--format",
278-
"plain",
279-
"-d",
280-
"system",
281-
"-u",
282-
"neo4j",
283-
"-p",
284-
"admin",
285-
"START DATABASE neo4j;",
286-
],
287-
service_name=service_name,
288-
)
282+
self.exec_in_container(
283+
command=["chown", "-R", "neo4j:neo4j", "/data"],
284+
service_name=service_name,
285+
)
289286

290-
for _ in range(3):
291-
(stdout, _, _) = self.exec_in_container(
287+
(restore_output, _, _) = self.exec_in_container(
292288
command=[
293289
"cypher-shell",
294290
"--format",
@@ -299,26 +295,205 @@ def database_restore_backup(self, backup_file: Path) -> None:
299295
"neo4j",
300296
"-p",
301297
"admin",
302-
"SHOW DATABASES WHERE name = 'neo4j' AND currentStatus = 'online';",
298+
"START DATABASE neo4j;",
303299
],
304300
service_name=service_name,
305301
)
306-
if stdout:
307-
break
308-
time.sleep(5)
302+
303+
for _ in range(3):
304+
(stdout, _, _) = self.exec_in_container(
305+
command=[
306+
"cypher-shell",
307+
"--format",
308+
"plain",
309+
"-d",
310+
"system",
311+
"-u",
312+
"neo4j",
313+
"-p",
314+
"admin",
315+
"SHOW DATABASES WHERE name = 'neo4j' AND currentStatus = 'online';",
316+
],
317+
service_name=service_name,
318+
)
319+
if stdout:
320+
break
321+
time.sleep(5)
322+
else:
323+
(debug_logs, _, _) = self.exec_in_container(
324+
command=["cat", "logs/debug.log"],
325+
service_name=service_name,
326+
)
327+
raise Exception(f"Failed to restore database:\n{restore_output}\nDebug logs:\n{debug_logs}")
328+
329+
old_services = self.services
330+
self.services = ["infrahub-server", "task-worker"]
331+
self.stop(down=False)
332+
try:
333+
self.start()
334+
except Exception as exc:
335+
stdout, stderr = self.get_logs()
336+
raise Exception(f"Failed to start docker compose:\nStdout:\n{stdout}\nStderr:\n{stderr}") from exc
337+
self.services = old_services
309338
else:
310-
(debug_logs, _, _) = self.exec_in_container(
311-
command=["cat", "logs/debug.log"],
339+
print("Cluster mode detected")
340+
try:
341+
self.get_container(service_name=service_name)
342+
self.get_container(service_name="database-core2")
343+
self.get_container(service_name="database-core3")
344+
except ContainerIsNotRunning:
345+
self.start_container("database", "database-core2", "database-core3")
346+
347+
# Waiting for cluster to stabilize...
348+
time.sleep(10)
349+
350+
self.exec_in_container(
351+
command=["cypher-shell", "-u", "neo4j", "-p", "admin", "DROP DATABASE neo4j;"],
352+
service_name=service_name,
353+
)
354+
355+
self.exec_in_container(
356+
command=["rm", "-rf", "/data/databases/neo4j"],
357+
service_name=service_name,
358+
)
359+
self.exec_in_container(
360+
command=["rm", "-rf", "/data/transactions/neo4j"],
361+
service_name=service_name,
362+
)
363+
364+
self.exec_in_container(
365+
command=[
366+
"neo4j-admin",
367+
"database",
368+
"restore",
369+
"--from-path",
370+
str(self.internal_backup_dir / backup_file.name),
371+
"neo4j",
372+
],
312373
service_name=service_name,
313374
)
314-
raise Exception(f"Failed to restore database:\n{restore_output}\nDebug logs:\n{debug_logs}")
315375

316-
old_services = self.services
317-
self.services = ["infrahub-server", "task-worker"]
318-
self.stop(down=False)
319-
try:
376+
cmd = self.compose_command_property[:]
377+
cmd += ["restart", "database"]
378+
self._run_command(cmd=cmd)
379+
380+
main_node = service_name
381+
cluster_nodes = ["database", "database-core2", "database-core3"]
382+
383+
for attempt in range(3):
384+
try:
385+
(stdout, _, _) = self.exec_in_container(
386+
command=[
387+
"cypher-shell",
388+
"--format",
389+
"plain",
390+
"-d",
391+
"system",
392+
"-u",
393+
"neo4j",
394+
"-p",
395+
"admin",
396+
"SHOW DATABASES YIELD name, address, currentStatus WHERE name = 'system' RETURN address, currentStatus",
397+
],
398+
service_name=main_node,
399+
)
400+
except Exception:
401+
time.sleep(10)
402+
continue
403+
404+
raw_output = stdout
405+
nodes_status = dict.fromkeys(cluster_nodes, False)
406+
online_count = 0
407+
total_entries = 0
408+
409+
try:
410+
for line_raw in stdout.splitlines():
411+
line = line_raw.strip()
412+
if not line or line.startswith("address"):
413+
continue
414+
415+
total_entries += 1
416+
if "online" in line:
417+
online_count += 1
418+
for node in cluster_nodes:
419+
node_pattern = f'"{node}:'
420+
if node_pattern in line:
421+
nodes_status[node] = True
422+
break
423+
if all(nodes_status.values()) and online_count == len(cluster_nodes):
424+
break
425+
except Exception as e:
426+
print(f"Error parsing database status on attempt {attempt + 1}: {e}")
427+
428+
print(f"Waiting for all nodes to be online. Current status: {nodes_status}")
429+
time.sleep(5)
430+
else:
431+
debug_logs = {}
432+
for node in cluster_nodes:
433+
try:
434+
(logs, _, _) = self.exec_in_container(
435+
command=["cat", "logs/debug.log"],
436+
service_name=node,
437+
)
438+
debug_logs[node] = logs
439+
except Exception as e:
440+
debug_logs[node] = f"Could not retrieve logs: {str(e)}"
441+
442+
debug_info = f"Raw output from SHOW DATABASES command:\n{raw_output}\n\n"
443+
debug_info += f"Final node status: {nodes_status}\n\n"
444+
445+
status_str = ", ".join(
446+
[f"{node}: {'online' if status else 'offline'}" for node, status in nodes_status.items()]
447+
)
448+
logs_str = debug_info + "\n\n".join(
449+
[f"--- {node} logs ---\n{logs}" for node, logs in debug_logs.items()]
450+
)
451+
452+
raise Exception(
453+
f"Failed to restore database cluster. Node status: {status_str}\nDebug logs:\n{logs_str}"
454+
)
455+
456+
server_id = None
457+
try:
458+
stdout, _, _ = self.exec_in_container(
459+
command=[
460+
"cypher-shell",
461+
"--format",
462+
"plain",
463+
"-d",
464+
"system",
465+
"-u",
466+
"neo4j",
467+
"-p",
468+
"admin",
469+
'SHOW SERVERS YIELD name, address WHERE address = "database:7687" RETURN name;',
470+
],
471+
service_name=service_name,
472+
)
473+
474+
lines = stdout.splitlines()
475+
for line_raw in lines:
476+
line = line_raw.strip()
477+
if not line or line == "name" or line.startswith("+"):
478+
continue
479+
server_id = line.strip('"')
480+
break
481+
except Exception as e:
482+
print(f"Error retrieving server ID with direct query: {e}")
483+
484+
if server_id:
485+
self.exec_in_container(
486+
command=[
487+
"cypher-shell",
488+
"-d",
489+
"system",
490+
"-u",
491+
"neo4j",
492+
"-p",
493+
"admin",
494+
f"CREATE DATABASE neo4j TOPOLOGY 3 PRIMARIES OPTIONS {{ existingData: 'use', existingDataSeedInstance: '{server_id}' }};",
495+
],
496+
service_name=service_name,
497+
)
320498
self.start()
321-
except Exception as exc:
322-
stdout, stderr = self.get_logs()
323-
raise Exception(f"Failed to start docker compose:\nStdout:\n{stdout}\nStderr:\n{stderr}") from exc
324-
self.services = old_services
499+
print("Database restored successfully")

0 commit comments

Comments
 (0)