-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
115 lines (102 loc) · 3.68 KB
/
main.py
File metadata and controls
115 lines (102 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from fastapi import FastAPI
import uvicorn
import asyncio
import uuid
import random
import json
import utils
from sqlalchemy.exc import OperationalError
from infrastructure.consumers import subscribe_to_topic
from modules.orders.application.events.events import OrderEvent, EventPayload
from modules.orders.application.commands.commands import OrderCommand
from modules.sagas.application.messages.payloads import QueryMessage
from config.db import Base, engine, initialize_base
from infrastructure.dispatchers import Dispatcher
from modules.sagas.infrastructure.repositories import TransactionLogRepositorySQLAlchemy
from config.db import get_db
from modules.sagas.application.choreography import SagasEvent
app = FastAPI()
tasks = list()
initialize_base()
try:
Base.metadata.create_all(bind=engine)
except OperationalError:
Base.metadata.create_all(bind=engine)
@app.on_event("startup")
async def app_startup():
global tasks
task1 = asyncio.ensure_future(subscribe_to_topic(
"order-events", "sub-sagas", OrderEvent))
task2 = asyncio.ensure_future(subscribe_to_topic(
"order-commands", "com-sagas", OrderCommand))
task3 = asyncio.ensure_future(subscribe_to_topic(
"order-queries", "query-sagas", QueryMessage))
tasks.append(task1)
tasks.append(task2)
@app.on_event("shutdown")
def shutdown_event():
global tasks
for task in tasks:
task.cancel()
@app.get("/orders")
def create_order_endpoint():
event_payload = EventPayload(
order_id = str(uuid.uuid4()),
customer_id = str(uuid.uuid4()),
order_date = str("2023-02-27T08:05:08.464634"),
order_status = "Created",
order_items = json.dumps([{ "product_id": "9cad4dc7-50c0-44d7-9ed9-3f887a9d565b", "supplier_id": "987eba3c-ae2b-4382-86f9-7ea238733e05", "name": "product1", "description": "Test Desc", "price": 33000.0, "quantity": 5 } ]),
order_total = float(random.randint(2, 15000)),
order_version = int(random.randint(1,10))
)
event1 = OrderEvent(
time = utils.time_millis(),
ingestion = utils.time_millis(),
datacontenttype = EventPayload.__name__,
data_payload = event_payload,
type = "EventOrderCreated"
)
event2 = OrderEvent(
time = utils.time_millis(),
ingestion = utils.time_millis(),
datacontenttype = EventPayload.__name__,
data_payload = event_payload,
type = "EventInventoryChecked"
)
event3 = OrderEvent(
time = utils.time_millis(),
ingestion = utils.time_millis(),
datacontenttype = EventPayload.__name__,
data_payload = event_payload,
type = "ErrorDispatchingOrder"
)
dispatcher = Dispatcher()
dispatcher.publish_message(event1, "order-events")
dispatcher.publish_message(event2, "order-events")
dispatcher.publish_message(event3, "order-events")
return {"message": "Order created successfully"}
@app.get("/test")
def test_db_sagas():
db = get_db()
repository = TransactionLogRepositorySQLAlchemy(db)
event = SagasEvent(
event_id = uuid.uuid4(),
event_type="TestEvent",
order_id = uuid.uuid4(),
order_status = random.choice(["Created", "Checking Inventory", "Ready for Dispatch"])
)
response = repository.create(event)
db.close()
return {"event": response}
@app.get("/get_test")
def test_get_db(order_id: uuid.UUID):
test = QueryMessage(
order_id = str(order_id),
type = "CommandGetOrder",
payload = ""
)
dispatcher = Dispatcher()
dispatcher.publish_message(test, "order-queries")
return {"events": "Request sent"}
if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=1250)