Skip to content

Commit 109ec2f

Browse files
replace syncio event loop with .run()
1 parent d47bd1e commit 109ec2f

File tree

2 files changed

+12
-34
lines changed

2 files changed

+12
-34
lines changed

src/service/data/datasources/data_source.py

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -83,29 +83,17 @@ def get_dataframe_with_batch_size(
8383
try:
8484
model_data = ModelData(model_id)
8585

86-
loop = asyncio.new_event_loop()
87-
asyncio.set_event_loop(loop)
86+
input_rows, output_rows, metadata_rows = asyncio.run(model_data.row_counts())
8887

89-
try:
90-
input_rows, output_rows, metadata_rows = loop.run_until_complete(
91-
model_data.row_counts()
92-
)
88+
available_rows = min(input_rows, output_rows, metadata_rows)
9389

94-
available_rows = min(input_rows, output_rows, metadata_rows)
90+
start_row = max(0, available_rows - batch_size)
91+
n_rows = min(batch_size, available_rows)
9592

96-
start_row = max(0, available_rows - batch_size)
97-
n_rows = min(batch_size, available_rows)
93+
input_data, output_data, metadata = asyncio.run(model_data.data(start_row=start_row, n_rows=n_rows))
9894

99-
input_data, output_data, metadata = loop.run_until_complete(
100-
model_data.data(start_row=start_row, n_rows=n_rows)
101-
)
95+
input_names, output_names, metadata_names = asyncio.run(model_data.column_names())
10296

103-
input_names, output_names, metadata_names = loop.run_until_complete(
104-
model_data.column_names()
105-
)
106-
107-
finally:
108-
loop.close()
10997

11098
# Combine the data into a single dataframe
11199
df_data = {}
@@ -176,18 +164,8 @@ def get_metadata(self, model_id: str) -> StorageMetadata:
176164
try:
177165
model_data = ModelData(model_id)
178166

179-
loop = asyncio.new_event_loop()
180-
asyncio.set_event_loop(loop)
181-
182-
try:
183-
input_rows, output_rows, metadata_rows = loop.run_until_complete(
184-
model_data.row_counts()
185-
)
186-
input_names, output_names, metadata_names = loop.run_until_complete(
187-
model_data.column_names()
188-
)
189-
finally:
190-
loop.close()
167+
input_rows, output_rows, metadata_rows = asyncio.run(model_data.row_counts())
168+
input_names, output_names, metadata_names = asyncio.run(model_data.column_names())
191169

192170
input_items = {}
193171
for i, name in enumerate(input_names):
@@ -228,9 +206,9 @@ def has_metadata(self, model_id: str) -> bool:
228206
True if metadata exists, False otherwise
229207
"""
230208
try:
231-
self.get_metadata(model_id)
232-
return True
233-
except (StorageReadException, Exception):
209+
return self.get_metadata(model_id) is not None
210+
except Exception as e:
211+
logger.error(f"Error checking if metadata exists for model={model_id}: {str(e)}")
234212
return False
235213

236214
# DATAFRAME QUERIES

src/service/prometheus/prometheus_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def register(
168168
RequestReconciler.reconcile(request, self.data_source)
169169
with self._requests_lock:
170170
if metric_name not in self.requests:
171-
self.requests[metric_name] = defaultdict(dict)
171+
self.requests[metric_name] = {}
172172
self.requests[metric_name][id] = request
173173

174174
def delete(self, metric_name: str, id: uuid.UUID) -> None:

0 commit comments

Comments
 (0)