Skip to content

Commit 5194887

Browse files
committed
Delete unused code in jobs
1 parent ba294e0 commit 5194887

File tree

1 file changed

+1
-151
lines changed

1 file changed

+1
-151
lines changed

llmstack/jobs/jobs.py

Lines changed: 1 addition & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010

1111
from llmstack.apps.apis import AppViewSet
1212
from llmstack.apps.models import App
13-
from llmstack.data.apis import DataSourceEntryViewSet
14-
from llmstack.data.models import DataSource
15-
from llmstack.jobs.models import AdhocJob, TaskRunLog, TaskStatus
13+
from llmstack.jobs.models import TaskRunLog, TaskStatus
1614

1715
logger = logging.getLogger(__name__)
1816

@@ -226,151 +224,3 @@ def run_app_task(app_id=None, input_data=None, *args, **kwargs):
226224
)
227225

228226
return [SubTaskResult().model_dump()] * len(input_data)
229-
230-
231-
def upsert_datasource_entry_subtask(datasource_id, input_data):
    """Run one datasource-entry upsert by replaying it through the API view.

    Builds a synthetic POST request carrying *input_data*, authenticates it
    as the datasource owner, and dispatches it straight to
    ``DataSourceEntryViewSet.upsert``.

    Returns a dict with the response ``status_code`` and ``data``.
    """
    payload = {
        "datasource_id": datasource_id,
        "input_data": input_data,
    }

    # The upsert view enforces ownership, so impersonate the datasource owner.
    source = DataSource.objects.get(uuid=uuid.UUID(datasource_id))

    fake_request = RequestFactory().post(
        "/api/datasource_entries/upsert",
        data=payload,
        format="json",
    )
    fake_request.user = source.owner
    # DRF views read ``request.data``; RequestFactory does not populate it,
    # so attach the payload explicitly.
    fake_request.data = payload

    response = DataSourceEntryViewSet().upsert(fake_request)

    return {
        "status_code": response.status_code,
        "data": response.data,
    }
250-
251-
252-
def post_upsert_datasource_task(
    task_run_log_uuid,
    input_index,
    status,
    response,
    job,
):
    """Record the outcome of one upsert subtask and drive the task chain.

    Writes the success/failure of the subtask at *input_index* into the
    ``TaskRunLog``, then either enqueues the subtask for the next input item
    or, when all items are processed, marks the parent ``AdhocJob`` finished.

    Args:
        task_run_log_uuid: UUID string of the TaskRunLog to update; falsy to
            skip logging.
        input_index: Index of the input item the finished subtask processed.
        status: TaskStatus of the subtask execution.
        response: Subtask result — a dict with "status_code" on the success
            path, otherwise an error description string.
        job: The rq job that just finished; its ``meta`` carries the chain
            state (input_data, queue_name, result_ttl, ...).
    """
    if task_run_log_uuid:
        task_run_log = TaskRunLog.objects.get(
            uuid=uuid.UUID(task_run_log_uuid),
        )
        # Only a SUCCESS status with an HTTP 200 counts as success; the
        # short-circuit keeps failure-path string responses from being
        # indexed with ["status_code"].
        if status == TaskStatus.SUCCESS and response["status_code"] == 200:
            task_run_log.result[input_index] = SubTaskResult(
                status=TaskStatus.SUCCESS,
                output="success",
            ).model_dump()
        else:
            task_run_log.result[input_index] = SubTaskResult(
                status=TaskStatus.FAILURE,
                error=response,
            ).model_dump()
        task_run_log.save()

    # If there are more input data to process, schedule the next task
    input_data = job.meta["input_data"]
    if input_index + 1 < len(input_data):
        time_elapsed = (job.ended_at - job.started_at).total_seconds()
        time_remaining_to_schedule_next_task = max(
            (settings.TASK_RUN_DELAY - time_elapsed),
            1,
        )
        # NOTE(review): the computed delay is only logged — enqueue() below
        # runs immediately; confirm whether enqueue_in() was intended.
        logger.info(
            f"Scheduling next task in {time_remaining_to_schedule_next_task} seconds",
        )

        django_rq.get_queue(job.meta["queue_name"]).enqueue(
            upsert_datasource_entry_subtask,
            args=(job.meta["datasource_id"], input_data[input_index + 1]),
            on_success=upsert_datasource_entry_subtask_success_callback,
            on_failure=upsert_datasource_entry_subtask_failure_callback,
            meta={
                "datasource_id": job.meta["datasource_id"],
                "task_run_log_uuid": task_run_log_uuid,
                "task_job_uuid": job.meta.get("task_job_uuid", None),
                "input_data": input_data,
                "input_data_index": input_index + 1,
                "queue_name": job.meta["queue_name"],
                "result_ttl": job.meta["result_ttl"],
            },
            result_ttl=job.meta["result_ttl"],
        )
    else:
        # All tasks are completed. Update the task status to completed.
        task_job_uuid = job.meta.get("task_job_uuid", None)
        if task_job_uuid:
            # Fix: use a distinct name so the rq `job` parameter is not
            # shadowed; the former `if job:` check was dead code, since
            # .get() raises DoesNotExist instead of returning None.
            adhoc_job = AdhocJob.objects.get(uuid=uuid.UUID(task_job_uuid))
            adhoc_job.status = "finished"
            adhoc_job.save()
311-
312-
313-
def upsert_datasource_entry_subtask_success_callback(
    job,
    connection,
    result,
    *args,
    **kwargs,
):
    """rq ``on_success`` hook: record the subtask result and advance the chain.

    Delegates to ``post_upsert_datasource_task`` with SUCCESS status; any
    error in the bookkeeping is logged rather than re-raised so rq does not
    mark the (already successful) job as failed.
    """
    try:
        post_upsert_datasource_task(
            job.meta["task_run_log_uuid"],
            job.meta["input_data_index"],
            TaskStatus.SUCCESS,
            result,
            job,
        )
    except Exception as e:
        # Fix: logger.exception preserves the traceback that the previous
        # logger.error(f"...") call silently dropped.
        logger.exception("Exception: %s", e)
330-
331-
332-
def upsert_datasource_entry_subtask_failure_callback(
    job,
    connection,
    exc_type,
    exc_value,
    exc_traceback,
):
    """rq ``on_failure`` hook: log the failure and record it in the run log.

    Fix: the last three parameters were named ``type``/``value``/``traceback``,
    shadowing the builtin and the stdlib module; rq invokes failure callbacks
    positionally, so the rename is caller-compatible.
    """
    logger.error(
        f'task_run_log_uuid: {job.meta["task_run_log_uuid"]}, type: {exc_type}, value: {exc_value}, Traceback: {exc_traceback} ',
    )
    try:
        post_upsert_datasource_task(
            job.meta["task_run_log_uuid"],
            job.meta["input_data_index"],
            TaskStatus.FAILURE,
            f"Exception: {exc_type}, detail: {exc_value}",
            job,
        )
    except Exception as e:
        # Fix: logger.exception preserves the traceback for debugging.
        logger.exception("Exception: %s", e)
352-
353-
354-
def upsert_datasource_entries_task(datasource_id, input_data, *args, **kwargs):
    """Kick off a chain of datasource-entry upsert subtasks.

    Only the first input item is enqueued here; each subtask's
    success/failure callback schedules the next item via
    ``post_upsert_datasource_task``.

    Args:
        datasource_id: UUID string of the target datasource.
        input_data: List of entry payloads to upsert, processed one per subtask.
        **kwargs: Must contain "_job_metadata" with task_run_log_uuid /
            task_job_uuid for progress tracking.

    Returns:
        A list of pending SubTaskResult dicts, one per input item.
    """
    job_metadata = kwargs["_job_metadata"]

    result_ttl = 86400  # keep rq results for one day

    # Fix: guard the empty case — the original indexed input_data[0]
    # unconditionally and raised IndexError on an empty list, even though
    # the final return already yields [] for len(input_data) == 0.
    if not input_data:
        return []

    django_rq.get_queue("default").enqueue(
        upsert_datasource_entry_subtask,
        args=(datasource_id, input_data[0]),
        on_success=upsert_datasource_entry_subtask_success_callback,
        on_failure=upsert_datasource_entry_subtask_failure_callback,
        meta={
            "datasource_id": datasource_id,
            "task_run_log_uuid": job_metadata.get("task_run_log_uuid", None),
            "task_job_uuid": job_metadata.get("task_job_uuid", None),
            "input_data": input_data,
            "input_data_index": 0,
            "queue_name": "default",
            "result_ttl": result_ttl,
        },
        result_ttl=result_ttl,
    )

    return [SubTaskResult().model_dump()] * len(input_data)

0 commit comments

Comments
 (0)