@@ -49,6 +49,7 @@ def install_elser():
     es.ml.put_trained_model(
         model_id=ELSER_MODEL, input={"field_names": ["text_field"]}
     )
+
     while True:
         status = es.ml.get_trained_models(
             model_id=ELSER_MODEL, include="definition_status"
@@ -57,19 +58,31 @@ def install_elser():
             break
         time.sleep(1)
 
-    # Step 1: Ensure ELSER_MODEL is deployed
-    try:
-        es.ml.start_trained_model_deployment(
-            model_id=ELSER_MODEL, wait_for="fully_allocated"
-        )
-        print(f'"{ELSER_MODEL}" model is deployed')
-    except BadRequestError:
-        # This error means it already exists
-        pass
+    # Step 1: Ensure ELSER_MODEL is fully allocated
+    if not is_elser_fully_allocated():
+        try:
+            es.ml.start_trained_model_deployment(
+                model_id=ELSER_MODEL, wait_for="fully_allocated"
+            )
+            print(f'"{ELSER_MODEL}" model is deployed')
+        except BadRequestError:
+            pass
+
+        while True:
+            if is_elser_fully_allocated():
+                break
+            time.sleep(1)
 
     print(f'"{ELSER_MODEL}" model is ready')
 
 
+def is_elser_fully_allocated():
+    stats = es.ml.get_trained_models_stats(model_id=ELSER_MODEL)
+    deployment_stats = stats["trained_model_stats"][0].get("deployment_stats", {})
+    allocation_status = deployment_stats.get("allocation_status", {})
+    return allocation_status.get("state") == "fully_allocated"
+
+
 def main():
     install_elser()
 
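The new `is_elser_fully_allocated()` helper only inspects two nested fields of the trained-model stats response. For reference, a minimal sketch of the shape it assumes, based on the Elasticsearch ML get-trained-models-stats API (abridged; the concrete values are illustrative, and `deployment_stats` is absent until a deployment has been started, which is why the helper falls back to empty dicts):

```python
# Abridged, illustrative response for es.ml.get_trained_models_stats(model_id=ELSER_MODEL).
# Only the keys read by is_elser_fully_allocated() are shown.
stats = {
    "trained_model_stats": [
        {
            "model_id": "...",  # ELSER_MODEL
            "deployment_stats": {  # missing before start_trained_model_deployment()
                "allocation_status": {
                    "allocation_count": 1,         # illustrative
                    "target_allocation_count": 1,  # illustrative
                    "state": "fully_allocated",    # the value the helper checks
                }
            },
        }
    ]
}
```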
@@ -96,7 +109,7 @@ def main():
 
     print(f"Split {len(workplace_docs)} documents into {len(docs)} chunks")
 
-    print(f"Creating Elasticsearch sparse vector store in {ELASTICSEARCH_URL}")
+    print(f"Creating Elasticsearch sparse vector store for {ELASTICSEARCH_URL}")
 
     store = ElasticsearchStore(
         es_connection=es,
@@ -110,6 +123,13 @@ def main():
     #
     # Once elastic/elasticsearch#107077 is fixed, we can use bulk_kwargs to
     # adjust the timeout.
+
+    print(f"Adding documents to index {INDEX}")
+
+    spinner = Halo(text="Processing bulk operation", spinner="dots")
+    if stdout.isatty():
+        spinner.start()
+
     try:
         es.indices.delete(index=INDEX, ignore_unavailable=True)
         store.add_documents(list(docs))
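The comment above points at elastic/elasticsearch#107077; until that is resolved, the bulk indexing runs with the client's default timeout. Purely as an illustration of what the comment intends, a hypothetical call once `bulk_kwargs` can be used for this (the keyword and value below are assumptions, not currently working behavior):

```python
# Hypothetical: sketches the intent of the comment above, not working code today.
# The exact timeout keyword accepted via bulk_kwargs depends on the upstream fix.
store.add_documents(
    list(docs),
    bulk_kwargs={"request_timeout": 600},  # assumed knob and value
)
```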
@@ -124,6 +144,11 @@ def main():
         es.indices.delete(index=INDEX, ignore_unavailable=True)
         store.add_documents(list(docs))
 
+    if stdout.isatty():
+        spinner.stop()
+
+    print(f"Documents added to index {INDEX}")
+
 
 def await_ml_tasks(max_timeout=600, interval=5):
     """
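Note that the spinner is only stopped on the happy path: if the retry in the `except` branch also fails, the exception propagates with the spinner still running. If that matters, one option is Halo's context-manager form, which stops the spinner on exit even when an exception is raised; a minimal sketch, assuming the same TTY guard is still wanted (the retry logic would move inside the `with` block):

```python
from contextlib import nullcontext
from sys import stdout

from halo import Halo

# Sketch only: Halo can be used as a context manager, so the spinner stops even
# if indexing raises; nullcontext() keeps output quiet when not attached to a TTY.
spinner = Halo(text="Processing bulk operation", spinner="dots") if stdout.isatty() else nullcontext()
with spinner:
    es.indices.delete(index=INDEX, ignore_unavailable=True)
    store.add_documents(list(docs))
```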
@@ -141,23 +166,14 @@ def await_ml_tasks(max_timeout=600, interval=5):
     if not ml_tasks:
         return  # likely a lost race on tasks
 
-    spinner = Halo(text="Awaiting ML tasks", spinner="dots")
-    if stdout.isatty():
-        spinner.start()
-    else:
-        print(f"Awaiting {len(ml_tasks)} ML tasks")
+    print(f"Awaiting {len(ml_tasks)} ML tasks")
 
     while time.time() - start_time < max_timeout:
         ml_tasks = get_ml_tasks()
         if len(ml_tasks) == 0:
             break
         time.sleep(interval)
 
-    if stdout.isatty():
-        spinner.stop()
-    else:
-        print(f"ML tasks complete")
-
     if ml_tasks:
         raise TimeoutError(
             f"Timeout reached. ML tasks are still running: {', '.join(ml_tasks)}"
@@ -176,4 +192,4 @@ def get_ml_tasks():
 
 # Unless we run through flask, we can miss critical settings or telemetry signals.
 if __name__ == "__main__":
-    main()
+    raise RuntimeError("Run via the parent directory: 'flask create-index'")
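With the direct `main()` call removed, the module now refuses to run standalone and is meant to be invoked through the Flask CLI so the app's settings and telemetry are initialized first. A rough sketch of how the parent application might register such a command (the registration below, including the imported module name, is an assumption about code outside this diff; only the `create-index` command name comes from the error message):

```python
# Hypothetical wiring in the parent Flask app, not part of this diff.
# "flask create-index" would then run main() with the app fully configured.
from flask import Flask

app = Flask(__name__)

@app.cli.command("create-index")
def create_index():
    """Install ELSER and (re)build the Elasticsearch index."""
    from data import main  # assumed module name for the script changed in this diff
    main()
```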