@@ -28,7 +28,7 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
2828from tqdm import tqdm
2929import warnings
3030from multiprocessing import Queue, Process
31- from queue import Empty
31+ # from queue import Empty
3232from artm.wrapper.exceptions import ArtmException
3333
3434from .strategy import BaseStrategy
@@ -72,14 +72,15 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
7272 return model.scores[self.score_name][-1]
7373
7474
75- def get_from_queue_till_fail(queue, error_message='', fail_margin=1,):
76- fail_counter = 0
77- while fail_counter < fail_margin:
78- try:
79- return queue.get(timeout=0.1)
80- except Empty:
81- fail_counter += 1
82- raise Empty(error_message)
75+ # exists for multiprocessing debug
76+ def put_to_queue(queue, puttable):
77+ queue.put(puttable)
78+
79+
80+ # exists for multiprocessing debug
81+ def get_from_queue_till_fail(queue, error_message='',):
82+ while True:
83+ return queue.get()
8384
8485
8586class BaseCube:
@@ -205,29 +206,33 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
205206 raise ValueError(
206207 f'Cannot alter and fit artm model with parameters {search_point}.\n'
207208 "ARTM failed with following: " + error_message
209+
208210 )
209211 # add cube description to the model history
210212 new_model.add_cube(model_cube)
211213 new_model.experiment = experiment
212214 new_model.save()
215+ assert os.path.exists(new_model.model_default_save_path)
213216
214217 returned_paths.append(new_model.model_default_save_path)
215218
216219 # some strategies depend on previous train results, therefore scores must be updated
217220 if self.tracked_score_function:
218221 current_score = self.tracked_score_function(new_model)
219- else:
222+ self.strategy.update_scores(current_score)
223+ # else:
220224 # we return number of iterations as a placeholder
221- current_score = len(returned_paths)
222- self.strategy.update_scores(current_score)
225+ # current_score = len(returned_paths)
226+
223227 return returned_paths
224228
225229 def _retrieve_results_from_process(self, queue, experiment):
226230 from ..models import DummyTopicModel
227231 models_num = get_from_queue_till_fail(queue, NUM_MODELS_ERROR)
228232 topic_models = []
229233 for _ in range(models_num):
230- path = get_from_queue_till_fail(queue, MODEL_RETRIEVE_ERROR.format(_, models_num))
234+ path = get_from_queue_till_fail(queue,
235+ MODEL_RETRIEVE_ERROR.format(_, models_num))
231236 topic_models.append(DummyTopicModel.load(path, experiment=experiment))
232237
233238 strategy_parameters = get_from_queue_till_fail(queue, STRATEGY_RETRIEVE_ERROR)
@@ -237,6 +242,7 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
237242 for (warning_message, warning_class) in caught_warnings:
238243 # if issubclass(warning_class, UserWarning):
239244 warnings.warn(warning_message)
245+
240246 return topic_models
241247
242248 def _train_models_and_report_results(self, queue, experiment, topic_model, dataset,
@@ -249,18 +255,17 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
249255 """
250256 with warnings.catch_warnings(record=True) as caught_warnings:
251257 returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
252-
253- queue.put(len(returned_paths))
258+ put_to_queue(queue, len(returned_paths))
254259 for path in returned_paths:
255- queue.put( path)
260+ put_to_queue(queue, path)
256261
257262 # to work with strategy we recover consistency by sending important parameters
258263 strategy_parameters = self.strategy._get_strategy_parameters(saveable_only=True)
259- queue.put( strategy_parameters)
264+ put_to_queue(queue, strategy_parameters)
260265
261266 caught_warnings = [(warning.message, warning.category)
262267 for warning in caught_warnings]
263- queue.put( caught_warnings)
268+ put_to_queue(queue, caught_warnings)
264269
265270 def _run_cube(self, topic_model, dataset):
266271 """
@@ -325,10 +330,10 @@ <h1 class="title">Module <code>topicnet.cooking_machine.cubes.base_cube</code></
325330 process = Process(
326331 target=self._train_models_and_report_results,
327332 args=(queue, experiment, topic_model, dataset,
328- search_space, search_length)
333+ search_space, search_length),
334+ daemon=True
329335 )
330336 process.start()
331- process.join()
332337 topic_models = self._retrieve_results_from_process(queue, experiment)
333338 else:
334339 returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
@@ -417,22 +422,30 @@ <h2 id="returns">Returns</h2>
417422</ details >
418423</ dd >
419424< dt id ="topicnet.cooking_machine.cubes.base_cube.get_from_queue_till_fail "> < code class ="name flex ">
420- < span > def < span class ="ident "> get_from_queue_till_fail</ span > </ span > (< span > queue, error_message='', fail_margin=1 )</ span >
425+ < span > def < span class ="ident "> get_from_queue_till_fail</ span > </ span > (< span > queue, error_message='')</ span >
421426</ code > </ dt >
422427< dd >
423428< section class ="desc "> </ section >
424429< details class ="source ">
425430< summary >
426431< span > Expand source code</ span >
427432</ summary >
428- < pre > < code class ="python "> def get_from_queue_till_fail(queue, error_message='', fail_margin=1,):
429- fail_counter = 0
430- while fail_counter < fail_margin:
431- try:
432- return queue.get(timeout=0.1)
433- except Empty:
434- fail_counter += 1
435- raise Empty(error_message)</ code > </ pre >
433+ < pre > < code class ="python "> def get_from_queue_till_fail(queue, error_message='',):
434+ while True:
435+ return queue.get()</ code > </ pre >
436+ </ details >
437+ </ dd >
438+ < dt id ="topicnet.cooking_machine.cubes.base_cube.put_to_queue "> < code class ="name flex ">
439+ < span > def < span class ="ident "> put_to_queue</ span > </ span > (< span > queue, puttable)</ span >
440+ </ code > </ dt >
441+ < dd >
442+ < section class ="desc "> </ section >
443+ < details class ="source ">
444+ < summary >
445+ < span > Expand source code</ span >
446+ </ summary >
447+ < pre > < code class ="python "> def put_to_queue(queue, puttable):
448+ queue.put(puttable)</ code > </ pre >
436449</ details >
437450</ dd >
438451</ dl >
@@ -594,29 +607,33 @@ <h2 id="parameters">Parameters</h2>
594607 raise ValueError(
595608 f'Cannot alter and fit artm model with parameters {search_point}.\n'
596609 "ARTM failed with following: " + error_message
610+
597611 )
598612 # add cube description to the model history
599613 new_model.add_cube(model_cube)
600614 new_model.experiment = experiment
601615 new_model.save()
616+ assert os.path.exists(new_model.model_default_save_path)
602617
603618 returned_paths.append(new_model.model_default_save_path)
604619
605620 # some strategies depend on previous train results, therefore scores must be updated
606621 if self.tracked_score_function:
607622 current_score = self.tracked_score_function(new_model)
608- else:
623+ self.strategy.update_scores(current_score)
624+ # else:
609625 # we return number of iterations as a placeholder
610- current_score = len(returned_paths)
611- self.strategy.update_scores(current_score)
626+ # current_score = len(returned_paths)
627+
612628 return returned_paths
613629
614630 def _retrieve_results_from_process(self, queue, experiment):
615631 from ..models import DummyTopicModel
616632 models_num = get_from_queue_till_fail(queue, NUM_MODELS_ERROR)
617633 topic_models = []
618634 for _ in range(models_num):
619- path = get_from_queue_till_fail(queue, MODEL_RETRIEVE_ERROR.format(_, models_num))
635+ path = get_from_queue_till_fail(queue,
636+ MODEL_RETRIEVE_ERROR.format(_, models_num))
620637 topic_models.append(DummyTopicModel.load(path, experiment=experiment))
621638
622639 strategy_parameters = get_from_queue_till_fail(queue, STRATEGY_RETRIEVE_ERROR)
@@ -626,6 +643,7 @@ <h2 id="parameters">Parameters</h2>
626643 for (warning_message, warning_class) in caught_warnings:
627644 # if issubclass(warning_class, UserWarning):
628645 warnings.warn(warning_message)
646+
629647 return topic_models
630648
631649 def _train_models_and_report_results(self, queue, experiment, topic_model, dataset,
@@ -638,18 +656,17 @@ <h2 id="parameters">Parameters</h2>
638656 """
639657 with warnings.catch_warnings(record=True) as caught_warnings:
640658 returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
641-
642- queue.put(len(returned_paths))
659+ put_to_queue(queue, len(returned_paths))
643660 for path in returned_paths:
644- queue.put( path)
661+ put_to_queue(queue, path)
645662
646663 # to work with strategy we recover consistency by sending important parameters
647664 strategy_parameters = self.strategy._get_strategy_parameters(saveable_only=True)
648- queue.put( strategy_parameters)
665+ put_to_queue(queue, strategy_parameters)
649666
650667 caught_warnings = [(warning.message, warning.category)
651668 for warning in caught_warnings]
652- queue.put( caught_warnings)
669+ put_to_queue(queue, caught_warnings)
653670
654671 def _run_cube(self, topic_model, dataset):
655672 """
@@ -714,10 +731,10 @@ <h2 id="parameters">Parameters</h2>
714731 process = Process(
715732 target=self._train_models_and_report_results,
716733 args=(queue, experiment, topic_model, dataset,
717- search_space, search_length)
734+ search_space, search_length),
735+ daemon=True
718736 )
719737 process.start()
720- process.join()
721738 topic_models = self._retrieve_results_from_process(queue, experiment)
722739 else:
723740 returned_paths = self._train_models(experiment, topic_model, dataset, search_space)
@@ -883,6 +900,7 @@ <h1>Index</h1>
883900< ul class ="">
884901< li > < code > < a title ="topicnet.cooking_machine.cubes.base_cube.check_experiment_existence " href ="#topicnet.cooking_machine.cubes.base_cube.check_experiment_existence "> check_experiment_existence</ a > </ code > </ li >
885902< li > < code > < a title ="topicnet.cooking_machine.cubes.base_cube.get_from_queue_till_fail " href ="#topicnet.cooking_machine.cubes.base_cube.get_from_queue_till_fail "> get_from_queue_till_fail</ a > </ code > </ li >
903+ < li > < code > < a title ="topicnet.cooking_machine.cubes.base_cube.put_to_queue " href ="#topicnet.cooking_machine.cubes.base_cube.put_to_queue "> put_to_queue</ a > </ code > </ li >
886904</ ul >
887905</ li >
888906< li > < h3 > < a href ="#header-classes "> Classes</ a > </ h3 >
0 commit comments