Skip to content

Commit 75afb1b

Browse files
authored
feat: rabbitmq migration (#608)
* feat: rabbitmq migration * fix tests * fix tests * fix tests * fix tests * Fix import processing * fix tests * fix test
1 parent 1a48f97 commit 75afb1b

File tree

35 files changed

+846
-310
lines changed

35 files changed

+846
-310
lines changed

apps/fyle/queue.py

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import logging
22

3-
from django_q.tasks import async_task
4-
from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum, RoutingKeyEnum, WebhookCallbackActionEnum
5-
from fyle_accounting_library.rabbitmq.connector import RabbitMQConnection
6-
from fyle_accounting_library.rabbitmq.data_class import RabbitMQData
3+
from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum, WebhookCallbackActionEnum
74

85
from apps.fyle.helpers import assert_valid_request
96
from apps.workspaces.models import FeatureConfig
107
from fyle_integrations_imports.modules.webhook_attributes import WebhookAttributeProcessor
8+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
119

1210
logger = logging.getLogger(__name__)
1311
logger.level = logging.INFO
@@ -27,47 +25,78 @@ def handle_webhook_callback(body: dict, workspace_id: int) -> None:
2725
if data and data.get('org_id'):
2826
assert_valid_request(workspace_id=workspace_id, fyle_org_id=data['org_id'])
2927

30-
rabbitmq = RabbitMQConnection.get_instance('xero_exchange')
3128
if action in ('ADMIN_APPROVED', 'APPROVED', 'STATE_CHANGE_PAYMENT_PROCESSING', 'PAID') and data:
3229
report_id = data['id']
3330
org_id = data['org_id']
3431
state = data['state']
3532
payload = {
33+
'workspace_id': workspace_id,
34+
'action': WorkerActionEnum.EXPENSE_STATE_CHANGE.value,
3635
'data': {
3736
'report_id': report_id,
3837
'org_id': org_id,
3938
'is_state_change_event': True,
4039
'report_state': state,
4140
'imported_from': ExpenseImportSourceEnum.WEBHOOK
42-
},
43-
'workspace_id': workspace_id
41+
}
4442
}
45-
rabbitmq_data = RabbitMQData(
46-
new=payload
47-
)
48-
rabbitmq.publish(RoutingKeyEnum.EXPORT, rabbitmq_data)
43+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
4944

5045
if action == 'ACCOUNTING_EXPORT_INITIATED' and data:
5146
report_id = data['id']
5247
org_id = data['org_id']
53-
async_task('apps.fyle.tasks.import_and_export_expenses', report_id, org_id, False, None, ExpenseImportSourceEnum.DIRECT_EXPORT)
48+
payload = {
49+
'workspace_id': workspace_id,
50+
'action': WorkerActionEnum.DIRECT_EXPORT.value,
51+
'data': {
52+
'report_id': report_id,
53+
'org_id': org_id,
54+
'is_state_change_event': False,
55+
'report_state': None,
56+
'imported_from': ExpenseImportSourceEnum.DIRECT_EXPORT
57+
}
58+
}
59+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P0.value)
5460

5561
elif action == 'UPDATED_AFTER_APPROVAL' and data and resource == 'EXPENSE':
5662
org_id = data['org_id']
5763
logger.info("| Updating non-exported expenses through webhook | Content: {{WORKSPACE_ID: {} Payload: {}}}".format(workspace_id, data))
58-
async_task('apps.fyle.tasks.update_non_exported_expenses', data)
64+
payload = {
65+
'workspace_id': workspace_id,
66+
'action': WorkerActionEnum.EXPENSE_UPDATED_AFTER_APPROVAL.value,
67+
'data': {
68+
'data': data
69+
}
70+
}
71+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.UTILITY.value)
5972

6073
elif action in ('EJECTED_FROM_REPORT', 'ADDED_TO_REPORT') and data and resource == 'EXPENSE':
6174
org_id = data['org_id']
6275
expense_id = data['id']
6376
logger.info("| Handling expense report change | Content: {{WORKSPACE_ID: {} EXPENSE_ID: {} ACTION: {} Payload: {}}}".format(workspace_id, expense_id, action, data))
64-
async_task('apps.fyle.tasks.handle_expense_report_change', data, action)
77+
payload = {
78+
'workspace_id': workspace_id,
79+
'action': WorkerActionEnum.EXPENSE_ADDED_EJECTED_FROM_REPORT.value,
80+
'data': {
81+
'expense_data': data,
82+
'action_type': action
83+
}
84+
}
85+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.UTILITY.value)
6586

6687
elif (
6788
action == WebhookCallbackActionEnum.UPDATED.value
6889
and resource == 'ORG_SETTING'
6990
):
70-
async_task('apps.fyle.tasks.handle_org_setting_updated', workspace_id, data)
91+
payload = {
92+
'workspace_id': workspace_id,
93+
'action': WorkerActionEnum.HANDLE_ORG_SETTING_UPDATED.value,
94+
'data': {
95+
'workspace_id': workspace_id,
96+
'org_settings': data
97+
}
98+
}
99+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.UTILITY.value)
71100

72101
elif action in (WebhookCallbackActionEnum.CREATED, WebhookCallbackActionEnum.UPDATED, WebhookCallbackActionEnum.DELETED):
73102
try:

apps/fyle/signals.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import logging
2-
from django.dispatch import receiver
3-
from django.db.models.signals import pre_save
4-
5-
from django_q.tasks import async_task
62

7-
from fyle_accounting_library.fyle_platform.enums import FundSourceEnum, ExpenseImportSourceEnum
3+
from django.db.models.signals import pre_save
4+
from django.dispatch import receiver
5+
from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum, FundSourceEnum
86

97
from apps.fyle.enums import ExpenseStateEnum
108
from apps.fyle.models import ExpenseGroupSettings
119
from apps.workspaces.models import WorkspaceGeneralSettings
10+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
1211

1312
logger = logging.getLogger(__name__)
1413
logger.setLevel(logging.INFO)
@@ -29,8 +28,28 @@ def run_pre_save_expense_group_setting_triggers(sender, instance: ExpenseGroupSe
2928
# TODO: move these async_tasks to maintenance worker later
3029
if configuration.reimbursable_expenses_object and existing_expense_group_setting.reimbursable_expense_state != instance.reimbursable_expense_state and existing_expense_group_setting.reimbursable_expense_state == ExpenseStateEnum.PAID and instance.reimbursable_expense_state == ExpenseStateEnum.PAYMENT_PROCESSING:
3130
logger.info(f'Reimbursable expense state changed from {existing_expense_group_setting.reimbursable_expense_state} to {instance.reimbursable_expense_state} for workspace {instance.workspace_id}, so pulling the data from Fyle')
32-
async_task('apps.fyle.tasks.create_expense_groups', workspace_id=instance.workspace_id, task_log=None, fund_source=[FundSourceEnum.PERSONAL], imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE)
31+
payload = {
32+
'workspace_id': instance.workspace_id,
33+
'action': WorkerActionEnum.CREATE_EXPENSE_GROUP.value,
34+
'data': {
35+
'workspace_id': instance.workspace_id,
36+
'task_log': None,
37+
'fund_source': [FundSourceEnum.PERSONAL],
38+
'imported_from': ExpenseImportSourceEnum.CONFIGURATION_UPDATE
39+
}
40+
}
41+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)
3342

3443
if configuration.corporate_credit_card_expenses_object and existing_expense_group_setting.ccc_expense_state != instance.ccc_expense_state and existing_expense_group_setting.ccc_expense_state == ExpenseStateEnum.PAID and instance.ccc_expense_state == ExpenseStateEnum.APPROVED:
3544
logger.info(f'Corporate credit card expense state changed from {existing_expense_group_setting.ccc_expense_state} to {instance.ccc_expense_state} for workspace {instance.workspace_id}, so pulling the data from Fyle')
36-
async_task('apps.fyle.tasks.create_expense_groups', workspace_id=instance.workspace_id, task_log=None, fund_source=[FundSourceEnum.CCC], imported_from=ExpenseImportSourceEnum.CONFIGURATION_UPDATE)
45+
payload = {
46+
'workspace_id': instance.workspace_id,
47+
'action': WorkerActionEnum.CREATE_EXPENSE_GROUP.value,
48+
'data': {
49+
'workspace_id': instance.workspace_id,
50+
'task_log': None,
51+
'fund_source': [FundSourceEnum.CCC],
52+
'imported_from': ExpenseImportSourceEnum.CONFIGURATION_UPDATE
53+
}
54+
}
55+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.EXPORT_P1.value)

apps/fyle/views.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22

3+
from django.core.cache import cache
34
from django_filters.rest_framework import DjangoFilterBackend
4-
from django_q.tasks import async_task
55
from fyle_accounting_library.fyle_platform.enums import ExpenseImportSourceEnum
66
from rest_framework import generics
77
from rest_framework.response import Response
@@ -16,6 +16,7 @@
1616
from apps.fyle.tasks import create_expense_groups, get_task_log_and_fund_source
1717
from apps.workspaces.models import FeatureConfig, FyleCredential, Workspace
1818
from fyle_xero_api.utils import LookupFieldMixin
19+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
1920

2021
logger = logging.getLogger(__name__)
2122
logger.level = logging.INFO
@@ -89,7 +90,19 @@ def post(self, request, *args, **kwargs):
8990
logger.info(f"Skipping sync_dimensions for workspace {kwargs['workspace_id']} as webhook sync is enabled")
9091
return Response(status=status.HTTP_200_OK)
9192

92-
async_task('apps.fyle.tasks.check_interval_and_sync_dimension', kwargs['workspace_id'])
93+
cache_key = f"sync_fyle_dimensions_{kwargs['workspace_id']}"
94+
is_cached = cache.get(cache_key)
95+
96+
if not is_cached:
97+
cache.set(cache_key, True, 300)
98+
payload = {
99+
'workspace_id': kwargs['workspace_id'],
100+
'action': WorkerActionEnum.CHECK_INTERVAL_AND_SYNC_FYLE_DIMENSION.value,
101+
'data': {
102+
'workspace_id': kwargs['workspace_id']
103+
}
104+
}
105+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
93106

94107
return Response(status=status.HTTP_200_OK)
95108

@@ -109,7 +122,19 @@ def post(self, request, *args, **kwargs):
109122
Workspace.objects.get(id=kwargs['workspace_id'])
110123
FyleCredential.objects.get(workspace_id=kwargs['workspace_id'])
111124

112-
async_task('apps.fyle.tasks.sync_dimensions', kwargs['workspace_id'])
125+
cache_key = f"sync_fyle_dimensions_{kwargs['workspace_id']}"
126+
is_cached = cache.get(cache_key)
127+
128+
if not is_cached:
129+
cache.set(cache_key, True, 300)
130+
payload = {
131+
'workspace_id': kwargs['workspace_id'],
132+
'action': WorkerActionEnum.HANDLE_FYLE_REFRESH_DIMENSION.value,
133+
'data': {
134+
'workspace_id': kwargs['workspace_id']
135+
}
136+
}
137+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
113138

114139
return Response(status=status.HTTP_200_OK)
115140

apps/mappings/queue.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22

33
from django_q.models import Schedule
44
from fyle_accounting_mappings.models import MappingSetting
5+
56
from apps.fyle.enums import FyleAttributeEnum
67
from apps.mappings.constants import SYNC_METHODS
8+
from apps.workspaces.models import WorkspaceGeneralSettings, XeroCredentials
79
from fyle_integrations_imports.dataclasses import TaskSetting
810
from fyle_integrations_imports.queues import chain_import_fields_to_fyle
9-
from apps.workspaces.models import WorkspaceGeneralSettings, XeroCredentials
11+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
1012

1113

1214
def schedule_auto_map_employees(employee_mapping_preference: str, workspace_id: str):
@@ -31,7 +33,23 @@ def schedule_auto_map_employees(employee_mapping_preference: str, workspace_id:
3133
schedule.delete()
3234

3335

34-
def construct_tasks_and_chain_import_fields_to_fyle(workspace_id: int):
36+
def construct_tasks_and_chain_import_fields_to_fyle(workspace_id: int) -> None:
37+
"""
38+
Initiate the Import of dimensions to Fyle
39+
:param workspace_id: Workspace Id
40+
:return: None
41+
"""
42+
payload = {
43+
'workspace_id': workspace_id,
44+
'action': WorkerActionEnum.IMPORT_DIMENSIONS_TO_FYLE.value,
45+
'data': {
46+
'workspace_id': workspace_id
47+
}
48+
}
49+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
50+
51+
52+
def initiate_import_to_fyle(workspace_id: int):
3553
"""
3654
Construct tasks and chain import fields to fyle
3755
:param workspace_id: Workspace Id
@@ -102,4 +120,4 @@ def construct_tasks_and_chain_import_fields_to_fyle(workspace_id: int):
102120
}
103121
)
104122

105-
chain_import_fields_to_fyle(workspace_id, task_settings)
123+
chain_import_fields_to_fyle(workspace_id, task_settings, run_in_rabbitmq_worker=True)

apps/mappings/schedules.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from datetime import datetime
2-
from typing import List, Dict
3-
from apps.workspaces.models import WorkspaceGeneralSettings
2+
from typing import Dict, List
3+
44
from django_q.models import Schedule
55
from fyle_accounting_mappings.models import MappingSetting
66

7+
from apps.workspaces.models import WorkspaceGeneralSettings
8+
79

810
def new_schedule_or_delete_fyle_import_tasks(
911
workspace_general_settings_instance: WorkspaceGeneralSettings,
@@ -32,8 +34,7 @@ def new_schedule_or_delete_fyle_import_tasks(
3234
defaults={
3335
'schedule_type':Schedule.MINUTES,
3436
'minutes': 24 * 60,
35-
'next_run': datetime.now(),
36-
'cluster': 'import'
37+
'next_run': datetime.now()
3738
}
3839
)
3940
else:

apps/mappings/signals.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from django.db.models.signals import post_save, pre_save
99
from django.dispatch import receiver
10-
from django_q.tasks import async_task
1110
from fyle.platform.exceptions import WrongParamsError
1211
from fyle_accounting_mappings.models import Mapping, MappingSetting
1312
from fyle_integrations_platform_connector import PlatformConnector
@@ -23,6 +22,7 @@
2322
from apps.xero.utils import XeroConnector
2423
from fyle_integrations_imports.models import ImportLog
2524
from fyle_integrations_imports.modules.expense_custom_fields import ExpenseCustomField
25+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
2626

2727
logger = logging.getLogger(__name__)
2828
logger.level = logging.INFO
@@ -167,5 +167,20 @@ def run_post_tenant_mapping_trigger(sender, instance: TenantMapping, **kwargs):
167167
:param instance: Row Instance of Sender Class
168168
:return: None
169169
"""
170-
async_task("apps.xero.tasks.create_missing_currency", int(instance.workspace_id), q_options={'cluster': 'import'})
171-
async_task("apps.xero.tasks.update_xero_short_code", int(instance.workspace_id), q_options={'cluster': 'import'})
170+
payload = {
171+
'workspace_id': int(instance.workspace_id),
172+
'action': WorkerActionEnum.CREATE_MISSING_CURRENCY.value,
173+
'data': {
174+
'workspace_id': int(instance.workspace_id)
175+
}
176+
}
177+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
178+
179+
payload = {
180+
'workspace_id': int(instance.workspace_id),
181+
'action': WorkerActionEnum.UPDATE_XERO_SHORT_CODE.value,
182+
'data': {
183+
'workspace_id': int(instance.workspace_id)
184+
}
185+
}
186+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)

apps/mappings/views.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import logging
22

3-
from django_q.tasks import Chain
43
from rest_framework import generics
54
from rest_framework.response import Response
65
from rest_framework.views import status
@@ -11,6 +10,7 @@
1110
from apps.mappings.serializers import TenantMappingSerializer
1211
from apps.workspaces.models import WorkspaceGeneralSettings
1312
from fyle_xero_api.utils import assert_valid
13+
from workers.helpers import RoutingKeyEnum, WorkerActionEnum, publish_to_rabbitmq
1414

1515
logger = logging.getLogger(__name__)
1616

@@ -60,8 +60,6 @@ def post(self, request, *args, **kwargs):
6060
workspace_id=workspace_id
6161
)
6262

63-
chain = Chain()
64-
6563
if not general_settings.auto_map_employees:
6664
return Response(
6765
data={
@@ -70,10 +68,13 @@ def post(self, request, *args, **kwargs):
7068
status=status.HTTP_400_BAD_REQUEST,
7169
)
7270

73-
chain.append("apps.mappings.tasks.async_auto_map_employees", workspace_id, q_options={
74-
'cluster': 'import'
75-
})
76-
77-
chain.run()
71+
payload = {
72+
'workspace_id': workspace_id,
73+
'action': WorkerActionEnum.AUTO_MAP_EMPLOYEES.value,
74+
'data': {
75+
'workspace_id': workspace_id
76+
}
77+
}
78+
publish_to_rabbitmq(payload=payload, routing_key=RoutingKeyEnum.IMPORT.value)
7879

7980
return Response(data={}, status=status.HTTP_200_OK)

0 commit comments

Comments
 (0)