-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest01.py
More file actions
177 lines (145 loc) Β· 5.44 KB
/
test01.py
File metadata and controls
177 lines (145 loc) Β· 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Fixed test script to verify infrastructure connections
Run this from the project root directory
"""
import asyncio
import sys
import os
# Add services to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'services'))
from services.common.database import db_manager
from services.common.kafka_client import kafka_manager
from services.common.models import Store, Product, InventoryItem, Address, Coordinates, Dimensions, ProductCategory
async def test_mongodb():
"""Test MongoDB connection"""
print("π Testing MongoDB connection...")
try:
# Connect to database
success = await db_manager.connect()
if not success:
print("β MongoDB connection failed")
return False
# Test basic operations
test_doc = {"test": "data", "number": 123}
doc_id = await db_manager.insert_one("test_collection", test_doc)
print(f"β
Inserted test document with ID: {doc_id}")
# Retrieve document
retrieved = await db_manager.find_one("test_collection", {"test": "data"})
if retrieved:
print("β
Retrieved test document successfully")
# Clean up
await db_manager.delete_one("test_collection", {"test": "data"})
print("β
MongoDB test completed successfully")
return True
except Exception as e:
print(f"β MongoDB test failed: {e}")
return False
async def test_kafka():
"""Test Kafka connection"""
print("\nπ Testing Kafka connection...")
try:
# Create topics
await kafka_manager.create_topics()
print("β
Kafka topics created/verified")
# Start producer
await kafka_manager.start_producer()
print("β
Kafka producer started")
# Send test message
test_message = {"test": "message", "data": "hello kafka"}
await kafka_manager.send_message("test-topic", test_message)
print("β
Test message sent successfully")
# Health check
health = await kafka_manager.health_check()
if health:
print("β
Kafka health check passed")
print("β
Kafka test completed successfully")
return True
except Exception as e:
print(f"β Kafka test failed: {e}")
return False
async def test_models():
"""Test Pydantic models"""
print("\nπ Testing Pydantic models...")
try:
# Test Address and Coordinates
address = Address(
street="123 Main St",
city="Anytown",
state="CA",
postal_code="12345",
country="USA",
coordinates=Coordinates(latitude=37.7749, longitude=-122.4194)
)
print("β
Address model validation passed")
# Test Store model
store = Store(
store_id="STORE001",
name="Test Store",
address=address,
manager_name="John Doe",
phone="555-1234",
email="manager@teststore.com"
)
print("β
Store model validation passed")
# Test Product model
product = Product(
product_id="PROD001",
name="Test Product",
category=ProductCategory.ELECTRONICS,
price="99.99",
weight=2.5,
dimensions=Dimensions(length=10, width=5, height=3)
)
print("β
Product model validation passed")
# Test InventoryItem model - Fixed validation
inventory = InventoryItem(
store_id="STORE001",
product_id="PROD001",
current_stock=100,
reserved_stock=5,
available_stock=95, # This should be calculated correctly
reorder_threshold=20,
warning_threshold=15,
critical_threshold=5,
max_capacity=500
)
print("β
InventoryItem model validation passed")
print(f" Available stock calculated: {inventory.available_stock}")
print("β
All Pydantic models test completed successfully")
return True
except Exception as e:
print(f"β Models test failed: {e}")
import traceback
traceback.print_exc()
return False
async def main():
"""Run all tests"""
print("π Starting infrastructure tests (Fixed version)...\n")
# Test models (no external dependencies)
models_ok = await test_models()
# Test MongoDB
mongodb_ok = await test_mongodb()
# Test Kafka
kafka_ok = await test_kafka()
# Cleanup
if mongodb_ok:
await db_manager.disconnect()
if kafka_ok:
await kafka_manager.stop_producer()
# Summary
print(f"\nπ Test Results:")
print(f" Models: {'β
PASS' if models_ok else 'β FAIL'}")
print(f" MongoDB: {'β
PASS' if mongodb_ok else 'β FAIL'}")
print(f" Kafka: {'β
PASS' if kafka_ok else 'β FAIL'}")
if all([models_ok, mongodb_ok, kafka_ok]):
print("\nπ All tests passed! Infrastructure is ready.")
print("\nπ You can now start the inventory service:")
print(" cd services/inventory-service")
print(" python main.py")
return True
else:
print("\nβ οΈ Some tests failed. Check the logs above.")
return False
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)