forked from jkef80/Filament-Management
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathws_dump.py
More file actions
192 lines (149 loc) · 5.68 KB
/
ws_dump.py
File metadata and controls
192 lines (149 loc) · 5.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#!/usr/bin/env python3
"""
ws_dump.py — Creality K2 Plus WebSocket raw message dumper
Connects to ws://<host>:9999, sends one initial boxsInfo request and
then only periodic heartbeats to keep the connection alive, and dumps
EVERY message the printer pushes to stdout and an optional JSONL log file.
The printer streams data continuously — temperatures, status, CFS
slot events — without needing to be polled for most of it. This
script captures everything as-is so you can analyse the full protocol.
Usage:
python3 ws_dump.py <printer-ip> [output-file]
Example:
python3 ws_dump.py 192.168.1.144
python3 ws_dump.py 192.168.1.144 capture.jsonl
Requirements:
pip install websockets (same venv as the main app)
What to look for:
- "rfid" fields inside materialBoxs → unique spool RFID ID
- "materialState" → fires on spool insert/remove/scan
- Any top-level key you haven't seen before (marked with ***)
- Insert a spool while running and watch what bursts come through
"""
import asyncio
import json
import sys
import time
from datetime import datetime
try:
import websockets
except ImportError:
print("ERROR: websockets not installed. Run: pip install websockets")
sys.exit(1)
# Timing and payload constants used by the dumper.
HEARTBEAT_INTERVAL = 10.0  # seconds between heartbeats
# NOTE(review): RECV_TIMEOUT is not referenced anywhere in the visible code;
# confirm whether it is still needed.
RECV_TIMEOUT = 15.0  # seconds of silence before sending a keepalive
# JSON text sent verbatim as the heartbeat message.
HEARTBEAT_REQ = json.dumps({"ModeCode": "heart_beat"})
# Top-level message keys observed so far; annotate() adds to this set so it
# can flag keys it has not reported before.
seen_keys: set[str] = set()
def ts() -> str:
    """Return the current local wall-clock time formatted as ``HH:MM:SS.mmm``."""
    now = datetime.now()
    # Whole milliseconds, zero-padded to three digits.
    millis = now.microsecond // 1000
    return f"{now:%H:%M:%S}.{millis:03d}"
def pretty(obj) -> str:
    """Serialise *obj* as JSON text indented by two spaces.

    Non-ASCII characters are written as-is rather than as ``\\uXXXX`` escapes.
    """
    encoder = json.JSONEncoder(indent=2, ensure_ascii=False)
    return encoder.encode(obj)
def annotate(raw: str) -> list[str]:
    """Return human-readable notes about interesting fields in a message.

    Args:
        raw: One message as received from the printer, expected to be the
            text of a JSON object.

    Returns:
        A list of pre-formatted note lines, possibly empty. Input that is not
        valid JSON, or that is valid JSON but not an object, yields ``[]``.

    Side effects:
        Every top-level key of the message is added to the module-level
        ``seen_keys`` set so that each key is only reported as new once.
    """
    notes: list[str] = []
    try:
        d = json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        return notes
    if not isinstance(d, dict):
        # Valid JSON but not an object (array, string, number, null): there
        # are no keys to inspect.
        return notes

    # Track new top-level keys. The set is mutated in place, so no ``global``
    # declaration is required.
    new = set(d) - seen_keys
    if new:
        seen_keys.update(new)
        notes.append(f" *** NEW TOP-LEVEL KEYS: {sorted(new)}")

    # RFID fields inside boxsInfo. Every level is type-checked because the
    # payload comes from the network and its shape is not guaranteed.
    boxs_info = d.get("boxsInfo")
    boxes = boxs_info.get("materialBoxs") if isinstance(boxs_info, dict) else None
    for box in boxes if isinstance(boxes, list) else []:
        if not isinstance(box, dict):
            continue
        materials = box.get("materials")
        for mat in materials if isinstance(materials, list) else []:
            if not isinstance(mat, dict):
                continue
            rfid = mat.get("rfid", "")
            state = mat.get("state", 0)
            mat_id = mat.get("id")
            # Material ids 0..3 map to slot letters A..D; anything else is "?".
            slot_letter = "ABCD"[mat_id] if isinstance(mat_id, int) and 0 <= mat_id <= 3 else "?"
            slot = f"{box.get('id', '?')}{slot_letter}"
            if rfid:
                notes.append(
                    f" >>> RFID slot {slot}: {rfid!r} state={state} "
                    f"({'RFID chip' if state == 2 else 'manual' if state == 1 else 'empty'})"
                )
                continue
            # A null or non-numeric state must not raise on comparison:
            # numbers are compared with 0, anything else counts if truthy.
            occupied = state > 0 if isinstance(state, (int, float)) else bool(state)
            if occupied:
                notes.append(f" --- slot {slot}: no rfid state={state} material={mat.get('type','?')}")

    # materialState — fires on spool events
    if "materialState" in d:
        notes.append(f" >>> materialState: {d['materialState']}")

    # deviceState / state — printer status
    if "deviceState" in d:
        notes.append(f" >>> deviceState: {d['deviceState']}")
    if "state" in d:
        notes.append(f" >>> state: {d['state']}")
    return notes
async def dump(host: str, out_path: str | None) -> None:
    """Connect to the printer's WebSocket and dump every message it pushes.

    Opens ``ws://<host>:9999``, requests ``boxsInfo`` once, then sends a
    heartbeat every ``HEARTBEAT_INTERVAL`` seconds while printing each
    received message (with the notes produced by ``annotate``) to stdout.

    Args:
        host: Host name or IP address of the printer.
        out_path: Optional path of a JSONL file. When given, every sent and
            received message is appended to it as one JSON record per line.

    Returns:
        None. The coroutine finishes when the connection's message iterator
        ends; connection errors propagate to the caller.
    """
    url = f"ws://{host}:9999"
    print(f"[{ts()}] Connecting to {url} ...")

    out_file = None
    try:
        if out_path:
            out_file = open(out_path, "a", encoding="utf-8")
            print(f"[{ts()}] Logging raw messages to {out_path}")

        def log(raw: str, direction: str = "RECV") -> None:
            """Append one JSONL record for *raw* when file logging is enabled."""
            if out_file is None:
                return
            record = {
                "t": time.time(),
                "ts": ts(),
                "dir": direction,
                "raw": raw,
            }
            out_file.write(json.dumps(record) + "\n")
            # Flush per message so the capture survives an abrupt stop.
            out_file.flush()

        async with websockets.connect(
            url,
            ping_interval=None,
            ping_timeout=None,
            close_timeout=5,
            max_size=2**22,
        ) as ws:
            print(f"[{ts()}] Connected — listening passively (heartbeat only)\n")
            print(" Insert / remove a spool and watch for events marked >>>")
            print(" Newly seen message types are marked ***")
            print(" Press Ctrl+C to stop\n")

            async def heartbeat_loop() -> None:
                """Send the heartbeat message forever at a fixed interval."""
                while True:
                    await asyncio.sleep(HEARTBEAT_INTERVAL)
                    await ws.send(HEARTBEAT_REQ)
                    log(HEARTBEAT_REQ, "SEND")

            # Keep a reference: the event loop holds tasks only weakly, and
            # the task has to be cancelled when the receive loop ends.
            heartbeat_task = asyncio.create_task(heartbeat_loop())
            try:
                # Request CFS slot data immediately (same as CFSync does)
                request = json.dumps({"method": "get", "params": {"boxsInfo": 1}})
                await ws.send(request)
                log(request, "SEND")

                async for raw in ws:
                    if isinstance(raw, bytes):
                        # Binary frames arrive as bytes, which json.dumps in
                        # log() cannot serialise; decode them leniently.
                        raw = raw.decode("utf-8", errors="replace")
                    log(raw)

                    if raw == "ok":
                        print(f"[{ts()}] heartbeat ack")
                        continue

                    try:
                        parsed = json.loads(raw)
                    except ValueError:
                        print(f"[{ts()}] non-JSON: {raw[:300]}")
                        continue
                    if not isinstance(parsed, dict):
                        # Valid JSON without top-level keys (array, number, ...).
                        print(f"[{ts()}] non-object JSON: {raw[:300]}")
                        continue

                    top_keys = list(parsed.keys())
                    print(f"[{ts()}] keys={top_keys}")
                    for note in annotate(raw):
                        print(note)

                    # Full dump for boxsInfo (richest payload)
                    if "boxsInfo" in parsed:
                        print(pretty(parsed))
                    print()
            finally:
                heartbeat_task.cancel()
                # Collect the task's outcome so a failure inside it is
                # reported instead of surfacing as an unretrieved exception.
                (outcome,) = await asyncio.gather(heartbeat_task, return_exceptions=True)
                if isinstance(outcome, Exception):
                    print(f"[{ts()}] heartbeat stopped: {outcome!r}")
    finally:
        if out_file is not None:
            out_file.close()
def main() -> None:
    """Command-line entry point: parse arguments and run the dumper.

    Expects the printer host as the first argument and an optional output
    file as the second. Exits with status 1 when the host is missing or when
    the dump fails with an error; Ctrl+C stops the dump without an error
    status.
    """
    cli_args = sys.argv[1:]
    if not cli_args:
        print(f"Usage: python3 {sys.argv[0]} <printer-ip> [output.jsonl]")
        sys.exit(1)

    host = cli_args[0]
    out = None
    if len(cli_args) > 1:
        out = cli_args[1]

    try:
        asyncio.run(dump(host, out))
    except KeyboardInterrupt:
        print("\nStopped.")
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)
# Run the dumper only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()