-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
274 lines (237 loc) · 12.5 KB
/
test.py
File metadata and controls
274 lines (237 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# test.py
import pandas as pd
import random
import os
import math
from collections import defaultdict
from config import QUESTION_TYPE_MC, QUESTION_TYPE_OE, QUESTION_TYPE_UNKNOWN
from core_logic import generate_all_tests_data
# Constants for test and output file names
# Input workbook; run_all_tests expects it to hold 4 blocks of 12 questions each.
TEST_EXCEL_FILE = "test_set_4_by_12_questions.xlsx"
# Output workbook for the pivoted Dice-similarity analysis results.
OUTPUT_EXCEL_FILE = "similarity_analysis_unified_dice_mc_15t.xlsx"
def _load_test_questions(status_callback):
    """
    Load questions from TEST_EXCEL_FILE, detect blocks and types.

    Blocks are runs of non-empty rows separated by fully-empty rows.  The
    first question of a block fixes the block's type (MC when the row has
    >= 2 answer cells, OE otherwise); later rows of a different type are
    skipped with a warning.

    Returns:
        (all_questions, blocks_summary, None) on success, or
        (None, None, error_key) on failure.

    status_callback is invoked only for critical errors and for per-row
    mixed-type warnings.
    """
    if not os.path.exists(TEST_EXCEL_FILE):
        status_callback("error", "TEST_FILE_NOT_FOUND", filename=TEST_EXCEL_FILE)
        return None, None, "TEST_FILE_NOT_FOUND"
    try:
        _, file_extension = os.path.splitext(TEST_EXCEL_FILE)
        if file_extension.lower() not in ['.xlsx', '.xls']:
            status_callback("error", "FH_UNSUPPORTED_FORMAT", filename=TEST_EXCEL_FILE, extension=file_extension)
            return None, None, "FH_UNSUPPORTED_FORMAT"
        df = pd.read_excel(TEST_EXCEL_FILE, header=None)
        # Normalize: no NaNs, everything as string, so cell tests are uniform.
        df = df.fillna('').astype(str)
        all_questions = []
        blocks_summary = []
        current_block_id = 1
        current_block_questions = []
        current_block_type = None
        first_question_in_block = True
        # Append a sentinel empty row so the final block is always flushed
        # by the empty-row branch below.
        df.loc[len(df)] = [""] * df.shape[1]
        for index, row in df.iterrows():
            is_empty_row = all(s is None or str(s).strip() == "" for s in row)
            if is_empty_row:
                # An empty row terminates the current block (if any).
                if current_block_questions:
                    if current_block_type is None:
                        current_block_type = QUESTION_TYPE_UNKNOWN
                    blocks_summary.append({
                        'block_id': current_block_id,
                        'type': current_block_type,
                        'count': len(current_block_questions)
                    })
                    all_questions.extend(current_block_questions)
                    current_block_id += 1
                    current_block_questions = []
                    current_block_type = None
                    first_question_in_block = True
            else:
                row_list = [str(s).strip() for s in row]
                question_text = row_list[0]
                # Non-empty cells after the first column are the answers.
                answers = [ans for ans in row_list[1:] if ans]
                if question_text:
                    # >= 2 answer cells => multiple choice, otherwise open-ended.
                    question_type = QUESTION_TYPE_MC if len(answers) >= 2 else QUESTION_TYPE_OE
                    if first_question_in_block:
                        current_block_type = question_type
                        first_question_in_block = False
                    elif question_type != current_block_type:
                        # Mixed types inside a block: warn and skip this row.
                        status_callback("warning", "FH_BLOCK_MIXED_TYPES", block_id=current_block_id, expected=current_block_type, found=question_type, row_num=index + 1)
                        continue
                    question_dict = {
                        'question': question_text,
                        'answers': answers if current_block_type == QUESTION_TYPE_MC else [],
                        'original_index': index,
                        'type': current_block_type,
                        'block_id': current_block_id
                    }
                    current_block_questions.append(question_dict)
        # Drop any degenerate zero-count summaries (defensive; blocks are
        # only appended when current_block_questions is non-empty).
        blocks_summary = [b for b in blocks_summary if b['count'] > 0]
        if not all_questions:
            status_callback("error", "FH_NO_VALID_QUESTIONS", filename=TEST_EXCEL_FILE)
            return None, None, "FH_NO_VALID_QUESTIONS"
        return all_questions, blocks_summary, None
    except Exception as e:
        status_callback("error", "TEST_LOAD_ERROR", filename=TEST_EXCEL_FILE, error=str(e))
        return None, None, "TEST_LOAD_ERROR"
def _calculate_dice(set1, set2):
"""Calculate the Sorensen-Dice coefficient between two sets."""
intersection_cardinality = len(set1.intersection(set2))
denominator = len(set1) + len(set2)
if denominator == 0:
return 1.0
return 2 * intersection_cardinality / denominator
def _run_single_unified_analysis_for_k(k_per_block, blocks_info, all_questions_list, num_tests_to_generate):
    """
    Run a single similarity analysis for a given k_per_block, using the
    unified logic (WRSwOR or Simple Random) from core_logic.

    Returns:
        (avg_dice_results_for_k, generation_error_messages) where the first
        item maps distance -> average Dice index (or None on failure) and
        the second is a list of (type, key, kwargs) message tuples.

    Does NOT call status_callback; messages are returned to the caller.
    """
    dice_by_distance = {}
    # Pairs (i, i+d) exist only for d up to num_tests_to_generate - 1.
    max_distance_to_check = num_tests_to_generate - 1
    generation_error_messages = []

    def nop_callback(*args, **kwargs):
        # Swallow progress callbacks from core_logic; messages are collected
        # from its return value instead.
        pass

    # Request k questions from every block large enough to supply them.
    block_requests = {block['block_id']: k_per_block for block in blocks_info if block['count'] >= k_per_block}
    if not block_requests:
        generation_error_messages.append(("error", "STAT_TEST_K_INVALID", {"k": k_per_block}))
        return None, generation_error_messages
    generated_tests_data, gen_messages_internal = generate_all_tests_data(
        all_questions_list, block_requests, num_tests_to_generate, nop_callback
    )
    # Collect warnings (e.g. fallback) and errors
    generation_error_messages.extend([msg for msg in gen_messages_internal if msg[0] in ('error', 'warning')])
    if generated_tests_data is None:
        if not any(m[1] == "STAT_TEST_GENERATION_FAILED_KPB" for m in generation_error_messages):
            generation_error_messages.append(("error", "STAT_TEST_GENERATION_FAILED_KPB", {"k_per_block": k_per_block}))
        return None, generation_error_messages
    # Each test becomes the set of original row indices of its questions.
    test_sets = [set(q['original_index'] for q in test) for test in generated_tests_data]
    for d in range(1, max_distance_to_check + 1):
        dice_indices_for_d = []
        for i in range(num_tests_to_generate - d):
            dice_index = _calculate_dice(test_sets[i], test_sets[i + d])
            dice_indices_for_d.append(dice_index)
        if dice_indices_for_d:
            dice_by_distance[d] = dice_indices_for_d
    avg_dice_results_for_k = {}
    for d, indices in dice_by_distance.items():
        if indices:
            avg = sum(indices) / len(indices)
            # Guard against NaN propagating into the accumulated results.
            avg_dice_results_for_k[d] = avg if not math.isnan(avg) else 0.0
        else:
            avg_dice_results_for_k[d] = None
    return avg_dice_results_for_k, generation_error_messages
# ================================================================
# Monte Carlo Test Orchestrator (run_all_tests)
# ================================================================
def run_all_tests(status_callback, num_monte_carlo_runs=30):
    """
    Orchestrate statistical similarity analysis (Dice) with Monte Carlo,
    varying k_per_block from 1 to 11, using unified logic (WRSwOR/Simple).

    Saves final average results to a formatted Excel file
    (OUTPUT_EXCEL_FILE).

    Returns:
        (monte_carlo_summary, excel_filename) — a list of
        (type, key, kwargs_dict) summary message tuples, and the output
        filename (or None if no file was written).

    Calls status_callback only for critical errors during data loading.
    """
    monte_carlo_summary = []
    # k_per_block -> distance -> running {'sum', 'count'} of per-run averages.
    results_accumulator = defaultdict(lambda: defaultdict(lambda: {'sum': 0.0, 'count': 0}))
    sampling_method_used = {}
    fallback_counts = defaultdict(int)
    # 1. Load data and block info
    all_questions, blocks_summary, error_key = _load_test_questions(status_callback)
    if error_key:
        monte_carlo_summary.append(("error", "TEST_ABORTED_LOAD_FAILED", {}))
        return monte_carlo_summary, None
    # The test fixture must be exactly 4 blocks of 12 questions.
    expected_blocks = 4
    expected_q_per_block = 12
    if not blocks_summary or len(blocks_summary) != expected_blocks:
        monte_carlo_summary.append(("error", "TEST_WRONG_BLOCK_COUNT", {"found": len(blocks_summary) if blocks_summary else 0, "expected": expected_blocks}))
        return monte_carlo_summary, None
    for block in blocks_summary:
        if block['count'] != expected_q_per_block:
            monte_carlo_summary.append(("error", "TEST_WRONG_Q_PER_BLOCK_COUNT", {"block_id": block['block_id'], "found": block['count'], "expected": expected_q_per_block}))
            return monte_carlo_summary, None
    # 2. Define parameters
    num_tests_per_sequence = 15
    k_per_block_values = range(1, 12)
    max_distance_overall = 0
    # 3. Outer Monte Carlo loop
    for run in range(1, num_monte_carlo_runs + 1):
        for k_block in k_per_block_values:
            # Determine which method core_logic will use: WRSwOR while fewer
            # than half of the block is drawn, Simple Random otherwise.
            method = "WRSwOR" if (k_block * 2 < expected_q_per_block) else "Simple Random"
            if k_block not in sampling_method_used:
                sampling_method_used[k_block] = method
            avg_dice_by_distance, gen_errors = _run_single_unified_analysis_for_k(
                k_block, blocks_summary, all_questions, num_tests_per_sequence
            )
            # Accumulate critical errors from generation
            monte_carlo_summary.extend([msg for msg in gen_errors if msg[0] == 'error'])
            # Count fallback warnings for this k
            if any(m[1] == "BLOCK_FALLBACK_WARNING" for m in gen_errors):
                fallback_counts[k_block] += 1
            if avg_dice_by_distance is not None:
                for d, avg_d in avg_dice_by_distance.items():
                    if avg_d is not None:
                        results_accumulator[k_block][d]['sum'] += avg_d
                        results_accumulator[k_block][d]['count'] += 1
                        max_distance_overall = max(max_distance_overall, d)
            else:
                monte_carlo_summary.append(("warning", "MC_TEST_FAILED_FOR_KPB_IN_RUN", {"k_per_block": k_block, "run": run, "method": method}))
    # 4. Compute final averages and prepare Excel output
    detailed_results_for_excel = []
    sorted_k = sorted(results_accumulator.keys())
    # Ensure at least one distance column when results exist at all.
    if not max_distance_overall and any(results_accumulator.values()):
        max_distance_overall = 1
    for k_block in sorted_k:
        for d in range(1, max_distance_overall + 1):
            data = results_accumulator[k_block].get(d)
            if data and data['count'] > 0:
                final_avg = data['sum'] / data['count']
                num_samples = data['count']
            else:
                final_avg = None
                num_samples = 0
            detailed_results_for_excel.append({
                'k_per_block': k_block,
                'distance': d,
                'avg_dice': final_avg,
                'num_samples': num_samples,
                'method': sampling_method_used.get(k_block, 'Unknown'),
                'fallback_runs': fallback_counts.get(k_block, 0)
            })
    # 5. Create and save Excel file
    excel_filename = None
    if detailed_results_for_excel:
        try:
            df_results = pd.DataFrame(detailed_results_for_excel)
            df_pivot = pd.pivot_table(df_results, values='avg_dice', index='k_per_block', columns='distance')
            method_map = pd.Series(sampling_method_used, name='Metodo Usato')
            fallback_map = pd.Series(fallback_counts, name=f'WRSwOR Fallback Runs (su {num_monte_carlo_runs})')
            df_pivot = df_pivot.join(method_map)
            df_pivot = df_pivot.join(fallback_map)
            # Sort by numeric k_per_block index before formatting
            df_pivot = df_pivot.sort_index(ascending=True)
            # Format index as string after sorting
            df_pivot.index = [f"{k} su {expected_q_per_block}" for k in df_pivot.index]
            df_pivot.index.name = f"k / n (n={expected_q_per_block} per blocco)"
            # Sort and format distance columns; non-int columns (method,
            # fallback counts) go last.
            distance_cols = sorted([col for col in df_pivot.columns if isinstance(col, int)], key=int)
            other_cols = [col for col in df_pivot.columns if not isinstance(col, int)]
            df_pivot = df_pivot.reindex(distance_cols + other_cols, axis=1)
            df_pivot.columns = [f"Distanza {col}" if isinstance(col, int) else col for col in df_pivot.columns]
            df_pivot.to_excel(OUTPUT_EXCEL_FILE, sheet_name='Similarity_Analysis')
            monte_carlo_summary.append(("success", "STAT_TEST_EXCEL_CREATED", {"filename": OUTPUT_EXCEL_FILE}))
            excel_filename = OUTPUT_EXCEL_FILE
        except Exception as e:
            monte_carlo_summary.append(("error", "STAT_TEST_EXCEL_SAVE_ERROR", {"filename": OUTPUT_EXCEL_FILE, "error": str(e)}))
    elif any(results_accumulator.values()):
        monte_carlo_summary.append(("warning", "STAT_TEST_NO_DATA_FOR_EXCEL", {}))
    # 6. Final completion message
    monte_carlo_summary.append(("info", "MC_TEST_ALL_COMPLETE", {}))
    return monte_carlo_summary, excel_filename