-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtasks.py
More file actions
76 lines (61 loc) · 3.56 KB
/
tasks.py
File metadata and controls
76 lines (61 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import logging
import requests
from celery import shared_task
from django.conf import settings
from drdroid_debug_toolkit.core.integrations.source_metadata_extractor import SourceMetadataExtractor
from drdroid_debug_toolkit.core.integrations.source_metadata_extractor_facade import source_metadata_extractor_facade
from agent.shutdown import is_shutting_down
# Module-level logger shared by the tasks defined in this file.
logger = logging.getLogger(__name__)
@shared_task(max_retries=3, default_retry_delay=10)
def populate_connector_metadata(request_id, connector_name, connector_type, connector_credentials_dict):
    """Run every connector-specific ``extract_*`` method for one connector.

    The extractor class for ``connector_type`` is looked up through
    ``source_metadata_extractor_facade`` and instantiated only to discover
    which ``extract_*`` methods it defines beyond the ``SourceMetadataExtractor``
    base class. Each discovered method is then executed by calling
    ``extractor_async_method_call`` directly (not via ``.delay()``), one after
    another, with a shutdown check before each one.

    Args:
        request_id: Identifier forwarded to the extractor and included in logs.
        connector_name: Connector name forwarded to the extractor and logs.
        connector_type: Key used to look up the extractor class.
        connector_credentials_dict: Keyword arguments passed to the extractor
            constructor.

    Returns:
        ``False`` when the extractor class lookup fails; otherwise ``None``.

    Raises:
        Any exception raised while constructing the extractor instance used
        for method discovery is not caught here.
    """
    logger.info(f"Running populate_connector_metadata for connector: {connector_name} with request_id: {request_id}")
    drd_cloud_host = settings.DRD_CLOUD_API_HOST
    drd_cloud_api_token = settings.DRD_CLOUD_API_TOKEN
    try:
        extractor_class = source_metadata_extractor_facade.get_connector_metadata_extractor_class(connector_type)
    except Exception as e:
        logger.warning(f"Exception occurred while fetching extractor class for connector: {connector_name}, "
                       f"with error: {e}")
        return False

    extractor = extractor_class(request_id=request_id, connector_name=connector_name, **connector_credentials_dict)
    # Set API credentials for metadata saving
    extractor.api_host = drd_cloud_host
    extractor.api_token = drd_cloud_api_token

    # Only include extract_* methods that are not inherited from the base
    # extractor (not get_*_data helper methods). The cheap name checks run
    # first so getattr() is only evaluated for matching names; this avoids
    # touching unrelated properties/descriptors on the extractor. The base
    # class attribute names are computed once instead of once per candidate.
    base_attribute_names = frozenset(dir(SourceMetadataExtractor))
    extractor_methods = [
        name for name in dir(extractor)
        if name.startswith('extract_')
        and name not in base_attribute_names
        and callable(getattr(extractor, name))
    ]

    for extractor_method in extractor_methods:
        # Check for shutdown between extraction methods
        if is_shutting_down():
            logger.info(f"Shutdown in progress - stopping asset extraction for {connector_name} at method {extractor_method}")
            break
        logger.info(f"Running method: {extractor_method} for connector: {connector_name}")
        try:
            extractor_async_method_call(request_id, connector_name, connector_type, connector_credentials_dict,
                                        extractor_method)
        except Exception as e:
            # Best effort: a failure in one method must not stop the others.
            logger.error(
                f"Exception occurred while scheduling method: {extractor_method} for connector: {connector_name}, "
                f"with error: {e}")
@shared_task(max_retries=3, default_retry_delay=10)
def extractor_async_method_call(request_id, connector_name, connector_type, connector_credentials_dict,
                                extractor_method):
    """Build an extractor for the connector and invoke one named method on it.

    Args:
        request_id: Identifier forwarded to the extractor and included in logs.
        connector_name: Connector name forwarded to the extractor and logs.
        connector_type: Key used to look up the extractor class through
            ``source_metadata_extractor_facade``.
        connector_credentials_dict: Keyword arguments passed to the extractor
            constructor.
        extractor_method: Name of the attribute on the extractor to call with
            no arguments.

    Returns:
        ``True`` when the method completes without raising, ``False`` when the
        method itself raises (the error is logged).

    Raises:
        Errors from the class lookup, the extractor constructor, or resolving
        ``extractor_method`` on the instance are not caught here.
    """
    logger.info(f"Running extractor_async_method_call: {extractor_method} for connector: {connector_name} with "
                f"request_id: {request_id}")
    api_host = settings.DRD_CLOUD_API_HOST
    api_token = settings.DRD_CLOUD_API_TOKEN

    metadata_extractor_cls = source_metadata_extractor_facade.get_connector_metadata_extractor_class(
        connector_type)
    metadata_extractor = metadata_extractor_cls(
        request_id=request_id,
        connector_name=connector_name,
        **connector_credentials_dict,
    )
    # Attach the cloud API credentials the extractor uses when saving metadata.
    metadata_extractor.api_host = api_host
    metadata_extractor.api_token = api_token

    # Resolved outside the try block, so an unknown method name propagates.
    bound_method = getattr(metadata_extractor, extractor_method)
    try:
        bound_method()
    except Exception as e:
        logger.error(f"Exception occurred while running method: {extractor_method} for connector: {connector_name}, "
                     f"request ID: {request_id}, with error: {e}")
        return False
    else:
        return True