1+ import time
2+ from typing import Any , List
3+ import uuid
14import docker
25import json
36import os
47import pytz
5- import re
6- from datetime import datetime
7- from submodules .model .business_objects import attribute , record , project , tokenization
8- from submodules .model .enums import DataTypes
8+
9+ import datetime
10+ from dateutil import parser
11+
12+ from submodules .model .business_objects import (
13+ attribute ,
14+ general ,
15+ record ,
16+ project ,
17+ tokenization ,
18+ )
19+ from submodules .model .models import Attribute
920from submodules .s3 import controller as s3
21+ from util import daemon , notification
1022
1123client = docker .from_env ()
1224image = os .getenv ("AC_EXEC_ENV_IMAGE" )
1325exec_env_network = os .getenv ("LF_NETWORK" )
26+ __tz = pytz .timezone ("Europe/Berlin" )
27+
28+ __containers_running = {}
1429
1530
def add_log_to_attribute_logs(
    project_id: str, attribute_id: str, log: str, append_to_logs: bool = True
) -> None:
    """Append (or reset) a timestamped line on the attribute's log list.

    Args:
        project_id: id of the owning project.
        attribute_id: id of the attribute whose logs are updated.
        log: message text; prefixed with a Berlin-local timestamp.
        append_to_logs: when False — or when no logs exist yet — the log
            list is replaced with a single new entry instead of extended.
    """
    attribute_item = attribute.get(project_id, attribute_id)
    berlin_now = datetime.datetime.now(__tz)
    time_string = berlin_now.strftime("%Y-%m-%dT%H:%M:%S")
    line = f"{time_string} {log}"

    if not append_to_logs or not attribute_item.logs:
        attribute.update(
            project_id=project_id,
            attribute_id=attribute_id,
            logs=[line],
            with_commit=True,
        )
    else:
        # Reassign a fresh list instead of mutating in place: an in-place
        # .append() on an ORM list column is usually not change-tracked,
        # so the commit could be a silent no-op. extend_logs uses the
        # same reassignment pattern.
        attribute_item.logs = [*attribute_item.logs, line]
        general.commit()
3250
3351
3452def prepare_sample_records_doc_bin (attribute_id : str , project_id : str ) -> str :
@@ -55,6 +73,14 @@ def run_attribute_calculation_exec_env(
5573) -> None :
5674 attribute_item = attribute .get (project_id , attribute_id )
5775
76+ if attribute_item .logs :
77+ add_log_to_attribute_logs (
78+ project_id ,
79+ attribute_id ,
80+ "re-run attribute calculation" ,
81+ append_to_logs = False ,
82+ )
83+
5884 prefixed_function_name = f"{ attribute_id } _fn"
5985 prefixed_payload = f"{ attribute_id } _payload.json"
6086 project_item = project .get (project_id )
@@ -73,24 +99,32 @@ def run_attribute_calculation_exec_env(
7399 attribute_item .data_type ,
74100 ]
75101
76- container = client .containers .run (
102+ container_name = str (uuid .uuid4 ())
103+ container = client .containers .create (
77104 image = image ,
78105 command = command ,
79- remove = True ,
106+ auto_remove = True ,
80107 detach = True ,
81108 network = exec_env_network ,
82109 )
83-
84- logs = [
110+ set_progress (project_id , attribute_item , 0.05 )
111+ __containers_running [container_name ] = True
112+ daemon .run (
113+ read_container_logs_thread ,
114+ project_id ,
115+ container_name ,
116+ str (attribute_item .id ),
117+ container ,
118+ )
119+ container .start ()
120+ attribute_item .logs = [
85121 line .decode ("utf-8" ).strip ("\n " )
86122 for line in container .logs (
87123 stream = True , stdout = True , stderr = True , timestamps = True
88124 )
125+ if "progress" not in line .decode ("utf-8" )
89126 ]
90-
91- attribute .update (
92- project_id = project_id , attribute_id = attribute_id , logs = logs , with_commit = True
93- )
127+ del __containers_running [container_name ]
94128
95129 try :
96130 payload = s3 .get_object (org_id , project_id + "/" + prefixed_payload )
@@ -104,5 +138,96 @@ def run_attribute_calculation_exec_env(
104138 s3 .delete_object (org_id , project_id + "/" + doc_bin )
105139 s3 .delete_object (org_id , project_id + "/" + prefixed_function_name )
106140 s3 .delete_object (org_id , project_id + "/" + prefixed_payload )
141+ set_progress (project_id , attribute_item , 0.9 )
107142
108143 return calculated_attributes
144+
145+
def extend_logs(
    project_id: str,
    attribute: Attribute,
    logs: List[str],
) -> None:
    """Append log lines to the attribute and notify the organization.

    Args:
        project_id: id of the owning project (used for the notification).
        attribute: attribute ORM object to extend; committed in place.
        logs: lines to append; an empty or falsy value is a no-op.
    """
    if not logs:
        return

    if not attribute.logs:
        attribute.logs = logs
    else:
        # Build a fresh merged list so the ORM registers the column
        # change (in-place extension would not be tracked).
        attribute.logs = [*attribute.logs, *logs]
    general.commit()
    # currently dummy since frontend doesn't have a log change yet
    notification.send_organization_update(
        project_id, f"attributes_updated:{str(attribute.id)}"
    )
165+
166+
def read_container_logs_thread(
    project_id: str,
    name: str,
    attribute_id: str,
    docker_container: Any,
) -> None:
    """Poll a running exec-env container, mirroring its logs and progress.

    Runs (in a daemon thread) until the container's key is removed from
    __containers_running. Non-progress lines are appended to the
    attribute's logs; "progress: x" lines drive set_progress, scaled into
    the 0.05-0.85 band.

    Args:
        project_id: id of the owning project.
        name: key identifying the container in __containers_running.
        attribute_id: id of the attribute being calculated.
        docker_container: docker container object whose logs are read.
    """
    ctx_token = general.get_ctx_token()
    # needs to be refetched since it is not thread safe
    attribute_item = attribute.get(project_id, attribute_id)
    previous_progress = -1
    last_timestamp = None
    seconds_since_refresh = 0
    while name in __containers_running:
        time.sleep(1)
        seconds_since_refresh += 1
        if seconds_since_refresh > 100:
            # Periodically renew the db session so long-running
            # calculations don't hold a stale connection. Reset the
            # counter — otherwise we would refresh on every iteration
            # after the first 100 seconds.
            seconds_since_refresh = 0
            ctx_token = general.remove_and_refresh_session(ctx_token, True)
            attribute_item = attribute.get(project_id, attribute_id)
        if name not in __containers_running:
            break
        try:
            # timestamps included to filter out logs that have already been read
            log_lines = docker_container.logs(
                stdout=True,
                stderr=True,
                timestamps=True,
                since=last_timestamp,
            )
        except Exception:
            # failsafe for containers that shut down during the read;
            # narrowed from a bare except so SystemExit/KeyboardInterrupt
            # still propagate
            break
        current_logs = [
            l for l in str(log_lines.decode("utf-8")).split("\n") if len(l.strip()) > 0
        ]
        if not current_logs:
            continue
        # advance the `since` cursor one second past the newest line seen
        last_timestamp_str = current_logs[-1].split(" ")[0]
        last_timestamp = parser.parse(last_timestamp_str).replace(
            tzinfo=None
        ) + datetime.timedelta(seconds=1)
        non_progress_logs = [l for l in current_logs if "progress" not in l]
        progress_logs = [l for l in current_logs if "progress" in l]
        if non_progress_logs:
            extend_logs(project_id, attribute_item, non_progress_logs)
        if not progress_logs:
            continue
        current_progress = float(progress_logs[-1].split("progress: ")[1].strip())
        if previous_progress == current_progress:
            continue
        previous_progress = current_progress
        set_progress(project_id, attribute_item, current_progress * 0.8 + 0.05)
    general.remove_and_refresh_session(ctx_token)
221+
222+
def set_progress(
    project_id: str,
    attribute: Attribute,
    progress: float,
) -> None:
    """Persist a rounded progress value on the attribute and broadcast it.

    Args:
        project_id: id of the owning project (notification target).
        attribute: attribute ORM object whose progress is updated.
        progress: raw progress fraction; rounded to 4 decimal places.
    """
    rounded = round(progress, 4)
    attribute.progress = rounded
    general.commit()
    message = f"calculate_attribute:progress:{attribute.id}:{rounded}"
    notification.send_organization_update(project_id, message)
0 commit comments