Skip to content

Commit 2b20c2f

Browse files
authored
Changes to new daemon structure (#258)
* Changes to new daemon structure * Remove old daemon file * fix import * Submodule dev change
1 parent 9ddadfd commit 2b20c2f

File tree

21 files changed

+103
-99
lines changed

21 files changed

+103
-99
lines changed

api/transfer.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
from util.notification import create_notification
3737
from submodules.model.enums import NotificationType
3838
from submodules.model.models import UploadTask
39-
from util import daemon, notification
39+
from util import notification
40+
from submodules.model import daemon
4041
from controller.transfer.cognition.minio_upload import handle_cognition_file_upload
4142

4243
from controller.task_master import manager as task_master_manager
@@ -232,7 +233,7 @@ def put(self, request) -> PlainTextResponse:
232233
return PlainTextResponse("Bad project id", status_code=400)
233234
task_id = request.path_params["task_id"]
234235

235-
daemon.run(
236+
daemon.run_without_db_token(
236237
cognition_import_wizard.prepare_and_finalize_setup,
237238
cognition_project_id=cognition_project_id,
238239
task_id=task_id,
@@ -302,7 +303,7 @@ def init_file_import(task: UploadTask, project_id: str, is_global_update: bool)
302303
cognition_preparator.prepare_cognition_import(project_id, task)
303304
else:
304305
transfer_manager.import_records_from_file(project_id, task)
305-
daemon.run(
306+
daemon.run_with_db_token(
306307
__recalculate_missing_attributes_and_embeddings,
307308
project_id,
308309
str(task.user_id),
@@ -378,7 +379,6 @@ def __recalculate_missing_attributes_and_embeddings(
378379
def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
379380
# wait a second to ensure that the process is started in the tokenization service
380381
time.sleep(5)
381-
ctx_token = general.get_ctx_token()
382382
attributes_usable = attribute.get_all_ordered(
383383
project_id,
384384
True,
@@ -387,7 +387,6 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
387387
],
388388
)
389389
if len(attributes_usable) == 0:
390-
general.remove_and_refresh_session(ctx_token, False)
391390
return
392391

393392
# stored as list so connection results do not affect
@@ -405,7 +404,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
405404
i += 1
406405
if i >= 60:
407406
i = 0
408-
ctx_token = general.remove_and_refresh_session(ctx_token, True)
407+
daemon.reset_session_token_in_thread()
409408
if tokenization.is_doc_bin_creation_running_or_queued(project_id):
410409
time.sleep(2)
411410
continue
@@ -420,7 +419,7 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
420419
break
421420
if i >= 60:
422421
i = 0
423-
ctx_token = general.remove_and_refresh_session(ctx_token, True)
422+
daemon.reset_session_token_in_thread()
424423

425424
current_att_id = attribute_ids[0]
426425
current_att = attribute.get(project_id, current_att_id)
@@ -468,4 +467,3 @@ def __calculate_missing_attributes(project_id: str, user_id: str) -> None:
468467
project_id=project_id,
469468
message="calculate_attribute:finished:all",
470469
)
471-
general.remove_and_refresh_session(ctx_token, False)

controller/attribute/manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
RecordTokenizationScope,
1717
AttributeVisibility,
1818
)
19-
from util import daemon, notification
19+
from util import notification
20+
21+
from submodules.model import daemon
2022

2123
from controller.task_master import manager as task_master_manager
2224
from submodules.model.enums import TaskType
@@ -246,7 +248,7 @@ def calculate_user_attribute_all_records(
246248
notification.send_organization_update(
247249
project_id=project_id, message=f"calculate_attribute:started:{attribute_id}"
248250
)
249-
daemon.run(
251+
daemon.run_without_db_token(
250252
__calculate_user_attribute_all_records,
251253
project_id,
252254
org_id,

controller/attribute/util.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
)
1919
from submodules.model.models import Attribute
2020
from submodules.s3 import controller as s3
21-
from util import daemon, notification
21+
from util import notification
2222
from controller.knowledge_base import util as knowledge_base
2323
from submodules.model import enums
24+
from submodules.model import daemon
2425

2526
client = docker.from_env()
2627
image = os.getenv("AC_EXEC_ENV_IMAGE")
@@ -118,7 +119,7 @@ def run_attribute_calculation_exec_env(
118119
)
119120
set_progress(project_id, attribute_item, 0.05)
120121
__containers_running[container_name] = True
121-
daemon.run(
122+
daemon.run_without_db_token(
122123
read_container_logs_thread,
123124
project_id,
124125
container_name,
@@ -163,7 +164,7 @@ def extend_logs(
163164
if not attribute.logs:
164165
attribute.logs = logs
165166
else:
166-
all_logs = [l for l in attribute.logs]
167+
all_logs = [ll for ll in attribute.logs]
167168
all_logs += logs
168169
attribute.logs = all_logs
169170
general.commit()
@@ -195,7 +196,7 @@ def read_container_logs_thread(
195196
break
196197
if attribute_item.state == enums.AttributeState.FAILED.value:
197198
break
198-
if not name in __containers_running:
199+
if name not in __containers_running:
199200
break
200201
try:
201202
# timestamps included to filter out logs that have already been read
@@ -205,11 +206,13 @@ def read_container_logs_thread(
205206
timestamps=True,
206207
since=last_timestamp,
207208
)
208-
except:
209+
except Exception:
209210
# failsafe for containers that shut down during the read
210211
break
211212
current_logs = [
212-
l for l in str(log_lines.decode("utf-8")).split("\n") if len(l.strip()) > 0
213+
ll
214+
for ll in str(log_lines.decode("utf-8")).split("\n")
215+
if len(ll.strip()) > 0
213216
]
214217
if len(current_logs) == 0:
215218
continue
@@ -218,8 +221,8 @@ def read_container_logs_thread(
218221
last_timestamp = parser.parse(last_timestamp_str).replace(
219222
tzinfo=None
220223
) + datetime.timedelta(seconds=1)
221-
non_progress_logs = [l for l in current_logs if "progress" not in l]
222-
progress_logs = [l for l in current_logs if "progress" in l]
224+
non_progress_logs = [ll for ll in current_logs if "progress" not in ll]
225+
progress_logs = [ll for ll in current_logs if "progress" in ll]
223226
if len(non_progress_logs) > 0:
224227
extend_logs(project_id, attribute_item, non_progress_logs)
225228
if len(progress_logs) == 0:

controller/embedding/manager.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from submodules.model import enums
66
from submodules.model.models import Embedding
7-
from util import daemon, notification
7+
from util import notification
88
from . import util
99
from . import connector
1010
from .terms import TERMS_INFO
@@ -16,6 +16,7 @@
1616
general,
1717
project,
1818
)
19+
from submodules.model import daemon
1920
from submodules.model.util import sql_alchemy_to_dict
2021
from controller.embedding.connector import collection_on_qdrant
2122

@@ -74,14 +75,14 @@ def get_recommended_encoders(is_managed: bool) -> List[Any]:
7475

7576

7677
def create_embedding(project_id: str, embedding_id: str) -> None:
77-
daemon.run(connector.request_embedding, project_id, embedding_id)
78+
daemon.run_without_db_token(connector.request_embedding, project_id, embedding_id)
7879

7980

8081
def create_embeddings_one_by_one(
8182
project_id: str,
8283
embeddings_ids: List[str],
8384
) -> None:
84-
daemon.run(__embed_one_by_one_helper, project_id, embeddings_ids)
85+
daemon.run_without_db_token(__embed_one_by_one_helper, project_id, embeddings_ids)
8586

8687

8788
def request_tensor_upload(project_id: str, embedding_id: str) -> Any:
@@ -319,7 +320,9 @@ def __recreate_embedding(project_id: str, embedding_id: str) -> Embedding:
319320
general.commit()
320321

321322
connector.request_deleting_embedding(project_id, old_id)
322-
daemon.run(connector.request_embedding, project_id, new_embedding_item.id)
323+
daemon.run_without_db_token(
324+
connector.request_embedding, project_id, new_embedding_item.id
325+
)
323326
return new_embedding_item
324327

325328

controller/embedding/util.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
from controller.embedding import connector
2-
from submodules.model import enums
3-
from submodules.model.business_objects import agreement, embedding, general
4-
from submodules.model.models import Embedding
5-
from util import daemon
1+
from submodules.model.business_objects import embedding
62

73

84
def has_encoder_running(project_id: str) -> bool:

controller/information_source/manager.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
import json
21
import os
3-
from typing import List, Optional
2+
from typing import List
43
from controller.information_source.util import resolve_source_return_type
54
from submodules.model import InformationSource, LabelingTask, enums
65
from submodules.model.business_objects import (
@@ -10,8 +9,7 @@
109
)
1110
from controller.misc import config_service
1211
from controller.labeling_access_link import manager as link_manager
13-
from controller.record_label_association import manager as rla_manager
14-
from util import daemon
12+
from submodules.model import daemon
1513

1614

1715
def get_information_source(project_id: str, source_id: str) -> InformationSource:
@@ -65,11 +63,7 @@ def update_information_source(
6563
) -> None:
6664
labeling_task_item: LabelingTask = labeling_task.get(project_id, labeling_task_id)
6765
return_type: str = resolve_source_return_type(labeling_task_item)
68-
item = information_source.get(project_id, source_id)
69-
new_payload_needed = (
70-
str(item.source_code) != code or str(item.labeling_task_id) != labeling_task_id
71-
)
72-
item = information_source.update(
66+
information_source.update(
7367
project_id,
7468
source_id,
7569
labeling_task_id=labeling_task_id,
@@ -94,7 +88,9 @@ def delete_information_source(project_id: str, source_id: str) -> None:
9488
== enums.InformationSourceType.ACTIVE_LEARNING.value
9589
and config_service.get_config_value("is_managed")
9690
):
97-
daemon.run(__delete_active_learner_from_inference_dir, project_id, source_id)
91+
daemon.run_without_db_token(
92+
__delete_active_learner_from_inference_dir, project_id, source_id
93+
)
9894

9995
information_source.delete(project_id, source_id, with_commit=True)
10096

controller/misc/config_service.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
from typing import Dict, Any, Optional, Union
22
import requests
3-
import json
43
import time
5-
from util import daemon
4+
from submodules.model import daemon
65
from util import service_requests
76

87
__config = None
@@ -28,7 +27,7 @@ def refresh_config():
2827
)
2928
global __config
3029
__config = response.json()
31-
daemon.run(invalidate_after, 3600) # one hour as failsave
30+
daemon.run_with_db_token(invalidate_after, 3600) # one hour as failsave
3231

3332

3433
def get_config_value(

controller/payload/payload_scheduler.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,14 @@
66
import pytz
77
import json
88
import docker
9-
import timeit
109
import traceback
1110

1211
# from datetime import datetime
1312
from dateutil import parser
1413
import datetime
1514

1615
from exceptions.exceptions import PayloadSchedulerError
17-
from submodules.model import enums, events
16+
from submodules.model import enums
1817
from submodules.model.business_objects import (
1918
information_source,
2019
embedding,
@@ -26,6 +25,7 @@
2625
project,
2726
organization,
2827
)
28+
from submodules.model import daemon
2929
from submodules.model.business_objects.embedding import get_embedding_record_ids
3030
from submodules.model.business_objects.information_source import (
3131
get_exclusion_record_ids,
@@ -46,7 +46,7 @@
4646
RecordLabelAssociation,
4747
InformationSourcePayload,
4848
)
49-
from util import daemon, notification
49+
from util import notification
5050
from submodules.s3 import controller as s3
5151
from controller.knowledge_base import util as knowledge_base
5252
from controller.misc import config_service
@@ -232,7 +232,6 @@ def execution_pipeline(
232232
project_id,
233233
information_source_item.name,
234234
)
235-
start = timeit.default_timer()
236235
run_container(
237236
payload_item,
238237
project_id,
@@ -289,7 +288,6 @@ def execution_pipeline(
289288
project_id,
290289
f"payload_failed:{information_source_item.id}:{payload_item.id}:{information_source_item.type}",
291290
)
292-
stop = timeit.default_timer()
293291
general.commit()
294292

295293
org_id = organization.get_id_by_project_id(project_id)
@@ -309,7 +307,7 @@ def execution_pipeline(
309307
print(traceback.format_exc())
310308

311309
if asynchronous:
312-
daemon.run(
310+
daemon.run_without_db_token(
313311
prepare_and_run_execution_pipeline,
314312
str(payload.id),
315313
project_id,
@@ -386,7 +384,7 @@ def run_container(
386384
)
387385
set_payload_progress(project_id, information_source_payload, 0.05)
388386
__containers_running[container_name] = True
389-
daemon.run(
387+
daemon.run_without_db_token(
390388
read_container_logs_thread,
391389
project_id,
392390
container_name,

controller/project/manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
information_source,
1818
general,
1919
)
20+
from submodules.model import daemon
2021
from fast_api.types import HuddleData, ProjectSize
21-
from util import daemon
2222
from controller.task_master import manager as task_master_manager
2323
from submodules.model.enums import TaskType, RecordTokenizationScope
2424
from submodules.model.business_objects import util as db_util
@@ -139,7 +139,7 @@ def delete_project(project_id: str) -> None:
139139
org_id = organization.get_id_by_project_id(project_id)
140140
project.delete_by_id(project_id, with_commit=True)
141141

142-
daemon.run(__background_cleanup, org_id, project_id)
142+
daemon.run_without_db_token(__background_cleanup, org_id, project_id)
143143

144144

145145
def __background_cleanup(org_id: str, project_id: str) -> None:
@@ -295,7 +295,7 @@ def __get_first_data_id(project_id: str, user_id: str, huddle_type: str) -> str:
295295

296296
def check_in_deletion_projects() -> None:
297297
# this is only supposed to be called during startup of the application
298-
daemon.run(__check_in_deletion_projects)
298+
daemon.run_without_db_token(__check_in_deletion_projects)
299299

300300

301301
def __check_in_deletion_projects() -> None:

controller/record/manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
)
1616
from service.search import search
1717
from submodules.model import enums
18+
from submodules.model import daemon
1819

1920
from controller.embedding import connector as embedding_connector
2021
from controller.record import neural_search_connector
2122
from controller.embedding import manager as embedding_manager
2223
from controller.tokenization import tokenization_service
23-
from util import daemon
2424
from util.miscellaneous_functions import chunk_list
2525
import time
2626
import traceback
@@ -109,7 +109,7 @@ def get_records_by_extended_search(
109109

110110
def delete_record(project_id: str, record_id: str) -> None:
111111
record.delete(project_id, record_id, with_commit=True)
112-
daemon.run(__reupload_embeddings, project_id)
112+
daemon.run_without_db_token(__reupload_embeddings, project_id)
113113

114114

115115
def delete_all_records(project_id: str) -> None:
@@ -251,7 +251,7 @@ def __check_and_prep_edit_records(
251251
f"can't find embedding PCA for {embedding_item.name}. Try rebuilding or removing the embeddings on settings page."
252252
)
253253
continue
254-
if not embedding_item.attribute_id in useable_embeddings:
254+
if embedding_item.attribute_id not in useable_embeddings:
255255
useable_embeddings[embedding_item.attribute_id] = []
256256
useable_embeddings[embedding_item.attribute_id].append(embedding_item)
257257

0 commit comments

Comments
 (0)