Makefile: 2 changes (1 addition, 1 deletion)
@@ -373,7 +373,7 @@ GCI = $(LOCALBIN)/gci

## Tool Versions
KUSTOMIZE_VERSION ?= v5.4.3
-CONTROLLER_TOOLS_VERSION ?= v0.16.1
+CONTROLLER_TOOLS_VERSION ?= v0.17.0
ENVTEST_VERSION ?= release-0.19
CRD_REF_DOCS_VERSION ?= v0.2.0
GOLANGCI_LINT_VERSION ?= v2.3.0
go.mod: 1 change (0 additions, 1 deletion)
@@ -92,7 +92,6 @@ require (
github.com/spf13/cobra v1.9.1 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
-github.com/stretchr/objx v0.5.2 // indirect
github.com/x448/float16 v0.8.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
latencypredictor-v1/test_dual_server_client.py: 205 changes (203 additions, 2 deletions)
@@ -13,8 +13,9 @@
import tempfile

# Base URLs for the dual-server architecture
PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://34.158.41.245:80") # Update this
TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://34.143.208.0:8080") # Update this

PREDICTION_URL = os.getenv("PREDICTION_SERVER_URL", "http://<PREDICTION_IP>:80") # Update this
TRAINING_URL = os.getenv("TRAINING_SERVER_URL", "http://<TRAINING_IP>:8080") # Update this

TARGET_QPS = float(os.getenv("TARGET_QPS", 1000)) # Update this
TARGET_QPS_LARGE_BATCH = float(os.getenv("TARGET_QPS_LARGE_BATCH", 100)) # Update this
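
Both endpoints are read from the environment, so pointing the suite at a live deployment only requires setting the two variables before the module loads. A minimal sketch using placeholder TEST-NET addresses (hypothetical values, not taken from this PR):

import os

# Hypothetical endpoints for illustration only; substitute real deployment IPs.
os.environ["PREDICTION_SERVER_URL"] = "http://203.0.113.10:80"
os.environ["TRAINING_SERVER_URL"] = "http://203.0.113.11:8080"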
@@ -1133,6 +1134,204 @@ def test_server_configuration():
print(f"Training server: {train_root_data.get('message')}")
print(f" Model type: {train_root_data.get('model_type')}")

def test_training_server_flush_api():
"""Test the training server flush API and data status endpoint."""
print("Testing training server flush API...")

# 1. Check initial data status
print("Step 1: Checking initial data status...")
initial_status_r = requests.get(f"{TRAINING_URL}/data/status")
assert initial_status_r.status_code == 200
initial_status = initial_status_r.json()

print(f" Initial training samples: TTFT={initial_status['training_data']['ttft_samples']}, "
f"TPOT={initial_status['training_data']['tpot_samples']}")
print(f" Initial test samples: TTFT={initial_status['test_data']['ttft_samples']}, "
f"TPOT={initial_status['test_data']['tpot_samples']}")

# 2. Add training data
print("Step 2: Adding training data...")
training_entries = [generate_random_training_payload() for _ in range(100)]
training_payload = {"entries": training_entries}

add_r = requests.post(f"{TRAINING_URL}/add_training_data_bulk", json=training_payload)
assert add_r.status_code == 202
print(f" Added 100 training samples")

# Wait a bit for data to be processed
time.sleep(2)

# 3. Verify data was added
print("Step 3: Verifying data was added...")
after_add_status_r = requests.get(f"{TRAINING_URL}/data/status")
assert after_add_status_r.status_code == 200
after_add_status = after_add_status_r.json()

total_samples_after = after_add_status['training_data']['total_samples'] + after_add_status['test_data']['total_samples']
print(f" After adding - Training: {after_add_status['training_data']['total_samples']}, "
f"Test: {after_add_status['test_data']['total_samples']}, Total: {total_samples_after}")

# Should have more data now (some goes to training, some to test based on TEST_TRAIN_RATIO)
assert total_samples_after > 0, "No samples were added"

# 4. Test flush with only training data
print("Step 4: Testing flush with only training data...")
flush_training_only = {
"flush_training_data": True,
"flush_test_data": False,
"flush_metrics": False,
"reason": "Test flush training data only"
}

flush_r = requests.post(f"{TRAINING_URL}/flush", json=flush_training_only)
assert flush_r.status_code == 200
flush_response = flush_r.json()

assert flush_response["success"] == True
assert flush_response["metrics_cleared"] == False
assert flush_response["reason"] == "Test flush training data only"

print(f" Flushed {flush_response['ttft_training_samples_flushed']} TTFT training samples")
print(f" Flushed {flush_response['tpot_training_samples_flushed']} TPOT training samples")
print(f" Test samples flushed: {flush_response['ttft_test_samples_flushed']} TTFT, "
f"{flush_response['tpot_test_samples_flushed']} TPOT (should be 0)")

# Verify training data was flushed but test data remains
after_flush_training_r = requests.get(f"{TRAINING_URL}/data/status")
after_flush_training = after_flush_training_r.json()

assert after_flush_training['training_data']['total_samples'] == 0, "Training data should be empty"
# Test data should still exist if any was added
print(f" After training flush - Training: {after_flush_training['training_data']['total_samples']}, "
f"Test: {after_flush_training['test_data']['total_samples']}")

# 5. Add more data
print("Step 5: Adding more training data...")
more_entries = [generate_random_training_payload() for _ in range(50)]
requests.post(f"{TRAINING_URL}/add_training_data_bulk", json={"entries": more_entries})
time.sleep(2)

# 6. Test flush everything
print("Step 6: Testing flush everything...")
flush_all = {
"flush_training_data": True,
"flush_test_data": True,
"flush_metrics": True,
"reason": "Complete flush test"
}

flush_all_r = requests.post(f"{TRAINING_URL}/flush", json=flush_all)
assert flush_all_r.status_code == 200
flush_all_response = flush_all_r.json()

assert flush_all_response["success"] == True
assert flush_all_response["metrics_cleared"] == True
assert "Successfully flushed" in flush_all_response["message"]

print(f" Complete flush message: {flush_all_response['message']}")

# Verify everything was flushed
after_flush_all_r = requests.get(f"{TRAINING_URL}/data/status")
after_flush_all = after_flush_all_r.json()

assert after_flush_all['training_data']['total_samples'] == 0, "Training data should be empty"
assert after_flush_all['test_data']['total_samples'] == 0, "Test data should be empty"

print(f" After complete flush - Training: {after_flush_all['training_data']['total_samples']}, "
f"Test: {after_flush_all['test_data']['total_samples']}")

# 7. Test flush with default parameters (should flush everything)
print("Step 7: Testing default flush (no body)...")

# Add some data first
requests.post(f"{TRAINING_URL}/add_training_data_bulk",
json={"entries": [generate_random_training_payload() for _ in range(20)]})
time.sleep(1)

# Flush with empty body (uses defaults)
default_flush_r = requests.post(f"{TRAINING_URL}/flush")
assert default_flush_r.status_code == 200
default_flush_response = default_flush_r.json()

assert default_flush_response["success"] == True
print(f" Default flush result: {default_flush_response['message']}")

# 8. Test flush with only test data
print("Step 8: Testing flush with only test data...")

# Add data
requests.post(f"{TRAINING_URL}/add_training_data_bulk",
json={"entries": [generate_random_training_payload() for _ in range(50)]})
time.sleep(2)

# Get status before
before_test_flush_r = requests.get(f"{TRAINING_URL}/data/status")
before_test_flush = before_test_flush_r.json()

# Flush only test data
flush_test_only = {
"flush_training_data": False,
"flush_test_data": True,
"flush_metrics": False,
"reason": "Test flush test data only"
}

flush_test_r = requests.post(f"{TRAINING_URL}/flush", json=flush_test_only)
assert flush_test_r.status_code == 200
flush_test_response = flush_test_r.json()

print(f" Test data flush: {flush_test_response['ttft_test_samples_flushed']} TTFT, "
f"{flush_test_response['tpot_test_samples_flushed']} TPOT")

# Verify only test data was flushed
after_test_flush_r = requests.get(f"{TRAINING_URL}/data/status")
after_test_flush = after_test_flush_r.json()

assert after_test_flush['test_data']['total_samples'] == 0, "Test data should be empty"
# Training data should still exist
print(f" After test flush - Training: {after_test_flush['training_data']['total_samples']}, "
f"Test: {after_test_flush['test_data']['total_samples']}")

# 9. Test bucket distribution in status
print("Step 9: Testing bucket distribution in status...")
if "bucket_distribution" in after_flush_all:
print(f" Bucket distribution available: {len(after_flush_all.get('bucket_distribution', {}))} buckets with data")

print("✓ Flush API tests passed!")


def test_training_server_flush_error_handling():
"""Test error handling in flush API."""
print("Testing flush API error handling...")

# Test with an invalid payload (valid JSON, but a string where a boolean is expected)
invalid_json = '{"flush_training_data": "not_a_boolean"}'
headers = {'Content-Type': 'application/json'}

try:
r = requests.post(f"{TRAINING_URL}/flush", data=invalid_json, headers=headers)
# Should get validation error
assert r.status_code in [400, 422], f"Expected 400 or 422, got {r.status_code}"
print("✓ Invalid JSON handled correctly")
except Exception as e:
print(f"⚠️ Error handling test skipped: {e}")

# Test with valid parameters
valid_flush = {
"flush_training_data": False,
"flush_test_data": False,
"flush_metrics": True,
"reason": "Metrics only flush"
}

r = requests.post(f"{TRAINING_URL}/flush", json=valid_flush)
assert r.status_code == 200
response = r.json()
assert response["metrics_cleared"] == True
assert response["ttft_training_samples_flushed"] == 0
assert response["tpot_training_samples_flushed"] == 0

print("✓ Flush error handling tests passed!")

if __name__ == "__main__":
print("Running dual-server architecture tests with prefix cache score support...")
@@ -1168,6 +1367,8 @@ def test_server_configuration():
("Training Metrics", test_training_server_metrics),
("Model Consistency", test_model_consistency_between_servers),
("XGBoost Trees", test_model_specific_endpoints_on_training_server),
("Flush API", test_training_server_flush_api),
("Flush Error Handling", test_training_server_flush_error_handling),

("Dual Server Model Learns Equation", test_dual_server_quantile_regression_learns_distribution),
("End-to-End Workflow", test_end_to_end_workflow),
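
The (name, function) tuples registered above presumably feed the file's simple test runner, which is elided from this diff. A representative sketch of such a loop, offered as an assumption about the harness rather than its actual code:

# Hypothetical runner loop; the real one lives in the elided part of the file.
for name, test_fn in tests:
    print(f"\n=== {name} ===")
    try:
        test_fn()
    except AssertionError as exc:
        print(f"✗ {name} failed: {exc}")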