|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import json |
| 4 | +import subprocess |
| 5 | +import time |
| 6 | +from typing import Dict |
| 7 | +import requests |
| 8 | +from pathlib import Path |
| 9 | +from utils import print_step, BUILD_PROFILE, run_command |
| 10 | + |
| 11 | +metactl_bin = f"./target/{BUILD_PROFILE}/databend-metactl" |
| 12 | + |
| 13 | + |
| 14 | +def metactl_upsert(grpc_addr, key, value): |
| 15 | + """Upsert a key-value pair using the upsert subcommand.""" |
| 16 | + result = run_command([ |
| 17 | + metactl_bin, "upsert", |
| 18 | + "--grpc-api-address", grpc_addr, |
| 19 | + "--key", key, |
| 20 | + "--value", value |
| 21 | + ]) |
| 22 | + return result |
| 23 | + |
| 24 | + |
| 25 | +def metactl_trigger_snapshot(admin_addr): |
| 26 | + """Trigger snapshot creation using the trigger-snapshot subcommand.""" |
| 27 | + result = run_command([ |
| 28 | + metactl_bin, "trigger-snapshot", |
| 29 | + "--admin-api-address", admin_addr |
| 30 | + ]) |
| 31 | + return result |
| 32 | + |
| 33 | + |
| 34 | +def verify_kv(grpc_addr, key, expected_value=None): |
| 35 | + """Verify a key-value pair using the get subcommand.""" |
| 36 | + time.sleep(0.5) # Brief sleep to ensure data is persisted |
| 37 | + |
| 38 | + result = run_command([ |
| 39 | + metactl_bin, "get", |
| 40 | + "--grpc-api-address", grpc_addr, |
| 41 | + "--key", key |
| 42 | + ], check=False) |
| 43 | + |
| 44 | + print(f"Get result for key '{key}': {result}") |
| 45 | + |
| 46 | + if not result.strip(): |
| 47 | + assert False, f"Key '{key}' not found or empty result" |
| 48 | + |
| 49 | + try: |
| 50 | + data = json.loads(result.strip()) |
| 51 | + except json.JSONDecodeError as e: |
| 52 | + assert False, f"Failed to parse JSON result: {e}, result: {result}" |
| 53 | + |
| 54 | + print(f"Parsed data: {data}") |
| 55 | + |
| 56 | + if data is None: |
| 57 | + assert expected_value is None, f"Expected None but got {expected_value}" |
| 58 | + else: |
| 59 | + # Extract the actual value from the stored data |
| 60 | + actual_value = bytes(data["data"]).decode('utf-8') |
| 61 | + print(f"Actual value: '{actual_value}', Expected: '{expected_value}'") |
| 62 | + |
| 63 | + if expected_value is not None: |
| 64 | + assert actual_value == expected_value, f"Expected '{expected_value}', got '{actual_value}'" |
| 65 | + |
| 66 | + |
| 67 | +def metactl_export_from_grpc(addr: str) -> str: |
| 68 | + """Export meta data from grpc-address to stdout""" |
| 69 | + print_step(f"Start: Export meta data from {addr} to stdout") |
| 70 | + |
| 71 | + cmd = [metactl_bin, "export", "--grpc-api-address", addr] |
| 72 | + |
| 73 | + result = run_command(cmd, check=True) |
| 74 | + |
| 75 | + print_step(f"Done: Exported meta data to stdout") |
| 76 | + return result |
| 77 | + |
| 78 | +def metactl_export(meta_dir: str, output_path: str) -> str: |
| 79 | + """Export meta data from raft directory to database file or stdout""" |
| 80 | + print_step(f"Start: Export meta data from {meta_dir} to {output_path}") |
| 81 | + |
| 82 | + cmd = [metactl_bin, "export", "--raft-dir", meta_dir] |
| 83 | + |
| 84 | + if output_path: |
| 85 | + cmd.append("--db") |
| 86 | + cmd.append(output_path) |
| 87 | + |
| 88 | + result = run_command(cmd, check=False) |
| 89 | + |
| 90 | + print_step(f"Done: Exported meta data to {output_path}") |
| 91 | + return result |
| 92 | + |
| 93 | + |
| 94 | +def metactl_import( |
| 95 | + meta_dir: str, id: int, db_path: str, initial_cluster: Dict[int, str] |
| 96 | +): |
| 97 | + """Import meta data from database file to raft directory""" |
| 98 | + cmd = [ |
| 99 | + metactl_bin, |
| 100 | + "import", |
| 101 | + "--raft-dir", |
| 102 | + meta_dir, |
| 103 | + "--id", |
| 104 | + str(id), |
| 105 | + "--db", |
| 106 | + db_path, |
| 107 | + ] |
| 108 | + |
| 109 | + for id, addr in initial_cluster.items(): |
| 110 | + cmd.append("--initial-cluster") |
| 111 | + cmd.append(f"{id}={addr}") |
| 112 | + |
| 113 | + result = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| 114 | + result.wait() |
| 115 | + print(result.stdout.read().decode()) |
| 116 | + print(result.stderr.read().decode()) |
| 117 | + |
| 118 | + |
| 119 | +def cluster_status(msg: str) -> str: |
| 120 | + """Get cluster status from meta service endpoint""" |
| 121 | + print_step(f"Check /v1/cluster/status {msg} start") |
| 122 | + |
| 123 | + response = requests.get("http://127.0.0.1:28101/v1/cluster/status") |
| 124 | + status = response.json() |
| 125 | + |
| 126 | + print_step(f"Check /v1/cluster/status {msg} end") |
| 127 | + |
| 128 | + return status |
0 commit comments