-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsecondary_namenode.py
More file actions
154 lines (125 loc) · 5.33 KB
/
secondary_namenode.py
File metadata and controls
154 lines (125 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# secondary_namenode.py
# Secondary Namenode (SNN): periodically fetches the Namenode's fsimage and
# edit log, merges them into a fresh checkpoint image locally, and pushes the
# result back so the Namenode can truncate its edit log.
import socket
import json
import time
import os
import threading  # NOTE(review): imported but unused in this chunk -- verify before removing
import config
CHECKPOINT_INTERVAL = getattr(config, "SNN_CHECKPOINT_INTERVAL_SEC", 30)  # seconds between cycles; 30 s default (config may override)
SNN_ID = getattr(config, "SNN_ID", "secondary_namenode_0")  # identity reported to the Namenode
SNN_DIR = getattr(config, "SNN_DIR", "snn_checkpoints")     # local directory for raw copies + checkpoints
os.makedirs(SNN_DIR, exist_ok=True)  # created eagerly at import time so later writes cannot fail on a missing dir
def _recv_full_json(sock: socket.socket) -> dict:
"""Safely receive JSON messages from a socket until full JSON is detected."""
buffer = ""
while True:
chunk = sock.recv(4096).decode()
if not chunk:
break
buffer += chunk
if buffer.strip().endswith('}'):
break
if not buffer.strip():
return {}
try:
return json.loads(buffer)
except json.JSONDecodeError as e:
print(f"[SNN] ERROR: Could not parse JSON response: {e} | Raw: {buffer}")
return {}
def _send_json_and_get_response(payload: dict) -> dict:
    """Send *payload* to the Namenode as newline-terminated JSON and
    return its parsed JSON reply; ``{}`` on any communication failure."""
    message = (json.dumps(payload) + "\n").encode()
    try:
        with socket.create_connection((config.NAMENODE_HOST, config.NAMENODE_PORT_CLIENT)) as conn:
            conn.sendall(message)
            return _recv_full_json(conn)
    except Exception as e:
        print(f"[SNN] ERROR: Communication with Namenode failed: {e}")
        return {}
def _apply_transaction(meta_dict: dict, transaction: dict):
op = transaction.get('op')
if op == 'UPLOAD':
filename = transaction.get('filename')
chunks = transaction.get('chunks')
chunk_map = transaction.get('chunk_map')
meta_dict.setdefault('files', {})
meta_dict.setdefault('chunks', {})
meta_dict['files'][filename] = {"chunks": chunks, "status": "pending"}
meta_dict['chunks'].update(chunk_map)
elif op == 'COMMIT':
filename = transaction.get('filename')
if filename in meta_dict.get('files', {}):
meta_dict['files'][filename]["status"] = "committed"
def _merge_fsimage_and_edits(fsimage_obj: dict, edits_lines: str) -> dict:
    """Replay pending edit-log lines (JSONL text) on top of an fsimage dict.

    Guarantees the three top-level tables exist, then applies each
    non-blank line as one transaction.  Lines that fail to parse or
    apply are reported and skipped; the (mutated) *fsimage_obj* is
    returned for convenience.
    """
    for table in ('files', 'chunks', 'datanodes'):
        fsimage_obj.setdefault(table, {})
    for raw in edits_lines.splitlines():
        raw = raw.strip()
        if not raw:
            continue  # blank lines are legal padding in the edit log
        try:
            _apply_transaction(fsimage_obj, json.loads(raw))
        except Exception as e:
            # Best-effort merge: one bad record must not abort the checkpoint.
            print(f"[SNN] Warning: failed to apply edit line: {e}")
    return fsimage_obj
def perform_checkpoint_once() -> bool:
    """Run one full checkpoint cycle against the Namenode.

    Steps:
      1. ``FETCH_METADATA`` -- request the current fsimage text and the
         pending edit-log text from the Namenode.
      2. Archive raw copies of both under ``SNN_DIR`` (timestamped).
      3. Merge fsimage + edits locally and save the result as
         ``fsimage.ckpt_<ts>.json``.
      4. ``UPDATE_FSIMAGE`` -- push the merged image back; on SUCCESS the
         Namenode is expected to clear its edit log (per the log message).

    Returns True only when the Namenode acknowledges the update; every
    failure path prints a diagnostic and returns False (never raises).
    """
    try:
        print("[SNN] Requesting metadata from Namenode...")
        fetch_resp = _send_json_and_get_response({"command": "FETCH_METADATA", "snn_id": SNN_ID})
        if fetch_resp.get("status") != "SUCCESS":
            print(f"[SNN] ERROR: Namenode refused FETCH_METADATA: {fetch_resp.get('message')}")
            return False
        fsimage_text = fetch_resp.get("fsimage", "")
        edits_text = fetch_resp.get("edits", "")
        # Save raw copies (audit trail of exactly what the Namenode sent,
        # pre-merge).  NOTE(review): these timestamped files accumulate
        # unboundedly -- consider pruning old checkpoints.
        stamp = int(time.time())
        src_fsimage_path = os.path.join(SNN_DIR, f"fsimage_{stamp}.json")
        src_edits_path = os.path.join(SNN_DIR, f"editlog_{stamp}.jsonl")
        with open(src_fsimage_path, "w", encoding="utf-8") as f:
            f.write(fsimage_text or "{}")
        with open(src_edits_path, "w", encoding="utf-8") as f:
            f.write(edits_text or "")
        try:
            # Empty/whitespace fsimage means a fresh namespace -> start from {}.
            fsimage_obj = json.loads(fsimage_text) if fsimage_text.strip() else {}
        except Exception as e:
            print(f"[SNN] ERROR: Invalid fsimage JSON from Namenode: {e}")
            return False
        print("[SNN] Merging fsimage + editlog locally...")
        merged = _merge_fsimage_and_edits(fsimage_obj, edits_text or "")
        ckpt_path = os.path.join(SNN_DIR, f"fsimage.ckpt_{stamp}.json")
        with open(ckpt_path, "w", encoding="utf-8") as f:
            json.dump(merged, f, indent=2)
        print(f"[SNN] Checkpoint created locally at {ckpt_path}")
        print("[SNN] Uploading checkpoint back to Namenode...")
        update_payload = {
            "command": "UPDATE_FSIMAGE",
            "snn_id": SNN_ID,
            "fsimage": json.dumps(merged)  # image travels as an embedded JSON string
        }
        update_resp = _send_json_and_get_response(update_payload)
        if not update_resp:
            print("[SNN] ERROR: Namenode did not respond to UPDATE_FSIMAGE.")
            return False
        if update_resp.get("status") != "SUCCESS":
            print(f"[SNN] ERROR: Namenode refused UPDATE_FSIMAGE: {update_resp.get('message')}")
            return False
        print("[SNN] ✅ Checkpoint successfully applied on Namenode (edit log cleared).")
        return True
    except ConnectionRefusedError:
        print("[SNN] ERROR: Could not connect to Namenode. Is it running?")
        return False
    except Exception as e:
        # Broad catch is deliberate: the scheduler must survive any single
        # failed cycle and try again on the next interval.
        print(f"[SNN] ERROR performing checkpoint: {e}")
        return False
def run_scheduler():
    """Run checkpoint cycles forever, sleeping CHECKPOINT_INTERVAL seconds
    between the start of one cycle and the start of the next."""
    print(f"[SNN] Secondary Namenode started. Interval = {CHECKPOINT_INTERVAL}s")
    while True:
        cycle_ts = time.strftime("%Y-%m-%d %H:%M:%S")
        print(f"[SNN] ---- Checkpoint cycle start @ {cycle_ts} ----")
        perform_checkpoint_once()
        time.sleep(CHECKPOINT_INTERVAL)


if __name__ == "__main__":
    run_scheduler()