Skip to content

Commit b6dde28

Browse files
authored
fix: dependent timeout (#894)
* fix: dependent timeout * fix * fix: dependent timeout
1 parent c6eb7da commit b6dde28

File tree

1 file changed

+99
-74
lines changed

1 file changed

+99
-74
lines changed

apps/sage_intacct/dependent_fields.py

Lines changed: 99 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88
from fyle.platform.exceptions import InvalidTokenError as FyleInvalidTokenError
99
from fyle_accounting_mappings.models import ExpenseAttribute
1010
from fyle_integrations_platform_connector import PlatformConnector
11+
from intacctsdk.exceptions import BadRequestError as IntacctRESTBadRequestError
12+
from intacctsdk.exceptions import InternalServerError as IntacctRESTInternalServerError
13+
from intacctsdk.exceptions import InvalidTokenError as IntacctRESTInvalidTokenError
1114
from sageintacctsdk.exceptions import InvalidTokenError, NoPrivilegeError, SageIntacctSDKError
12-
from intacctsdk.exceptions import (
13-
BadRequestError as IntacctRESTBadRequestError,
14-
InvalidTokenError as IntacctRESTInvalidTokenError,
15-
InternalServerError as IntacctRESTInternalServerError
16-
)
1715

1816
from apps.fyle.helpers import connect_to_platform
1917
from apps.fyle.models import DependentFieldSetting
@@ -97,7 +95,6 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
9795
total_batches = 0
9896
processed_batches = 0
9997
is_errored = False
100-
projects_batch = []
10198

10299
"""
103100
Structure of the query output:
@@ -137,7 +134,7 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
137134
project_id;
138135
"""
139136
if not dependent_field_setting.is_cost_type_import_enabled:
140-
projects_batch = (
137+
projects_queryset = (
141138
CostCode.objects.filter(**filters)
142139
.values('project_name', 'project_id')
143140
.annotate(
@@ -150,9 +147,10 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
150147
distinct=True
151148
)
152149
)
150+
.order_by('project_id')
153151
)
154152
else:
155-
projects_batch = (
153+
projects_queryset = (
156154
CostType.objects.filter(**filters)
157155
.values('project_name', 'project_id')
158156
.annotate(
@@ -166,50 +164,65 @@ def post_dependent_cost_code(import_log: ImportLog, dependent_field_setting: Dep
166164
distinct=True
167165
)
168166
)
167+
.order_by('project_id')
169168
)
170169

171-
existing_projects_in_fyle = set(
172-
ExpenseAttribute.objects.filter(
173-
workspace_id=dependent_field_setting.workspace_id,
174-
attribute_type='PROJECT',
175-
value__in=[
176-
prepend_code_to_name(use_job_code_in_naming, project['project_name'], project['project_id'])
177-
for project in projects_batch
178-
],
179-
active=True
180-
).values_list('value', flat=True)
181-
)
170+
# Process projects in batches to avoid memory issues and statement timeouts
171+
BATCH_SIZE = 5000
172+
offset = 0
182173

183-
logger.info(f'Posting Cost Codes | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Projects in Fyle COUNT: {len(existing_projects_in_fyle)}')
174+
while True:
175+
projects_batch = list(projects_queryset[offset:offset + BATCH_SIZE])
184176

185-
for project in projects_batch:
186-
payload = []
187-
cost_code_names = set()
188-
project_name = prepend_code_to_name(use_job_code_in_naming, project['project_name'], project['project_id'])
177+
if not projects_batch:
178+
break
189179

190-
if project_name in existing_projects_in_fyle:
191-
for cost_code in project['cost_codes']:
192-
cost_code_name = prepend_code_to_name(prepend_code_in_name=use_cost_code_in_naming, value=cost_code['cost_code_name'], code=cost_code['cost_code_code'])
193-
payload.append({
194-
'parent_expense_field_id': dependent_field_setting.project_field_id,
195-
'parent_expense_field_value': project_name,
196-
'expense_field_id': dependent_field_setting.cost_code_field_id,
197-
'expense_field_value': cost_code_name,
198-
'is_enabled': is_enabled
199-
})
200-
cost_code_names.add(cost_code['cost_code_name'])
180+
project_names_batch = [
181+
prepend_code_to_name(use_job_code_in_naming, project['project_name'], project['project_id'])
182+
for project in projects_batch
183+
]
201184

202-
if payload:
203-
sleep(0.2)
204-
try:
205-
total_batches += 1
206-
payload_set = remove_duplicate_payload_entries(payload)
207-
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload_set)
208-
posted_cost_codes.update(cost_code_names)
209-
processed_batches += 1
210-
except Exception as exception:
211-
is_errored = True
212-
logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}')
185+
existing_projects_in_fyle = set(
186+
ExpenseAttribute.objects.filter(
187+
workspace_id=dependent_field_setting.workspace_id,
188+
attribute_type='PROJECT',
189+
value__in=project_names_batch,
190+
active=True
191+
).values_list('value', flat=True)
192+
)
193+
194+
logger.info(f'Posting Cost Codes | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Batch offset: {offset} | Existing Projects in Fyle COUNT: {len(existing_projects_in_fyle)}')
195+
196+
for project in projects_batch:
197+
payload = []
198+
cost_code_names = set()
199+
project_name = prepend_code_to_name(use_job_code_in_naming, project['project_name'], project['project_id'])
200+
201+
if project_name in existing_projects_in_fyle:
202+
for cost_code in project['cost_codes']:
203+
cost_code_name = prepend_code_to_name(prepend_code_in_name=use_cost_code_in_naming, value=cost_code['cost_code_name'], code=cost_code['cost_code_code'])
204+
payload.append({
205+
'parent_expense_field_id': dependent_field_setting.project_field_id,
206+
'parent_expense_field_value': project_name,
207+
'expense_field_id': dependent_field_setting.cost_code_field_id,
208+
'expense_field_value': cost_code_name,
209+
'is_enabled': is_enabled
210+
})
211+
cost_code_names.add(cost_code['cost_code_name'])
212+
213+
if payload:
214+
sleep(0.2)
215+
try:
216+
total_batches += 1
217+
payload_set = remove_duplicate_payload_entries(payload)
218+
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload_set)
219+
posted_cost_codes.update(cost_code_names)
220+
processed_batches += 1
221+
except Exception as exception:
222+
is_errored = True
223+
logger.error(f'Exception while posting dependent cost code | Error: {exception} | Payload: {payload}')
224+
225+
offset += BATCH_SIZE
213226

214227
if is_errored or import_log.status != 'IN_PROGRESS':
215228
import_log.status = 'PARTIALLY_FAILED'
@@ -245,7 +258,7 @@ def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: Dep
245258
processed_batches = 0
246259
is_errored = False
247260

248-
cost_types_batch = (
261+
cost_types_queryset = (
249262
CostType.objects.filter(**filters)
250263
.values('task_name', 'task_id')
251264
.annotate(
@@ -259,35 +272,47 @@ def post_dependent_cost_type(import_log: ImportLog, dependent_field_setting: Dep
259272
distinct=True
260273
)
261274
)
275+
.order_by('task_id')
262276
)
263277

264-
logger.info(f'Posting Cost Types | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Existing Cost Code in Fyle COUNT: {len(cost_types_batch)}')
265-
266-
for cost_types in cost_types_batch:
267-
payload = []
268-
cost_code_name = prepend_code_to_name(use_cost_code_in_naming, cost_types['task_name'], cost_types['task_id'])
269-
270-
for cost_type in cost_types['cost_types']:
271-
cost_type_name = prepend_code_to_name(use_cost_type_code_in_naming, cost_type['cost_type_name'], cost_type['cost_type_code'])
272-
payload.append({
273-
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
274-
'parent_expense_field_value': cost_code_name,
275-
'expense_field_id': dependent_field_setting.cost_type_field_id,
276-
'expense_field_value': cost_type_name,
277-
'is_enabled': True
278-
})
279-
280-
if payload:
281-
sleep(0.2)
282-
try:
283-
total_batches += 1
284-
payload_set = remove_duplicate_payload_entries(payload)
285-
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload_set)
286-
CostType.objects.filter(task_name=cost_types['task_name'], task_id=cost_types['task_id'], workspace_id=dependent_field_setting.workspace_id).update(is_imported=True, updated_at=datetime.now(timezone.utc))
287-
processed_batches += 1
288-
except Exception as exception:
289-
is_errored = True
290-
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')
278+
BATCH_SIZE = 5000
279+
offset = 0
280+
281+
while True:
282+
cost_types_batch = list(cost_types_queryset[offset:offset + BATCH_SIZE])
283+
284+
if not cost_types_batch:
285+
break
286+
287+
logger.info(f'Posting Cost Types | WORKSPACE_ID: {dependent_field_setting.workspace_id} | Batch offset: {offset} | Batch size: {len(cost_types_batch)}')
288+
289+
for cost_types in cost_types_batch:
290+
payload = []
291+
cost_code_name = prepend_code_to_name(use_cost_code_in_naming, cost_types['task_name'], cost_types['task_id'])
292+
293+
for cost_type in cost_types['cost_types']:
294+
cost_type_name = prepend_code_to_name(use_cost_type_code_in_naming, cost_type['cost_type_name'], cost_type['cost_type_code'])
295+
payload.append({
296+
'parent_expense_field_id': dependent_field_setting.cost_code_field_id,
297+
'parent_expense_field_value': cost_code_name,
298+
'expense_field_id': dependent_field_setting.cost_type_field_id,
299+
'expense_field_value': cost_type_name,
300+
'is_enabled': True
301+
})
302+
303+
if payload:
304+
sleep(0.2)
305+
try:
306+
total_batches += 1
307+
payload_set = remove_duplicate_payload_entries(payload)
308+
platform.dependent_fields.bulk_post_dependent_expense_field_values(payload_set)
309+
CostType.objects.filter(task_name=cost_types['task_name'], task_id=cost_types['task_id'], workspace_id=dependent_field_setting.workspace_id).update(is_imported=True, updated_at=datetime.now(timezone.utc))
310+
processed_batches += 1
311+
except Exception as exception:
312+
is_errored = True
313+
logger.error(f'Exception while posting dependent cost type | Error: {exception} | Payload: {payload}')
314+
315+
offset += BATCH_SIZE
291316

292317
if is_errored or import_log.status != 'IN_PROGRESS':
293318
import_log.status = 'PARTIALLY_FAILED'

0 commit comments

Comments
 (0)