
Commit d2adf5c

add flush training data
1 parent 2e808b2 commit d2adf5c

File tree

3 files changed (+485 -21 lines)


latencypredictor-v1/test_dual_server_client.py

Lines changed: 202 additions & 2 deletions
@@ -13,8 +13,8 @@
 import tempfile
 
 # Base URLs for the dual-server architecture
-PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://34.158.41.245:80")  # Update this
-TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://34.143.208.0:8080")  # Update this
+PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://34.158.34.93:80")  # Update this
+TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://34.158.41.245:8080")  # Update this
 
 TARGET_QPS = float(os.getenv("TARGET_QPS", 1000))  # Update this
 TARGET_QPS_LARGE_BATCH = float(os.getenv("TARGET_QPS_LARGE_BATCH", 100))  # Update this
@@ -1133,6 +1133,204 @@ def test_server_configuration():
     print(f"Training server: {train_root_data.get('message')}")
     print(f" Model type: {train_root_data.get('model_type')}")
 
+def test_training_server_flush_api():
+    """Test the training server flush API and data status endpoint."""
+    print("Testing training server flush API...")
+
+    # 1. Check initial data status
+    print("Step 1: Checking initial data status...")
+    initial_status_r = requests.get(f"{TRAINING_URL}/data/status")
+    assert initial_status_r.status_code == 200
+    initial_status = initial_status_r.json()
+
+    print(f" Initial training samples: TTFT={initial_status['training_data']['ttft_samples']}, "
+          f"TPOT={initial_status['training_data']['tpot_samples']}")
+    print(f" Initial test samples: TTFT={initial_status['test_data']['ttft_samples']}, "
+          f"TPOT={initial_status['test_data']['tpot_samples']}")
+
+    # 2. Add training data
+    print("Step 2: Adding training data...")
+    training_entries = [generate_random_training_payload() for _ in range(100)]
+    training_payload = {"entries": training_entries}
+
+    add_r = requests.post(f"{TRAINING_URL}/add_training_data_bulk", json=training_payload)
+    assert add_r.status_code == 202
+    print(f" Added 100 training samples")
+
+    # Wait a bit for data to be processed
+    time.sleep(2)
+
+    # 3. Verify data was added
+    print("Step 3: Verifying data was added...")
+    after_add_status_r = requests.get(f"{TRAINING_URL}/data/status")
+    assert after_add_status_r.status_code == 200
+    after_add_status = after_add_status_r.json()
+
+    total_samples_after = after_add_status['training_data']['total_samples'] + after_add_status['test_data']['total_samples']
+    print(f" After adding - Training: {after_add_status['training_data']['total_samples']}, "
+          f"Test: {after_add_status['test_data']['total_samples']}, Total: {total_samples_after}")
+
+    # Should have more data now (some goes to training, some to test based on TEST_TRAIN_RATIO)
+    assert total_samples_after > 0, "No samples were added"
+
+    # 4. Test flush with only training data
+    print("Step 4: Testing flush with only training data...")
+    flush_training_only = {
+        "flush_training_data": True,
+        "flush_test_data": False,
+        "flush_metrics": False,
+        "reason": "Test flush training data only"
+    }
+
+    flush_r = requests.post(f"{TRAINING_URL}/flush", json=flush_training_only)
+    assert flush_r.status_code == 200
+    flush_response = flush_r.json()
+
+    assert flush_response["success"] == True
+    assert flush_response["metrics_cleared"] == False
+    assert flush_response["reason"] == "Test flush training data only"
+
+    print(f" Flushed {flush_response['ttft_training_samples_flushed']} TTFT training samples")
+    print(f" Flushed {flush_response['tpot_training_samples_flushed']} TPOT training samples")
+    print(f" Test samples flushed: {flush_response['ttft_test_samples_flushed']} TTFT, "
+          f"{flush_response['tpot_test_samples_flushed']} TPOT (should be 0)")
+
+    # Verify training data was flushed but test data remains
+    after_flush_training_r = requests.get(f"{TRAINING_URL}/data/status")
+    after_flush_training = after_flush_training_r.json()
+
+    assert after_flush_training['training_data']['total_samples'] == 0, "Training data should be empty"
+    # Test data should still exist if any was added
+    print(f" After training flush - Training: {after_flush_training['training_data']['total_samples']}, "
+          f"Test: {after_flush_training['test_data']['total_samples']}")
+
+    # 5. Add more data
+    print("Step 5: Adding more training data...")
+    more_entries = [generate_random_training_payload() for _ in range(50)]
+    requests.post(f"{TRAINING_URL}/add_training_data_bulk", json={"entries": more_entries})
+    time.sleep(2)
+
+    # 6. Test flush everything
+    print("Step 6: Testing flush everything...")
+    flush_all = {
+        "flush_training_data": True,
+        "flush_test_data": True,
+        "flush_metrics": True,
+        "reason": "Complete flush test"
+    }
+
+    flush_all_r = requests.post(f"{TRAINING_URL}/flush", json=flush_all)
+    assert flush_all_r.status_code == 200
+    flush_all_response = flush_all_r.json()
+
+    assert flush_all_response["success"] == True
+    assert flush_all_response["metrics_cleared"] == True
+    assert "Successfully flushed" in flush_all_response["message"]
+
+    print(f" Complete flush message: {flush_all_response['message']}")
+
+    # Verify everything was flushed
+    after_flush_all_r = requests.get(f"{TRAINING_URL}/data/status")
+    after_flush_all = after_flush_all_r.json()
+
+    assert after_flush_all['training_data']['total_samples'] == 0, "Training data should be empty"
+    assert after_flush_all['test_data']['total_samples'] == 0, "Test data should be empty"
+
+    print(f" After complete flush - Training: {after_flush_all['training_data']['total_samples']}, "
+          f"Test: {after_flush_all['test_data']['total_samples']}")
+
+    # 7. Test flush with default parameters (should flush everything)
+    print("Step 7: Testing default flush (no body)...")
+
+    # Add some data first
+    requests.post(f"{TRAINING_URL}/add_training_data_bulk",
+                  json={"entries": [generate_random_training_payload() for _ in range(20)]})
+    time.sleep(1)
+
+    # Flush with empty body (uses defaults)
+    default_flush_r = requests.post(f"{TRAINING_URL}/flush")
+    assert default_flush_r.status_code == 200
+    default_flush_response = default_flush_r.json()
+
+    assert default_flush_response["success"] == True
+    print(f" Default flush result: {default_flush_response['message']}")
+
+    # 8. Test flush with only test data
+    print("Step 8: Testing flush with only test data...")
+
+    # Add data
+    requests.post(f"{TRAINING_URL}/add_training_data_bulk",
+                  json={"entries": [generate_random_training_payload() for _ in range(50)]})
+    time.sleep(2)
+
+    # Get status before
+    before_test_flush_r = requests.get(f"{TRAINING_URL}/data/status")
+    before_test_flush = before_test_flush_r.json()
+
+    # Flush only test data
+    flush_test_only = {
+        "flush_training_data": False,
+        "flush_test_data": True,
+        "flush_metrics": False,
+        "reason": "Test flush test data only"
+    }
+
+    flush_test_r = requests.post(f"{TRAINING_URL}/flush", json=flush_test_only)
+    assert flush_test_r.status_code == 200
+    flush_test_response = flush_test_r.json()
+
+    print(f" Test data flush: {flush_test_response['ttft_test_samples_flushed']} TTFT, "
+          f"{flush_test_response['tpot_test_samples_flushed']} TPOT")
+
+    # Verify only test data was flushed
+    after_test_flush_r = requests.get(f"{TRAINING_URL}/data/status")
+    after_test_flush = after_test_flush_r.json()
+
+    assert after_test_flush['test_data']['total_samples'] == 0, "Test data should be empty"
+    # Training data should still exist
+    print(f" After test flush - Training: {after_test_flush['training_data']['total_samples']}, "
+          f"Test: {after_test_flush['test_data']['total_samples']}")
+
+    # 9. Test bucket distribution in status
+    print("Step 9: Testing bucket distribution in status...")
+    if "bucket_distribution" in after_flush_all:
+        print(f" Bucket distribution available: {len(after_flush_all.get('bucket_distribution', {}))} buckets with data")
+
+    print("✓ Flush API tests passed!")
+
+
+def test_training_server_flush_error_handling():
+    """Test error handling in flush API."""
+    print("Testing flush API error handling...")
+
+    # Test with invalid JSON
+    invalid_json = '{"flush_training_data": "not_a_boolean"}'
+    headers = {'Content-Type': 'application/json'}
+
+    try:
+        r = requests.post(f"{TRAINING_URL}/flush", data=invalid_json, headers=headers)
+        # Should get validation error
+        assert r.status_code in [400, 422], f"Expected 400 or 422, got {r.status_code}"
+        print("✓ Invalid JSON handled correctly")
+    except Exception as e:
+        print(f"⚠️ Error handling test skipped: {e}")
+
+    # Test with valid parameters
+    valid_flush = {
+        "flush_training_data": False,
+        "flush_test_data": False,
+        "flush_metrics": True,
+        "reason": "Metrics only flush"
+    }
+
+    r = requests.post(f"{TRAINING_URL}/flush", json=valid_flush)
+    assert r.status_code == 200
+    response = r.json()
+    assert response["metrics_cleared"] == True
+    assert response["ttft_training_samples_flushed"] == 0
+    assert response["tpot_training_samples_flushed"] == 0
+
+    print("✓ Flush error handling tests passed!")
 
 if __name__ == "__main__":
     print("Running dual-server architecture tests with prefix cache score support...")
@@ -1168,6 +1366,8 @@ def test_server_configuration():
         ("Training Metrics", test_training_server_metrics),
         ("Model Consistency", test_model_consistency_between_servers),
         ("XGBoost Trees", test_model_specific_endpoints_on_training_server),
+        ("Flush API", test_training_server_flush_api),
+        ("Flush Error Handling", test_training_server_flush_error_handling),
 
         ("Dual Server Model Learns Equation", test_dual_server_quantile_regression_learns_distribution),
         ("End-to-End Workflow", test_end_to_end_workflow),
