Skip to content

Commit dc69cb2

Browse files
authored
Feat/evaluation doc qa (#649)
* fix: doc fine mode bug * fix: doc fine mode bug * feat: init longbench_v2 * feat: more strict embedder truncation * feat: parallel processing fine mode in multi-modal-fine * feat: update parsers; add chunk info into source; remove origin_part * feat: modify chunk_content in file-fine-parser * fix: token counter bug * feat: enlarge polardb
1 parent 75e9d33 commit dc69cb2

File tree

16 files changed

+904
-1053
lines changed

16 files changed

+904
-1053
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# LongBench v2 evaluation scripts
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
import argparse
2+
import json
3+
import os
4+
import sys
5+
import threading
6+
7+
from concurrent.futures import ThreadPoolExecutor, as_completed
8+
9+
from dotenv import load_dotenv
10+
from tqdm import tqdm
11+
12+
13+
# Resolve the repository root (four directory levels above this script) and
# put it -- plus evaluation/scripts -- on sys.path so project-local modules
# such as utils.client can be imported.
_THIS_FILE = os.path.abspath(__file__)
ROOT_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(_THIS_FILE)))
)
EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts")

sys.path.insert(0, ROOT_DIR)
sys.path.insert(0, EVAL_SCRIPTS_DIR)
20+
21+
22+
def ingest_sample(
    client, sample, sample_idx, frame, version, success_records, record_file, file_lock
):
    """Ingest one LongBench v2 sample into the memory backend.

    Returns True when the sample was ingested, or when it was already listed
    in ``success_records`` (resume support). Returns False on failure or for
    unsupported frames.
    """
    idx_key = str(sample_idx)
    # Resume support: anything already recorded in the checkpoint is done.
    if idx_key in success_records:
        return True

    # user_id and conv_id share the same identifier scheme.
    ident = f"longbench_v2_{sample_idx}_{version}"

    # The whole document context is shipped as one file-type message.
    payload = [
        {
            "type": "file",
            "file": {"file_data": sample.get("context", ""), "file_id": idx_key},
        }
    ]

    if "memos-api" not in frame:
        return False

    try:
        client.add(messages=payload, user_id=ident, conv_id=ident, batch_size=1)
        print(f"✅ [{frame}] Ingested sample {sample_idx}")
        # Append to the checkpoint file under the lock so concurrent
        # workers don't interleave their writes.
        with file_lock, open(record_file, "a") as f:
            f.write(f"{idx_key}\n")
            f.flush()
        return True
    except Exception as e:
        print(f"❌ [{frame}] Error ingesting sample {sample_idx}: {e}")
        return False
61+
62+
63+
def load_dataset_from_local():
    """Load the LongBench v2 samples from ``data/long_bench_v2/data.json``.

    The data directory is resolved three levels above this script.

    Raises:
        FileNotFoundError: if the dataset file has not been downloaded.
    """
    here = os.path.abspath(__file__)
    base = os.path.dirname(os.path.dirname(os.path.dirname(here)))
    filepath = os.path.join(base, "data", "long_bench_v2", "data.json")

    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Dataset file not found: {filepath}")

    with open(filepath, encoding="utf-8") as f:
        return json.load(f)
81+
82+
83+
def main(frame, version="default", num_workers=10, max_samples=None):
    """Ingest LongBench v2 samples into the given memory frame.

    Args:
        frame: Backend identifier (only "memos-api" is supported).
        version: Version tag used in per-sample user/conv ids and in the
            checkpoint/results directory name.
        num_workers: Number of parallel ingestion threads.
        max_samples: Optional cap on the number of samples to process.
    """
    load_dotenv()

    print("\n" + "=" * 80)
    print(f"🚀 LONGBENCH V2 INGESTION - {frame.upper()} v{version}".center(80))
    print("=" * 80 + "\n")

    # FileNotFoundError and any other load failure were handled identically,
    # so a single Exception handler replaces the duplicated branches.
    try:
        dataset = load_dataset_from_local()
        print(f"Loaded {len(dataset)} samples from LongBench v2")
    except Exception as e:
        print(f"❌ Error loading dataset: {e}")
        return

    if max_samples:
        dataset = dataset[:max_samples]
        print(f"Limited to {len(dataset)} samples")

    # Checkpoint file enables resuming a partially completed run.
    checkpoint_dir = os.path.join(
        ROOT_DIR, "evaluation", "results", "longbench_v2", f"{frame}-{version}"
    )
    os.makedirs(checkpoint_dir, exist_ok=True)
    record_file = os.path.join(checkpoint_dir, "success_records.txt")

    success_records = set()
    if os.path.exists(record_file):
        with open(record_file) as f:
            success_records = {line.strip() for line in f if line.strip()}
        print(f"📋 Found {len(success_records)} already processed samples (resume mode)")
    else:
        print("📋 Starting fresh ingestion (no checkpoint found)")

    if frame == "memos-api":
        from utils.client import MemosApiClient

        client = MemosApiClient()
    else:
        print(f"❌ Unsupported frame: {frame}")
        return

    # BUG FIX: ingest_sample returns True both for newly ingested samples
    # and for samples already in the checkpoint, so seeding success_count
    # with len(success_records) double-counted every resumed sample.
    # Counting successful futures alone gives the correct total.
    success_count = 0
    file_lock = threading.Lock()  # serializes checkpoint-file appends
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(
                ingest_sample,
                client,
                sample,
                idx,
                frame,
                version,
                success_records,
                record_file,
                file_lock,
            )
            for idx, sample in enumerate(dataset)
        ]

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Ingesting LongBench v2",
        ):
            try:
                if future.result():
                    success_count += 1
            except Exception as e:
                print(f"Error processing sample: {e}")

    print(f"\n{'=' * 80}")
    print(f"✅ INGESTION COMPLETE: {success_count}/{len(dataset)} samples ingested".center(80))
    print(f"{'=' * 80}\n")
169+
170+
171+
if __name__ == "__main__":
    # CLI entry point: parse flags and kick off ingestion.
    parser = argparse.ArgumentParser()
    parser.add_argument("--lib", type=str, default="memos-api",
                        choices=["memos-api", "memos-api-online"])
    parser.add_argument("--version", type=str, default="long-bench-v2-1208-1556",
                        help="Version identifier for saving results")
    parser.add_argument("--workers", type=int, default=20,
                        help="Number of parallel workers")
    parser.add_argument("--max_samples", type=int, default=None,
                        help="Maximum number of samples to process (default: all)")
    args = parser.parse_args()
    main(args.lib, args.version, args.workers, args.max_samples)
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import argparse
2+
import json
3+
import os
4+
import sys
5+
6+
from concurrent.futures import ThreadPoolExecutor, as_completed
7+
8+
from dotenv import load_dotenv
9+
from tqdm import tqdm
10+
11+
12+
# Resolve the repository root (four directory levels above this script) and
# put it -- plus evaluation/scripts -- on sys.path so project-local modules
# such as utils.client can be imported.
_THIS_FILE = os.path.abspath(__file__)
ROOT_DIR = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.dirname(_THIS_FILE)))
)
EVAL_SCRIPTS_DIR = os.path.join(ROOT_DIR, "evaluation", "scripts")

sys.path.insert(0, ROOT_DIR)
sys.path.insert(0, EVAL_SCRIPTS_DIR)
20+
21+
def ingest_sample(client, sample, sample_idx, frame, version):
    """Ingest one LongBench v2 sample into the memory backend.

    Returns True on successful ingestion, False on failure or for
    unsupported frames.
    """
    # user_id and conv_id share the same identifier scheme.
    ident = f"longbench_v2_{sample_idx}_{version}"

    # The whole document context is shipped as one file-type message.
    payload = [
        {
            "type": "file",
            "file": {"file_data": sample.get("context", ""), "file_id": str(sample_idx)},
        }
    ]

    if "memos-api" not in frame:
        return False

    try:
        client.add(messages=payload, user_id=ident, conv_id=ident, batch_size=1)
        print(f"✅ [{frame}] Ingested sample {sample_idx}")
        return True
    except Exception as e:
        print(f"❌ [{frame}] Error ingesting sample {sample_idx}: {e}")
        return False
50+
51+
52+
def load_dataset_from_local():
    """Load the LongBench v2 samples from ``data/long_bench_v2/data.json``.

    The data directory is resolved three levels above this script.

    Raises:
        FileNotFoundError: if the dataset file has not been downloaded.
    """
    here = os.path.abspath(__file__)
    base = os.path.dirname(os.path.dirname(os.path.dirname(here)))
    filepath = os.path.join(base, "data", "long_bench_v2", "data.json")

    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Dataset file not found: {filepath}")

    with open(filepath, encoding="utf-8") as f:
        return json.load(f)
70+
71+
72+
def main(frame, version="default", num_workers=10, max_samples=None):
    """Ingest LongBench v2 samples into the given memory frame.

    Args:
        frame: Backend identifier (only "memos-api" is supported).
        version: Version tag baked into each sample's user/conv ids.
        num_workers: Number of parallel ingestion threads.
        max_samples: Optional cap on the number of samples to process.
    """
    load_dotenv()

    print("\n" + "=" * 80)
    print(f"🚀 LONGBENCH V2 INGESTION - {frame.upper()} v{version}".center(80))
    print("=" * 80 + "\n")

    # FileNotFoundError and any other load failure were handled identically,
    # so a single Exception handler replaces the duplicated branches.
    try:
        dataset = load_dataset_from_local()
        print(f"Loaded {len(dataset)} samples from LongBench v2")
    except Exception as e:
        print(f"❌ Error loading dataset: {e}")
        return

    if max_samples:
        dataset = dataset[:max_samples]
        print(f"Limited to {len(dataset)} samples")

    if frame == "memos-api":
        from utils.client import MemosApiClient

        client = MemosApiClient()
    else:
        print(f"❌ Unsupported frame: {frame}")
        return

    # Fan the samples out to a thread pool; ingestion is I/O-bound on the
    # backend API so threads overlap the waits.
    success_count = 0
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(ingest_sample, client, sample, idx, frame, version)
            for idx, sample in enumerate(dataset)
        ]

        for future in tqdm(
            as_completed(futures),
            total=len(futures),
            desc="Ingesting LongBench v2",
        ):
            try:
                if future.result():
                    success_count += 1
            except Exception as e:
                print(f"Error processing sample: {e}")

    print(f"\n{'=' * 80}")
    print(f"✅ INGESTION COMPLETE: {success_count}/{len(dataset)} samples ingested".center(80))
    print(f"{'=' * 80}\n")
128+
129+
130+
if __name__ == "__main__":
    # CLI entry point: parse flags and kick off ingestion.
    parser = argparse.ArgumentParser()
    parser.add_argument("--lib", type=str, default="memos-api",
                        choices=["memos-api", "memos-api-online"])
    parser.add_argument("--version", type=str, default="long-bench-v2-1208-1556-async",
                        help="Version identifier for saving results")
    parser.add_argument("--workers", type=int, default=20,
                        help="Number of parallel workers")
    parser.add_argument("--max_samples", type=int, default=None,
                        help="Maximum number of samples to process (default: all)")
    args = parser.parse_args()
    main(args.lib, args.version, args.workers, args.max_samples)

0 commit comments

Comments
 (0)