Skip to content

Commit e2b9a0a

Browse files
authored
fix: schedule processing in worker (#611)
* fix: schedule processing in worker * fix-comment
1 parent 9565094 commit e2b9a0a

File tree

7 files changed

+275
-36
lines changed

7 files changed

+275
-36
lines changed

apps/mappings/tasks.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
2-
from typing import List
32
from datetime import datetime, timezone
3+
from typing import List
44

55
from fyle_accounting_mappings.models import Mapping
66
from fyle_integrations_platform_connector import PlatformConnector
@@ -10,6 +10,7 @@
1010
from apps.tasks.models import Error
1111
from apps.workspaces.models import FyleCredential, WorkspaceGeneralSettings, XeroCredentials
1212
from apps.xero.utils import XeroConnector
13+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
1314

1415
logger = logging.getLogger(__name__)
1516
logger.level = logging.INFO
@@ -43,8 +44,29 @@ def resolve_expense_attribute_errors(
4344
)
4445

4546

47+
def async_auto_map_employees(workspace_id: int) -> None:
48+
"""
49+
Schedule async auto map employees via RabbitMQ
50+
:param workspace_id: Workspace Id
51+
:return: None
52+
"""
53+
payload = {
54+
'workspace_id': workspace_id,
55+
'action': WorkerActionEnum.AUTO_MAP_EMPLOYEES.value,
56+
'data': {
57+
'workspace_id': workspace_id
58+
}
59+
}
60+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
61+
62+
4663
@handle_import_exceptions(task_name="async auto map employees")
47-
def async_auto_map_employees(workspace_id: int):
64+
def trigger_async_auto_map_employees(workspace_id: int):
65+
"""
66+
Trigger async auto map employees
67+
:param workspace_id: Workspace Id
68+
:return: None
69+
"""
4870
general_settings = WorkspaceGeneralSettings.objects.get(workspace_id=workspace_id)
4971
employee_mapping_preference = general_settings.auto_map_employees
5072
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)

apps/workspaces/tasks.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,25 @@
2424
logger.level = logging.INFO
2525

2626

27-
def run_sync_schedule(workspace_id):
27+
def run_sync_schedule(workspace_id) -> None:
2828
"""
29-
Run schedule
30-
:param user: user email
29+
Schedule run sync via RabbitMQ
30+
:param workspace_id: workspace id
31+
:return: None
32+
"""
33+
payload = {
34+
'workspace_id': workspace_id,
35+
'action': WorkerActionEnum.RUN_SYNC_SCHEDULE.value,
36+
'data': {
37+
'workspace_id': workspace_id
38+
}
39+
}
40+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
41+
42+
43+
def trigger_run_sync_schedule(workspace_id):
44+
"""
45+
Trigger run sync schedule
3146
:param workspace_id: workspace id
3247
:return: None
3348
"""
@@ -94,7 +109,28 @@ def async_update_fyle_credentials(fyle_org_id: str, refresh_token: str):
94109
fyle_credentials.save()
95110

96111

97-
def run_email_notification(workspace_id):
112+
def run_email_notification(workspace_id) -> None:
113+
"""
114+
Schedule run email notification via RabbitMQ
115+
:param workspace_id: workspace id
116+
:return: None
117+
"""
118+
payload = {
119+
'workspace_id': workspace_id,
120+
'action': WorkerActionEnum.RUN_EMAIL_NOTIFICATION.value,
121+
'data': {
122+
'workspace_id': workspace_id
123+
}
124+
}
125+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.UTILITY.value)
126+
127+
128+
def trigger_run_email_notification(workspace_id):
129+
"""
130+
Trigger run email notification
131+
:param workspace_id: workspace id
132+
:return: None
133+
"""
98134
ws_schedule = WorkspaceSchedule.objects.get(workspace_id=workspace_id, enabled=True)
99135
task_logs_count = get_failed_task_logs_count(workspace_id)
100136
workspace = Workspace.objects.get(id=workspace_id)

apps/xero/tasks.py

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from apps.xero.utils import XeroConnector
2727
from fyle_xero_api.exceptions import BulkError
2828
from fyle_xero_api.logging_middleware import get_caller_info, get_logger
29+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
2930

3031
logger = logging.getLogger(__name__)
3132
logger.level = logging.INFO
@@ -772,7 +773,28 @@ def validate_for_skipping_payment(bill: Bill, workspace_id: int):
772773
return False
773774

774775

775-
def create_payment(workspace_id):
776+
def create_payment(workspace_id) -> None:
777+
"""
778+
Schedule create payment via RabbitMQ
779+
:param workspace_id: workspace id
780+
:return: None
781+
"""
782+
payload = {
783+
'workspace_id': workspace_id,
784+
'action': WorkerActionEnum.CREATE_PAYMENT.value,
785+
'data': {
786+
'workspace_id': workspace_id
787+
}
788+
}
789+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
790+
791+
792+
def trigger_create_payment(workspace_id):
793+
"""
794+
Trigger create payment
795+
:param workspace_id: workspace id
796+
:return: None
797+
"""
776798
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)
777799

778800
platform = PlatformConnector(fyle_credentials)
@@ -823,7 +845,28 @@ def get_all_xero_bill_ids(xero_objects):
823845
return xero_objects_details
824846

825847

826-
def check_xero_object_status(workspace_id):
848+
def check_xero_object_status(workspace_id) -> None:
849+
"""
850+
Schedule check xero object status via RabbitMQ
851+
:param workspace_id: workspace id
852+
:return: None
853+
"""
854+
payload = {
855+
'workspace_id': workspace_id,
856+
'action': WorkerActionEnum.CHECK_XERO_OBJECT_STATUS.value,
857+
'data': {
858+
'workspace_id': workspace_id
859+
}
860+
}
861+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
862+
863+
864+
def trigger_check_xero_object_status(workspace_id):
865+
"""
866+
Trigger check xero object status
867+
:param workspace_id: workspace id
868+
:return: None
869+
"""
827870
try:
828871
xero_credentials = XeroCredentials.get_active_xero_credentials(workspace_id)
829872

@@ -868,7 +911,28 @@ def check_xero_object_status(workspace_id):
868911
invalidate_xero_credentials(workspace_id)
869912

870913

871-
def process_reimbursements(workspace_id):
914+
def process_reimbursements(workspace_id) -> None:
915+
"""
916+
Schedule process reimbursements via RabbitMQ
917+
:param workspace_id: workspace id
918+
:return: None
919+
"""
920+
payload = {
921+
'workspace_id': workspace_id,
922+
'action': WorkerActionEnum.PROCESS_REIMBURSEMENTS.value,
923+
'data': {
924+
'workspace_id': workspace_id
925+
}
926+
}
927+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
928+
929+
930+
def trigger_process_reimbursements(workspace_id):
931+
"""
932+
Trigger process reimbursements
933+
:param workspace_id: workspace id
934+
:return: None
935+
"""
872936
fyle_credentials = FyleCredential.objects.get(workspace_id=workspace_id)
873937
try:
874938
platform = PlatformConnector(fyle_credentials=fyle_credentials)

tests/test_mappings/test_tasks.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,30 @@
88

99
from apps.fyle.models import ExpenseGroup
1010
from apps.mappings.queue import schedule_auto_map_employees
11-
from apps.mappings.tasks import async_auto_map_employees, resolve_expense_attribute_errors
11+
from apps.mappings.tasks import async_auto_map_employees, resolve_expense_attribute_errors, trigger_async_auto_map_employees
1212
from apps.tasks.models import Error
1313
from apps.workspaces.models import WorkspaceGeneralSettings, XeroCredentials
1414
from tests.test_fyle.fixtures import data as fyle_data
1515
from tests.test_xero.fixtures import data as xero_data
1616

1717

18+
def test_async_auto_map_employees_publishes_to_rabbitmq(mocker, db):
19+
"""
20+
Test that async_auto_map_employees publishes to RabbitMQ correctly
21+
"""
22+
workspace_id = 1
23+
mock_publish = mocker.patch("apps.mappings.tasks.publish_to_rabbitmq")
24+
25+
async_auto_map_employees(workspace_id)
26+
27+
mock_publish.assert_called_once()
28+
call_args = mock_publish.call_args
29+
payload = call_args[1]['payload']
30+
assert payload['workspace_id'] == workspace_id
31+
assert payload['action'] == 'IMPORT.AUTO_MAP_EMPLOYEES'
32+
assert payload['data']['workspace_id'] == workspace_id
33+
34+
1835
def test_async_auto_map_employees(mocker, db):
1936
workspace_id = 1
2037

@@ -28,7 +45,7 @@ def test_async_auto_map_employees(mocker, db):
2845
return_value=fyle_data["get_all_employees"],
2946
)
3047

31-
async_auto_map_employees(workspace_id)
48+
trigger_async_auto_map_employees(workspace_id)
3249
employee_mappings = EmployeeMapping.objects.filter(
3350
workspace_id=workspace_id
3451
).count()
@@ -38,7 +55,7 @@ def test_async_auto_map_employees(mocker, db):
3855
general_settings.employee_field_mapping = "VENDOR"
3956
general_settings.save()
4057

41-
async_auto_map_employees(workspace_id)
58+
trigger_async_auto_map_employees(workspace_id)
4259

4360
employee_mappings = EmployeeMapping.objects.filter(
4461
workspace_id=workspace_id
@@ -49,20 +66,20 @@ def test_async_auto_map_employees(mocker, db):
4966
mock_call.side_effect = FyleInvalidTokenError(
5067
msg="Invalid Token for Fyle", response="Invalid Token for Fyle"
5168
)
52-
async_auto_map_employees(workspace_id=workspace_id)
69+
trigger_async_auto_map_employees(workspace_id=workspace_id)
5370

5471
mock_call.side_effect = UnsuccessfulAuthentication(msg="Auth error")
55-
async_auto_map_employees(workspace_id=workspace_id)
72+
trigger_async_auto_map_employees(workspace_id=workspace_id)
5673

5774
mock_call.side_effect = InternalServerError(
5875
msg="Internal server error while importing to Fyle"
5976
)
60-
async_auto_map_employees(workspace_id=workspace_id)
77+
trigger_async_auto_map_employees(workspace_id=workspace_id)
6178

6279
qbo_credentials = XeroCredentials.objects.get(workspace_id=workspace_id)
6380
qbo_credentials.delete()
6481

65-
async_auto_map_employees(workspace_id)
82+
trigger_async_auto_map_employees(workspace_id)
6683

6784
employee_mappings = EmployeeMapping.objects.filter(
6885
workspace_id=workspace_id

0 commit comments

Comments
 (0)