|
1 | | -from http import HTTPStatus |
| 1 | +from domain.common_steps.create_product import after_steps, before_steps |
2 | 2 |
|
3 | | -from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent |
4 | | -from domain.core.cpm_product import CpmProduct |
5 | | -from domain.core.cpm_system_id import ProductId |
6 | | -from domain.core.product_team.v3 import ProductTeam |
7 | | -from domain.repository.cpm_product_repository.v3 import CpmProductRepository |
8 | | -from domain.repository.product_team_repository.v2 import ProductTeamRepository |
9 | | -from domain.request_models.v1 import ( |
10 | | - CreateCpmProductIncomingParams, |
11 | | - ProductTeamPathParams, |
12 | | -) |
13 | | -from domain.response.validation_errors import ( |
14 | | - InboundValidationError, |
15 | | - mark_json_decode_errors_as_inbound, |
16 | | - mark_validation_errors_as_inbound, |
17 | | -) |
18 | | -from event.step_chain import StepChain |
19 | | -from pydantic import ValidationError |
20 | | - |
21 | | - |
22 | | -@mark_validation_errors_as_inbound |
23 | | -def parse_path_params(data, cache) -> ProductTeamPathParams: |
24 | | - event = APIGatewayProxyEvent(data[StepChain.INIT]) |
25 | | - return ProductTeamPathParams(**event.path_parameters) |
26 | | - |
27 | | - |
28 | | -@mark_json_decode_errors_as_inbound |
29 | | -def parse_event_body(data, cache) -> dict: |
30 | | - event = APIGatewayProxyEvent(data[StepChain.INIT]) |
31 | | - return event.json_body if event.body else {} |
32 | | - |
33 | | - |
34 | | -def parse_incoming_cpm_product(data, cache) -> CreateCpmProductIncomingParams: |
35 | | - json_body = data[parse_event_body] |
36 | | - try: |
37 | | - incoming_product = CreateCpmProductIncomingParams(**json_body) |
38 | | - return incoming_product |
39 | | - except ValidationError as exc: |
40 | | - raise InboundValidationError(errors=exc.raw_errors, model=exc.model) |
41 | | - |
42 | | - |
43 | | -def read_product_team(data, cache) -> ProductTeam: |
44 | | - path_params: ProductTeamPathParams = data[parse_path_params] |
45 | | - |
46 | | - product_team_repo = ProductTeamRepository( |
47 | | - table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"] |
48 | | - ) |
49 | | - return product_team_repo.read(id=path_params.product_team_id) |
50 | | - |
51 | | - |
52 | | -def generate_cpm_product_id(data, cache) -> str: |
53 | | - generated_product_id = ProductId.create() |
54 | | - return generated_product_id.id |
55 | | - |
56 | | - |
57 | | -@mark_validation_errors_as_inbound |
58 | | -def create_cpm_product(data, cache) -> CpmProduct: |
59 | | - incoming_product = data[parse_incoming_cpm_product] |
60 | | - product_team: ProductTeam = data[read_product_team] |
61 | | - product_id = data[generate_cpm_product_id] |
62 | | - product = product_team.create_cpm_product( |
63 | | - product_id=product_id, name=incoming_product.product_name |
64 | | - ) |
65 | | - return product |
66 | | - |
67 | | - |
68 | | -def save_cpm_product(data, cache) -> dict: |
69 | | - cpm_product = data[create_cpm_product] |
70 | | - cpm_product_repo = CpmProductRepository( |
71 | | - table_name=cache["DYNAMODB_TABLE"], dynamodb_client=cache["DYNAMODB_CLIENT"] |
72 | | - ) |
73 | | - return cpm_product_repo.write(entity=cpm_product) |
74 | | - |
75 | | - |
76 | | -def set_http_status(data, cache) -> tuple[HTTPStatus, str]: |
77 | | - product: CpmProduct = data[create_cpm_product] |
78 | | - return HTTPStatus.CREATED, str(product.id) |
79 | | - |
80 | | - |
81 | | -steps = [ |
82 | | - parse_path_params, |
83 | | - parse_event_body, |
84 | | - parse_incoming_cpm_product, |
85 | | - read_product_team, |
86 | | - generate_cpm_product_id, |
87 | | - create_cpm_product, |
88 | | - save_cpm_product, |
89 | | - set_http_status, |
90 | | -] |
| 3 | +steps = [*before_steps, *after_steps] |
0 commit comments