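"""Extract voting-time records from FastChat server logs.

For every pairwise vote (leftvote, rightvote, tievote, bothbad_vote), look up
the preceding "chat" records for both sides and write one JSON line combining
the vote with each side's timestamps, so voting time can be analyzed later.
"""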
import datetime
import glob
import json
import traceback
from collections import deque

import tqdm


def _serialize_json(data):
    # Serialize JSON with sorted keys and no whitespace so that identical
    # conversation states always produce the same key.
    return json.dumps(data, sort_keys=True, separators=(",", ":")).encode("utf-8")


types = {
    "share",
    "chat",
    "flag",
    "bothbad_vote",
    "downvote",
    "leftvote",
    "rightvote",
    "upvote",
    "tievote",
}
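# Only "chat" and the four pairwise vote types are used below; the remaining
# types pass the assertion in process_record but are otherwise ignored.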

# Maps serialized chat state -> metadata of the latest "chat" record, so a
# later vote can be joined back to the conversation that produced it.
chat_dict = {}
# FIFO of {"key", "timestamp"} entries used to evict the oldest chat_dict entries.
cache_queue = deque()


def process_record(r):
    ip = r.pop("ip", None)
    tstamp = r.pop("tstamp")
    mtype = r.pop("type")
    start = r.pop("start", None)
    finish = r.pop("finish", None)

    # Garbage collect the oldest cache entries to bound memory usage.
    while len(cache_queue) > 100000:
        outdated = cache_queue.popleft()
        popped_item = chat_dict.pop(outdated["key"], None)
        if popped_item is None:
            # TODO: this occasionally happens; in theory chat_dict should stay
            # in sync with the queue unless duplicate keys were enqueued.
            # Needs investigation.
            print("Error: Key to GC does not exist.")

    assert mtype in types
    if mtype == "chat":
        key = _serialize_json(r["state"])
        # TODO: also record the string length of the last reply so voting time
        # per character can be analyzed.
        chat_dict[key] = {
            "timestamp": tstamp,
            "start": start,
            "finish": finish,
            "conv_id": r["state"]["conv_id"],
        }
        cache_queue.append({"key": key, "timestamp": tstamp})
    elif mtype in ("leftvote", "rightvote", "bothbad_vote", "tievote"):
        left_key = _serialize_json(r["states"][0])
        right_key = _serialize_json(r["states"][1])
        if left_key not in chat_dict:
            # TODO: this occasionally happens: we see the vote but cannot find
            # the preceding chat record. Needs investigation.
            print(
                f'WARNING: Cannot find vote context for conversation {r["states"][0]["conv_id"]}'
            )
            return None
        if right_key not in chat_dict:
            print(
                f'WARNING: Cannot find vote context for conversation {r["states"][1]["conv_id"]}'
            )
            return None
        vote_time_data = {
            "timestamp": tstamp,
            "type": mtype,
            "left": chat_dict[left_key],
            "right": chat_dict[right_key],
            "ip": ip,
        }
        return vote_time_data

    return None


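# Each record returned by process_record (and written out below) has the form
# (values are made up for illustration):
#   {"timestamp": <vote tstamp>, "type": "leftvote", "ip": "...",
#    "left":  {"timestamp": ..., "start": ..., "finish": ..., "conv_id": "..."},
#    "right": {"timestamp": ..., "start": ..., "finish": ..., "conv_id": "..."}}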
def process_file(infile: str, outfile: str):
    with open(infile) as f:
        records = []
        for line in f:
            line = line.strip()
            if line:
                try:
                    r = json.loads(line)
                    if r.get("tstamp") is not None:
                        records.append(r)
                except Exception:
                    # Skip lines that are not valid JSON.
                    pass
    # Sort the records in case they were logged out of order.
    records.sort(key=lambda x: x["tstamp"])

    with open(outfile, "a") as out:
        for r in records:
            try:
                output = process_record(r)
                if output is not None:
                    out.write(json.dumps(output) + "\n")
            except Exception as e:
                print("Error:", e)
                traceback.print_exc()


today = datetime.date.today().isoformat()
# Sort so that each server's logs are processed in chronological order.
filelist = sorted(glob.glob("/mnt/disks/data/fastchat_logs/server*/202*-*-*-conv.json"))
# Skip today's logs because the current day may still be incomplete.
filelist = [f for f in filelist if today not in f]

# TODO: make the date range below configurable.
filelist = [f for f in filelist if "2024-03-" in f]

for f in tqdm.tqdm(filelist):
    process_file(f, "output.jsonl")
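

# A minimal sketch (not called anywhere above) of how the emitted records could
# be consumed, assuming tstamp/start/finish are Unix timestamps in seconds; the
# function name is illustrative, not part of the original script.
def vote_latency_seconds(path="output.jsonl"):
    """Yield seconds between the end of generation and the corresponding vote."""
    with open(path) as f:
        for line in f:
            rec = json.loads(line)
            finishes = [rec["left"].get("finish"), rec["right"].get("finish")]
            finishes = [t for t in finishes if t is not None]
            if finishes:
                yield rec["timestamp"] - max(finishes)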