From dcfe0225d7caa9415d62f09614789df1363701b6 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Mon, 29 Apr 2024 21:09:42 +0800 Subject: [PATCH 01/19] Add implementation for clearing cache table --- aidb_utilities/db_setup/clear_cache.py | 17 +++++++++++++++++ launch.py | 7 ++++++- 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 aidb_utilities/db_setup/clear_cache.py diff --git a/aidb_utilities/db_setup/clear_cache.py b/aidb_utilities/db_setup/clear_cache.py new file mode 100644 index 00000000..2597a9ef --- /dev/null +++ b/aidb_utilities/db_setup/clear_cache.py @@ -0,0 +1,17 @@ +from aidb.utils.logger import logger +from aidb.inference.bound_inference_service import CachedBoundInferenceService +from aidb.engine.engine import Engine +from sqlalchemy.sql import delete + +async def clear_ML_cache(engine: Engine): + ''' + Clear the cache table at start if the ML model has changed. + Delete the cache table for each service. + ''' + for service_binding in engine._config.inference_bindings: + if isinstance(service_binding, CachedBoundInferenceService): + async with service_binding._engine.begin() as conn: + stmt = delete(service_binding._cache_table) + await conn.execute(stmt) + else: + logger.debug(f"Service binding for {service_binding.service.name} is not cached") \ No newline at end of file diff --git a/launch.py b/launch.py index d2bceb93..4430c3ea 100644 --- a/launch.py +++ b/launch.py @@ -8,7 +8,7 @@ from aidb_utilities.db_setup.blob_table import BaseTablesSetup from aidb_utilities.db_setup.create_tables import create_output_tables from aidb.utils.asyncio import asyncio_run - +from aidb_utilities.db_setup.clear_cache import clear_ML_cache def setup_blob_tables(config): input_blobs = pd.read_csv(config.blobs_csv_file) @@ -22,6 +22,7 @@ def setup_blob_tables(config): parser.add_argument("--setup-blob-table", action='store_true') parser.add_argument("--setup-output-tables", action='store_true') parser.add_argument("--verbose", action='store_true') + 
parser.add_argument("--clear-cache", action='store_true') args = parser.parse_args() config = importlib.import_module(args.config) @@ -33,4 +34,8 @@ def setup_blob_tables(config): asyncio_run(create_output_tables(config.DB_URL, config.DB_NAME, config.tables)) aidb_engine = AIDB.from_config(args.config, args.verbose) + + if args.clear_cache: + asyncio_run(clear_ML_cache(aidb_engine)) + command_line_utility(aidb_engine) From 4fbf90abeca138f24dd5c5ca072c2ed3fb992230 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Wed, 1 May 2024 22:06:38 +0800 Subject: [PATCH 02/19] Add tests for cache clearing & Fix bug that does not delete output --- aidb_utilities/db_setup/clear_cache.py | 4 ++ tests/tests_caching_logic.py | 59 ++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/aidb_utilities/db_setup/clear_cache.py b/aidb_utilities/db_setup/clear_cache.py index 2597a9ef..3764209a 100644 --- a/aidb_utilities/db_setup/clear_cache.py +++ b/aidb_utilities/db_setup/clear_cache.py @@ -13,5 +13,9 @@ async def clear_ML_cache(engine: Engine): async with service_binding._engine.begin() as conn: stmt = delete(service_binding._cache_table) await conn.execute(stmt) + tables = service_binding.get_tables(service_binding.binding.output_columns) + for table_name in tables: + stmt = delete(service_binding._tables[table_name]._table) + await conn.execute(stmt) else: logger.debug(f"Service binding for {service_binding.service.name} is not cached") \ No newline at end of file diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index f79539ee..a569857a 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -9,6 +9,8 @@ from tests.inference_service_utils.inference_service_setup import register_inference_services from tests.inference_service_utils.http_inference_service_setup import run_server from tests.utils import setup_gt_and_aidb_engine, setup_test_logger +from aidb.utils.asyncio import asyncio_run +from 
aidb_utilities.db_setup.clear_cache import clear_ML_cache setup_test_logger('caching_logic') @@ -64,7 +66,64 @@ async def test_num_infer_calls(self): del gt_engine del aidb_engine p.terminate() + + async def test_cache_clear(self): + dirname = os.path.dirname(__file__) + data_dir = os.path.join(dirname, 'data/jackson') + p = Process(target=run_server, args=[str(data_dir)]) + p.start() + time.sleep(1) + db_url_list = [POSTGRESQL_URL] + for db_url in db_url_list: + gt_engine, aidb_engine = await setup_gt_and_aidb_engine(db_url, data_dir) + + register_inference_services(aidb_engine, data_dir, batch_supported=False) + + queries = [ + ( + 'full_scan', + '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''', + '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''' + ), + ( + 'full_scan', + '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 400;''', + '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 400;''' + ), + ] + + # no service calls before executing query + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 + + calls = [[20, 40], [47, 74]] + # First 300 need 20 calls, 300 - 400 need 7 calls + for index, (query_type, aidb_query, exact_query) in enumerate(queries): + logger.info(f'Running query {exact_query} in ground truth database') + # Run the query on the ground truth database + async with gt_engine.begin() as conn: + gt_res = await conn.execute(text(exact_query)) + gt_res = gt_res.fetchall() + # Run the query on the aidb database + logger.info(f'Running initial query {aidb_query} in aidb database') + aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + # running the same query, so number of inference calls should remain same + # temporarily commenting this out because we no longer call infer_one + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual 
{aidb_engine._config.inference_services["objects00"].infer_one.calls}" + logger.info(f'Running cached query {aidb_query} in aidb database') + aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + # cleared cache, so the call count should accumulate as the first run + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services["objects00"].infer_one.calls}" + asyncio_run(clear_ML_cache(aidb_engine)) + logger.info(f'Running uncached query {aidb_query} in aidb database') + aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1], f"Wrong query count: Expected {calls[index][1]}, Actual {aidb_engine._config.inference_services["objects00"].infer_one.calls}" + del gt_engine + del aidb_engine + p.terminate() if __name__ == '__main__': unittest.main() From 3400098c8f0e80395cf646da2788279b6d5d01cf Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Wed, 1 May 2024 22:10:13 +0800 Subject: [PATCH 03/19] Fix wrong quotation marks used --- tests/tests_caching_logic.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index a569857a..dba216ad 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -110,17 +110,17 @@ async def test_cache_clear(self): assert len(gt_res) == len(aidb_res) # running the same query, so number of inference calls should remain same # temporarily commenting this out because we no longer call infer_one - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services["objects00"].infer_one.calls}" + assert 
aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" logger.info(f'Running cached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) # cleared cache, so the call count should accumulate as the first run - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services["objects00"].infer_one.calls}" + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" asyncio_run(clear_ML_cache(aidb_engine)) logger.info(f'Running uncached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1], f"Wrong query count: Expected {calls[index][1]}, Actual {aidb_engine._config.inference_services["objects00"].infer_one.calls}" + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1], f"Wrong query count: Expected {calls[index][1]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" del gt_engine del aidb_engine p.terminate() From 35e5df26f0dde1658edca2af913efc951f501638 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Wed, 1 May 2024 22:31:36 +0800 Subject: [PATCH 04/19] Fix: merge tests to single test --- tests/tests_caching_logic.py | 52 +++--------------------------------- 1 file changed, 3 insertions(+), 49 deletions(-) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index dba216ad..15f875c3 100644 --- 
a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -24,53 +24,6 @@ async def test_num_infer_calls(self): dirname = os.path.dirname(__file__) data_dir = os.path.join(dirname, 'data/jackson') - p = Process(target=run_server, args=[str(data_dir)]) - p.start() - time.sleep(1) - db_url_list = [POSTGRESQL_URL] - for db_url in db_url_list: - gt_engine, aidb_engine = await setup_gt_and_aidb_engine(db_url, data_dir) - - register_inference_services(aidb_engine, data_dir, batch_supported=False) - - queries = [ - ( - 'full_scan', - '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''', - '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''' - ), - ( - 'full_scan', - '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 400;''', - '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 400;''' - ), - ] - - # no service calls before executing query - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 - - calls = [20, 27] - for index, (query_type, aidb_query, exact_query) in enumerate(queries): - logger.info(f'Running query {exact_query} in ground truth database') - # Run the query on the ground truth database - async with gt_engine.begin() as conn: - gt_res = await conn.execute(text(exact_query)) - gt_res = gt_res.fetchall() - # Run the query on the aidb database - logger.info(f'Running query {aidb_query} in aidb database') - aidb_res = aidb_engine.execute(aidb_query) - assert len(gt_res) == len(aidb_res) - # running the same query, so number of inference calls should remain same - # temporarily commenting this out because we no longer call infer_one - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index] - del gt_engine - del aidb_engine - p.terminate() - - async def test_cache_clear(self): - dirname = os.path.dirname(__file__) - data_dir = os.path.join(dirname, 'data/jackson') - p = Process(target=run_server, args=[str(data_dir)]) p.start() 
time.sleep(1) @@ -97,7 +50,7 @@ async def test_cache_clear(self): assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 calls = [[20, 40], [47, 74]] - # First 300 need 20 calls, 300 - 400 need 7 calls + # First 300 need 20 calls, 300 to 400 need 7 calls for index, (query_type, aidb_query, exact_query) in enumerate(queries): logger.info(f'Running query {exact_query} in ground truth database') # Run the query on the ground truth database @@ -114,12 +67,13 @@ async def test_cache_clear(self): logger.info(f'Running cached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) - # cleared cache, so the call count should accumulate as the first run + # run again, because cache exists, there should be no new calls assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" asyncio_run(clear_ML_cache(aidb_engine)) logger.info(f'Running uncached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) + # cleared cache, so should accumulate new calls same as the first call assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1], f"Wrong query count: Expected {calls[index][1]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" del gt_engine del aidb_engine From c091cb577c244b41914326d6d89ca44432933b96 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Thu, 2 May 2024 15:30:23 +0800 Subject: [PATCH 05/19] Fix code style & Add topological sorting for delete --- aidb_utilities/db_setup/clear_cache.py | 70 +++++++++++++++++++------- launch.py | 8 +-- tests/tests_caching_logic.py | 21 ++++---- 3 files changed, 69 insertions(+), 30 deletions(-) diff --git a/aidb_utilities/db_setup/clear_cache.py 
b/aidb_utilities/db_setup/clear_cache.py index 3764209a..42446d02 100644 --- a/aidb_utilities/db_setup/clear_cache.py +++ b/aidb_utilities/db_setup/clear_cache.py @@ -1,21 +1,55 @@ -from aidb.utils.logger import logger -from aidb.inference.bound_inference_service import CachedBoundInferenceService -from aidb.engine.engine import Engine +from collections import deque + +from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.sql import delete +from aidb.engine.engine import Engine +from aidb.inference.bound_inference_service import CachedBoundInferenceService +from aidb.utils.logger import logger + + async def clear_ML_cache(engine: Engine): - ''' - Clear the cache table at start if the ML model has changed. - Delete the cache table for each service. - ''' - for service_binding in engine._config.inference_bindings: - if isinstance(service_binding, CachedBoundInferenceService): - async with service_binding._engine.begin() as conn: - stmt = delete(service_binding._cache_table) - await conn.execute(stmt) - tables = service_binding.get_tables(service_binding.binding.output_columns) - for table_name in tables: - stmt = delete(service_binding._tables[table_name]._table) - await conn.execute(stmt) - else: - logger.debug(f"Service binding for {service_binding.service.name} is not cached") \ No newline at end of file + ''' + Clear the cache table at start if the ML model has changed. + Delete the cache table for each service. 
+ ''' + for inference_binding in engine._config.inference_bindings: + if isinstance(inference_binding, CachedBoundInferenceService): + async with inference_binding._engine.begin() as conn: + tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] + fk_ref_counts = {table_name: 0 for table_name in tables_to_delete} + for table_name in tables_to_delete: + if table_name == inference_binding._cache_table_name: + table = inference_binding._cache_table + else: + table = inference_binding._tables[table_name]._table + for constraint in table.constraints: + if isinstance(constraint, ForeignKeyConstraint): + for fk in constraint.elements: + fk_ref_table_name = fk.column.table.name + if fk_ref_table_name in fk_ref_counts: + fk_ref_counts[fk_ref_table_name] += 1 + + delete_queue = deque() + for table_name, ref_count in fk_ref_counts.items(): + if ref_count == 0: + delete_queue.append(table_name) + + while delete_queue: + table_to_delete_name = delete_queue.popleft() + if table_to_delete_name == inference_binding._cache_table_name: + table_to_delete = inference_binding._cache_table + else: + table_to_delete = inference_binding._tables[table_to_delete_name]._table + + for constraint in table_to_delete.constraints: + if isinstance(constraint, ForeignKeyConstraint): + for fk in constraint.elements: + fk_ref_table_name = fk.column.table.name + if fk_ref_table_name in fk_ref_counts: + fk_ref_counts[fk_ref_table_name] -= 1 + if fk_ref_counts[fk_ref_table_name] == 0: + delete_queue.append(fk_ref_table_name) + await conn.execute(delete(table_to_delete)) + else: + logger.debug(f"Service binding for {inference_binding.service.name} is not cached") \ No newline at end of file diff --git a/launch.py b/launch.py index 4430c3ea..4d27945d 100644 --- a/launch.py +++ b/launch.py @@ -3,12 +3,14 @@ import pandas as pd -from aidb_utilities.command_line_setup.command_line_setup import command_line_utility +from 
aidb.utils.asyncio import asyncio_run from aidb_utilities.aidb_setup.aidb_factory import AIDB +from aidb_utilities.command_line_setup.command_line_setup import \ + command_line_utility from aidb_utilities.db_setup.blob_table import BaseTablesSetup -from aidb_utilities.db_setup.create_tables import create_output_tables -from aidb.utils.asyncio import asyncio_run from aidb_utilities.db_setup.clear_cache import clear_ML_cache +from aidb_utilities.db_setup.create_tables import create_output_tables + def setup_blob_tables(config): input_blobs = pd.read_csv(config.blobs_csv_file) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 15f875c3..793164aa 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -1,16 +1,19 @@ -from multiprocessing import Process import os -from sqlalchemy.sql import text import time import unittest +from multiprocessing import Process from unittest import IsolatedAsyncioTestCase -from aidb.utils.logger import logger -from tests.inference_service_utils.inference_service_setup import register_inference_services -from tests.inference_service_utils.http_inference_service_setup import run_server -from tests.utils import setup_gt_and_aidb_engine, setup_test_logger +from sqlalchemy.sql import text + from aidb.utils.asyncio import asyncio_run +from aidb.utils.logger import logger from aidb_utilities.db_setup.clear_cache import clear_ML_cache +from tests.inference_service_utils.http_inference_service_setup import \ + run_server +from tests.inference_service_utils.inference_service_setup import \ + register_inference_services +from tests.utils import setup_gt_and_aidb_engine, setup_test_logger setup_test_logger('caching_logic') @@ -63,18 +66,18 @@ async def test_num_infer_calls(self): assert len(gt_res) == len(aidb_res) # running the same query, so number of inference calls should remain same # temporarily commenting this out because we no longer call infer_one - assert 
aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0] logger.info(f'Running cached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) # run again, because cache exists, there should be no new calls - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0], f"Wrong query count: Expected {calls[index][0]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0] asyncio_run(clear_ML_cache(aidb_engine)) logger.info(f'Running uncached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) # cleared cache, so should accumulate new calls same as the first call - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1], f"Wrong query count: Expected {calls[index][1]}, Actual {aidb_engine._config.inference_services['objects00'].infer_one.calls}" + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][1] del gt_engine del aidb_engine p.terminate() From ccc3a733e92b615ada35724e72fed9a78618aeaf Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 3 May 2024 23:14:42 +0800 Subject: [PATCH 06/19] Refactor code using networkx --- aidb_utilities/db_setup/clear_cache.py | 37 ++++++++++---------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/aidb_utilities/db_setup/clear_cache.py b/aidb_utilities/db_setup/clear_cache.py index 42446d02..76f73cab 100644 --- a/aidb_utilities/db_setup/clear_cache.py +++ b/aidb_utilities/db_setup/clear_cache.py @@ -1,5 +1,7 @@ from 
collections import deque +import networkx as nx + from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.sql import delete @@ -16,8 +18,10 @@ async def clear_ML_cache(engine: Engine): for inference_binding in engine._config.inference_bindings: if isinstance(inference_binding, CachedBoundInferenceService): async with inference_binding._engine.begin() as conn: + fk_ref_graph = nx.DiGraph() + fk_ref_graph.add_nodes_from(inference_binding.get_tables(inference_binding.binding.output_columns)) + fk_ref_graph.add_node(inference_binding._cache_table_name) tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] - fk_ref_counts = {table_name: 0 for table_name in tables_to_delete} for table_name in tables_to_delete: if table_name == inference_binding._cache_table_name: table = inference_binding._cache_table @@ -27,29 +31,16 @@ async def clear_ML_cache(engine: Engine): if isinstance(constraint, ForeignKeyConstraint): for fk in constraint.elements: fk_ref_table_name = fk.column.table.name - if fk_ref_table_name in fk_ref_counts: - fk_ref_counts[fk_ref_table_name] += 1 - - delete_queue = deque() - for table_name, ref_count in fk_ref_counts.items(): - if ref_count == 0: - delete_queue.append(table_name) + if fk_ref_graph.has_node(fk_ref_table_name): + fk_ref_graph.add_edge(table_name,fk_ref_table_name) - while delete_queue: - table_to_delete_name = delete_queue.popleft() - if table_to_delete_name == inference_binding._cache_table_name: - table_to_delete = inference_binding._cache_table + table_order = nx.topological_sort(fk_ref_graph) + for table_name in table_order: + if table_name == inference_binding._cache_table_name: + table = inference_binding._cache_table else: - table_to_delete = inference_binding._tables[table_to_delete_name]._table - - for constraint in table_to_delete.constraints: - if isinstance(constraint, ForeignKeyConstraint): - for fk in constraint.elements: - fk_ref_table_name = 
fk.column.table.name - if fk_ref_table_name in fk_ref_counts: - fk_ref_counts[fk_ref_table_name] -= 1 - if fk_ref_counts[fk_ref_table_name] == 0: - delete_queue.append(fk_ref_table_name) - await conn.execute(delete(table_to_delete)) + table = inference_binding._tables[table_name]._table + await conn.execute(delete(table)) + else: logger.debug(f"Service binding for {inference_binding.service.name} is not cached") \ No newline at end of file From e24e2578632d564660d05d86a5d901f702862825 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Sat, 4 May 2024 00:47:27 +0800 Subject: [PATCH 07/19] Change order of getting tables and build graph --- aidb_utilities/db_setup/clear_cache.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/aidb_utilities/db_setup/clear_cache.py b/aidb_utilities/db_setup/clear_cache.py index 76f73cab..d60c3e75 100644 --- a/aidb_utilities/db_setup/clear_cache.py +++ b/aidb_utilities/db_setup/clear_cache.py @@ -1,7 +1,6 @@ from collections import deque import networkx as nx - from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.sql import delete @@ -18,10 +17,9 @@ async def clear_ML_cache(engine: Engine): for inference_binding in engine._config.inference_bindings: if isinstance(inference_binding, CachedBoundInferenceService): async with inference_binding._engine.begin() as conn: - fk_ref_graph = nx.DiGraph() - fk_ref_graph.add_nodes_from(inference_binding.get_tables(inference_binding.binding.output_columns)) - fk_ref_graph.add_node(inference_binding._cache_table_name) tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] + fk_ref_graph = nx.DiGraph() + fk_ref_graph.add_nodes_from(tables_to_delete) for table_name in tables_to_delete: if table_name == inference_binding._cache_table_name: table = inference_binding._cache_table @@ -31,7 +29,7 @@ async def clear_ML_cache(engine: Engine): if isinstance(constraint, ForeignKeyConstraint): for fk in 
constraint.elements: fk_ref_table_name = fk.column.table.name - if fk_ref_graph.has_node(fk_ref_table_name): + if fk_ref_table_name in fk_ref_graph: fk_ref_graph.add_edge(table_name,fk_ref_table_name) table_order = nx.topological_sort(fk_ref_graph) From ab5fc3a49c66a65f0cbd9cfc3817c4ed6ce63ed0 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Wed, 8 May 2024 10:48:51 +0800 Subject: [PATCH 08/19] Modify code style * Move the clear cache function to engine * Function name change * Add code logic comment --- aidb/engine/engine.py | 46 ++++++++++++++++++++++++-- aidb_utilities/db_setup/clear_cache.py | 44 ------------------------ tests/tests_caching_logic.py | 3 +- 3 files changed, 45 insertions(+), 48 deletions(-) delete mode 100644 aidb_utilities/db_setup/clear_cache.py diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 04738aac..416b16f8 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -1,9 +1,16 @@ -from aidb.engine.approx_aggregate_join_engine import ApproximateAggregateJoinEngine +import networkx as nx +from sqlalchemy.schema import ForeignKeyConstraint +from sqlalchemy.sql import delete + +from aidb.engine.approx_aggregate_join_engine import \ + ApproximateAggregateJoinEngine from aidb.engine.approx_select_engine import ApproxSelectEngine from aidb.engine.limit_engine import LimitEngine from aidb.engine.non_select_query_engine import NonSelectQueryEngine -from aidb.utils.asyncio import asyncio_run +from aidb.inference.bound_inference_service import CachedBoundInferenceService from aidb.query.query import Query +from aidb.utils.asyncio import asyncio_run +from aidb.utils.logger import logger class Engine(LimitEngine, NonSelectQueryEngine, ApproxSelectEngine, ApproximateAggregateJoinEngine): @@ -42,3 +49,38 @@ def execute(self, query: str, **kwargs): raise e finally: self.__del__() + + async def clear_ml_cache(self): + ''' + Clear the cache and output table if the ML model has changed. 
+ For each cached inference service, build the reference graph of the tables based on fk constraints, + and then delete the tables following the graph's topological order to maintain integrity during deletion. + ''' + for inference_binding in self._config.inference_bindings: + if isinstance(inference_binding, CachedBoundInferenceService): + async with inference_binding._engine.begin() as conn: + tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] + fk_ref_graph = nx.DiGraph() + fk_ref_graph.add_nodes_from(tables_to_delete) + for table_name in tables_to_delete: + if table_name == inference_binding._cache_table_name: + table = inference_binding._cache_table + else: + table = inference_binding._tables[table_name]._table + for constraint in table.constraints: + if isinstance(constraint, ForeignKeyConstraint): + for fk in constraint.elements: + fk_ref_table_name = fk.column.table.name + if fk_ref_table_name in fk_ref_graph: + fk_ref_graph.add_edge(table_name,fk_ref_table_name) + + table_order = nx.topological_sort(fk_ref_graph) + for table_name in table_order: + if table_name == inference_binding._cache_table_name: + table = inference_binding._cache_table + else: + table = inference_binding._tables[table_name]._table + await conn.execute(delete(table)) + + else: + logger.debug(f"Service binding for {inference_binding.service.name} is not cached") \ No newline at end of file diff --git a/aidb_utilities/db_setup/clear_cache.py b/aidb_utilities/db_setup/clear_cache.py deleted file mode 100644 index d60c3e75..00000000 --- a/aidb_utilities/db_setup/clear_cache.py +++ /dev/null @@ -1,44 +0,0 @@ -from collections import deque - -import networkx as nx -from sqlalchemy.schema import ForeignKeyConstraint -from sqlalchemy.sql import delete - -from aidb.engine.engine import Engine -from aidb.inference.bound_inference_service import CachedBoundInferenceService -from aidb.utils.logger import logger - - 
-async def clear_ML_cache(engine: Engine): - ''' - Clear the cache table at start if the ML model has changed. - Delete the cache table for each service. - ''' - for inference_binding in engine._config.inference_bindings: - if isinstance(inference_binding, CachedBoundInferenceService): - async with inference_binding._engine.begin() as conn: - tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] - fk_ref_graph = nx.DiGraph() - fk_ref_graph.add_nodes_from(tables_to_delete) - for table_name in tables_to_delete: - if table_name == inference_binding._cache_table_name: - table = inference_binding._cache_table - else: - table = inference_binding._tables[table_name]._table - for constraint in table.constraints: - if isinstance(constraint, ForeignKeyConstraint): - for fk in constraint.elements: - fk_ref_table_name = fk.column.table.name - if fk_ref_table_name in fk_ref_graph: - fk_ref_graph.add_edge(table_name,fk_ref_table_name) - - table_order = nx.topological_sort(fk_ref_graph) - for table_name in table_order: - if table_name == inference_binding._cache_table_name: - table = inference_binding._cache_table - else: - table = inference_binding._tables[table_name]._table - await conn.execute(delete(table)) - - else: - logger.debug(f"Service binding for {inference_binding.service.name} is not cached") \ No newline at end of file diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 793164aa..c484c471 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -8,7 +8,6 @@ from aidb.utils.asyncio import asyncio_run from aidb.utils.logger import logger -from aidb_utilities.db_setup.clear_cache import clear_ML_cache from tests.inference_service_utils.http_inference_service_setup import \ run_server from tests.inference_service_utils.inference_service_setup import \ @@ -72,7 +71,7 @@ async def test_num_infer_calls(self): assert len(gt_res) == len(aidb_res) # 
run again, because cache exists, there should be no new calls assert aidb_engine._config.inference_services["objects00"].infer_one.calls == calls[index][0] - asyncio_run(clear_ML_cache(aidb_engine)) + asyncio_run(aidb_engine.clear_ml_cache()) logger.info(f'Running uncached query {aidb_query} in aidb database') aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) From 504da9e77e1ac076af43e06cac6411f5ec8b9fb5 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Thu, 9 May 2024 23:25:54 +0800 Subject: [PATCH 09/19] Adding test for separate service cache cleaning * Add deletion for services separately * Fix launch.py * Add test to check whether only cache for one service is deleted --- aidb/engine/engine.py | 14 ++++++++--- launch.py | 7 +++--- tests/tests_caching_logic.py | 49 ++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 7 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 416b16f8..e2b297cb 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -50,13 +50,15 @@ def execute(self, query: str, **kwargs): finally: self.__del__() - async def clear_ml_cache(self): + async def clear_ml_cache(self, service_name_list: list[str] | None = None): ''' Clear the cache and output table if the ML model has changed. For each cached inference service, build the reference graph of the tables based on fk constraints, and then delete the tables following the graph's topological order to maintain integrity during deletion. + service_name_list: the name of all the changed services. + If the service name list is not given, the output for all the services will be cleared. 
''' - for inference_binding in self._config.inference_bindings: + async def clear_service_cache(inference_binding): if isinstance(inference_binding, CachedBoundInferenceService): async with inference_binding._engine.begin() as conn: tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] @@ -83,4 +85,10 @@ async def clear_ml_cache(self): await conn.execute(delete(table)) else: - logger.debug(f"Service binding for {inference_binding.service.name} is not cached") \ No newline at end of file + logger.debug(f"Service binding for {inference_binding.service.name} is not cached") + + if service_name_list is not None: + service_name_set = set(service_name_list) + for inference_binding in self._config.inference_bindings: + if service_name_list is None or inference_binding.service.name in service_name_set: + await clear_service_cache(inference_binding) \ No newline at end of file diff --git a/launch.py b/launch.py index 4d27945d..291d7308 100644 --- a/launch.py +++ b/launch.py @@ -8,7 +8,6 @@ from aidb_utilities.command_line_setup.command_line_setup import \ command_line_utility from aidb_utilities.db_setup.blob_table import BaseTablesSetup -from aidb_utilities.db_setup.clear_cache import clear_ML_cache from aidb_utilities.db_setup.create_tables import create_output_tables @@ -24,7 +23,7 @@ def setup_blob_tables(config): parser.add_argument("--setup-blob-table", action='store_true') parser.add_argument("--setup-output-tables", action='store_true') parser.add_argument("--verbose", action='store_true') - parser.add_argument("--clear-cache", action='store_true') + parser.add_argument("--clear-cache", nargs='*') args = parser.parse_args() config = importlib.import_module(args.config) @@ -37,7 +36,7 @@ def setup_blob_tables(config): aidb_engine = AIDB.from_config(args.config, args.verbose) - if args.clear_cache: - asyncio_run(clear_ML_cache(aidb_engine)) + if args.clear_cache is not None: + 
asyncio_run(aidb_engine.clear_ml_cache(None if len(args.clear_cache) == 0 else args.clear_cache)) command_line_utility(aidb_engine) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index c484c471..f16227fa 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -81,5 +81,54 @@ async def test_num_infer_calls(self): del aidb_engine p.terminate() + async def test_only_one_service_deleted(self): + ''' + Testing whether cache for other service remains when only one service is deleted. + Do query on two different services first. Then delete cache for one service. + Finally do query on these services again and check whether the call count changes. + ''' + dirname = os.path.dirname(__file__) + data_dir = os.path.join(dirname, 'data/jackson') + + p = Process(target=run_server, args=[str(data_dir)]) + p.start() + time.sleep(1) + db_url_list = [POSTGRESQL_URL] + + for db_url in db_url_list: + gt_engine, aidb_engine = await setup_gt_and_aidb_engine(db_url, data_dir) + + register_inference_services(aidb_engine, data_dir, batch_supported=False) + + queries = [ + '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''', + '''SELECT * FROM lights01 WHERE light_1='red' AND frame < 300;''' + ] + + # no service calls before executing query + assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 + assert aidb_engine._config.inference_services["lights01"].infer_one.calls == 0 + + for index, aidb_query in enumerate(queries): + # Run the query on the aidb database + logger.info(f'Running initial query {aidb_query} in aidb database') + aidb_engine.execute(aidb_query) + + initial_objects00_calls = aidb_engine._config.inference_services["objects00"].infer_one.calls + initial_lights01_calls = aidb_engine._config.inference_services["lights01"].infer_one.calls + + asyncio_run(aidb_engine.clear_ml_cache(["objects00"])) + for index, aidb_query in enumerate(queries): + # Run the query on the aidb database + 
logger.info(f'Running query {aidb_query} in aidb database after clearing cache for one service') + aidb_engine.execute(aidb_query) + + assert initial_objects00_calls != aidb_engine._config.inference_services["objects00"].infer_one.calls + assert initial_lights01_calls == aidb_engine._config.inference_services["lights01"].infer_one.calls + + del gt_engine + del aidb_engine + p.terminate() + if __name__ == '__main__': unittest.main() From a845a00c610c204fddedcf9a28fc8ba0e6332ab4 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 10 May 2024 07:32:23 +0800 Subject: [PATCH 10/19] Fix problems causing test failure * Change function param declaration to be compatible with python 3.8 * Change call count logic since all the inference service use the same counter * Add join() function to terminate the test server completely --- aidb/engine/engine.py | 12 ++++++-- tests/tests_caching_logic.py | 60 ++++++++++++++++++++++++------------ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index e2b297cb..2ffd67de 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -50,12 +50,13 @@ def execute(self, query: str, **kwargs): finally: self.__del__() - async def clear_ml_cache(self, service_name_list: list[str] | None = None): + async def clear_ml_cache(self, service_name_list = None): ''' Clear the cache and output table if the ML model has changed. For each cached inference service, build the reference graph of the tables based on fk constraints, and then delete the tables following the graph's topological order to maintain integrity during deletion. - service_name_list: the name of all the changed services. + + service_name_list: the name of all the changed services. Should be a list of str or None. If the service name list is not given, the output for all the services will be cleared. 
''' async def clear_service_cache(inference_binding): @@ -91,4 +92,9 @@ async def clear_service_cache(inference_binding): service_name_set = set(service_name_list) for inference_binding in self._config.inference_bindings: if service_name_list is None or inference_binding.service.name in service_name_set: - await clear_service_cache(inference_binding) \ No newline at end of file + logger.debug(f"Clearing cache for service {inference_binding.service.name}") + await clear_service_cache(inference_binding) + if service_name_list is not None: + service_name_set.remove(inference_binding.service.name) + + logger.warning(f"Service binding {service_name_set} are not found.") \ No newline at end of file diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index f16227fa..87cd465c 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -80,12 +80,13 @@ async def test_num_infer_calls(self): del gt_engine del aidb_engine p.terminate() + p.join() async def test_only_one_service_deleted(self): ''' Testing whether cache for other service remains when only one service is deleted. Do query on two different services first. Then delete cache for one service. - Finally do query on these services again and check whether the call count changes. + Finally do query on these services again and check whether the call count is correct. 
''' dirname = os.path.dirname(__file__) data_dir = os.path.join(dirname, 'data/jackson') @@ -97,38 +98,59 @@ async def test_only_one_service_deleted(self): for db_url in db_url_list: gt_engine, aidb_engine = await setup_gt_and_aidb_engine(db_url, data_dir) - register_inference_services(aidb_engine, data_dir, batch_supported=False) - + queries = [ - '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 300;''', + ( + 'full_scan', + '''SELECT * FROM lights01 WHERE light_1='red' AND frame < 300;''', '''SELECT * FROM lights01 WHERE light_1='red' AND frame < 300;''' + ), + ( + 'full_scan', + '''SELECT * FROM counts03 WHERE count = 1 AND frame < 300;''', + '''SELECT * FROM counts03 WHERE count = 1 AND frame < 300;''' + ), ] # no service calls before executing query - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 - assert aidb_engine._config.inference_services["lights01"].infer_one.calls == 0 - - for index, aidb_query in enumerate(queries): + # all the infer_one call use the same counter, so checking only one of them should be enough + assert aidb_engine._config.inference_services["counts03"].infer_one.calls == 0 + + calls = [[20, 40], [60, 60]] + # each query calls 20 inference. + # For the first two queries, all the inference is needed. + # The third query need to infer again but the fourth don't. 
+ for index, (query_type, aidb_query, exact_query) in enumerate(queries): # Run the query on the aidb database + logger.info(f'Running query {exact_query} in ground truth database') + # Run the query on the ground truth database + async with gt_engine.begin() as conn: + gt_res = await conn.execute(text(exact_query)) + gt_res = gt_res.fetchall() logger.info(f'Running initial query {aidb_query} in aidb database') - aidb_engine.execute(aidb_query) - - initial_objects00_calls = aidb_engine._config.inference_services["objects00"].infer_one.calls - initial_lights01_calls = aidb_engine._config.inference_services["lights01"].infer_one.calls + aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[0][index] - asyncio_run(aidb_engine.clear_ml_cache(["objects00"])) - for index, aidb_query in enumerate(queries): - # Run the query on the aidb database - logger.info(f'Running query {aidb_query} in aidb database after clearing cache for one service') - aidb_engine.execute(aidb_query) + asyncio_run(aidb_engine.clear_ml_cache(["lights01"])) - assert initial_objects00_calls != aidb_engine._config.inference_services["objects00"].infer_one.calls - assert initial_lights01_calls == aidb_engine._config.inference_services["lights01"].infer_one.calls + for index, (query_type, aidb_query, exact_query) in enumerate(queries): + # Run the query on the aidb database + logger.info(f'Running query {exact_query} in ground truth database') + # Run the query on the ground truth database + async with gt_engine.begin() as conn: + gt_res = await conn.execute(text(exact_query)) + gt_res = gt_res.fetchall() + logger.info(f'Running query {aidb_query} in aidb database after cache deleted') + aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[1][index] del gt_engine del aidb_engine 
p.terminate() + p.join() if __name__ == '__main__': unittest.main() From 49ad1e1576837442e14dd7b4a00bfbeebfc5543d Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 10 May 2024 08:00:45 +0800 Subject: [PATCH 11/19] Fix bugs occurred in multiple tests * Clear cache before test run * Set count target corresponding to initial call count --- tests/tests_caching_logic.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 87cd465c..9100c031 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -47,11 +47,15 @@ async def test_num_infer_calls(self): '''SELECT * FROM objects00 WHERE object_name='car' AND frame < 400;''' ), ] + + # Get the initial call count since inference services may be called by other tests before + initial_infer_one_calls = aidb_engine._config.inference_services["objects00"].infer_one.calls + + # May have cache before test so clear them + aidb_engine.clear_ml_cache() - # no service calls before executing query - assert aidb_engine._config.inference_services["objects00"].infer_one.calls == 0 - - calls = [[20, 40], [47, 74]] + calls = [[initial_infer_one_calls + 20, initial_infer_one_calls + 40], + [initial_infer_one_calls + 47, initial_infer_one_calls + 74]] # First 300 need 20 calls, 300 to 400 need 7 calls for index, (query_type, aidb_query, exact_query) in enumerate(queries): logger.info(f'Running query {exact_query} in ground truth database') @@ -80,7 +84,6 @@ async def test_num_infer_calls(self): del gt_engine del aidb_engine p.terminate() - p.join() async def test_only_one_service_deleted(self): ''' @@ -113,11 +116,15 @@ async def test_only_one_service_deleted(self): ), ] - # no service calls before executing query + # Get the initial call count since inference services may be called by other tests before # all the infer_one call use the same counter, so checking only one of them should be enough - assert 
aidb_engine._config.inference_services["counts03"].infer_one.calls == 0 - - calls = [[20, 40], [60, 60]] + initial_infer_one_calls = aidb_engine._config.inference_services["counts03"].infer_one.calls + + # May have cache before test so clear them + aidb_engine.clear_ml_cache() + + calls = [[initial_infer_one_calls + 20, initial_infer_one_calls + 40], + [initial_infer_one_calls + 60, initial_infer_one_calls + 60]] # each query calls 20 inference. # For the first two queries, all the inference is needed. # The third query need to infer again but the fourth don't. @@ -150,7 +157,6 @@ async def test_only_one_service_deleted(self): del gt_engine del aidb_engine p.terminate() - p.join() if __name__ == '__main__': unittest.main() From ae8ec23ec02d1e3f6757afcb5c86b8eb04eab97a Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 10 May 2024 08:02:38 +0800 Subject: [PATCH 12/19] Fix typo when clearing cache * run cache clearing using asyncio --- tests/tests_caching_logic.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 9100c031..61bc5f57 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -52,7 +52,7 @@ async def test_num_infer_calls(self): initial_infer_one_calls = aidb_engine._config.inference_services["objects00"].infer_one.calls # May have cache before test so clear them - aidb_engine.clear_ml_cache() + asyncio_run(aidb_engine.clear_ml_cache()) calls = [[initial_infer_one_calls + 20, initial_infer_one_calls + 40], [initial_infer_one_calls + 47, initial_infer_one_calls + 74]] @@ -121,7 +121,7 @@ async def test_only_one_service_deleted(self): initial_infer_one_calls = aidb_engine._config.inference_services["counts03"].infer_one.calls # May have cache before test so clear them - aidb_engine.clear_ml_cache() + asyncio_run(aidb_engine.clear_ml_cache()) calls = [[initial_infer_one_calls + 20, initial_infer_one_calls + 40], [initial_infer_one_calls + 60, 
initial_infer_one_calls + 60]] From 76d84d101005be364cba5a518d538538fe4ff28a Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 10 May 2024 08:06:31 +0800 Subject: [PATCH 13/19] Fix typo in engine * Fix typo --- aidb/engine/engine.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 2ffd67de..25658738 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -96,5 +96,6 @@ async def clear_service_cache(inference_binding): await clear_service_cache(inference_binding) if service_name_list is not None: service_name_set.remove(inference_binding.service.name) - - logger.warning(f"Service binding {service_name_set} are not found.") \ No newline at end of file + + if service_name_list is not None and len(service_name_set) != 0: + logger.warning(f"Service binding {service_name_set} are not found.") \ No newline at end of file From 1962e0263cc1976d45532ed8cdcf642aa1573db5 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Fri, 10 May 2024 08:12:17 +0800 Subject: [PATCH 14/19] Fix: close server completely * Add join() and sleep to make sure the server terminates --- tests/tests_caching_logic.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 61bc5f57..7eb2bb3a 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -84,6 +84,8 @@ async def test_num_infer_calls(self): del gt_engine del aidb_engine p.terminate() + p.join() + time.sleep(1) async def test_only_one_service_deleted(self): ''' @@ -157,6 +159,8 @@ async def test_only_one_service_deleted(self): del gt_engine del aidb_engine p.terminate() + p.join() + time.sleep(1) if __name__ == '__main__': unittest.main() From 195c8cf0a23cb6da028fc44d28efbb77a1a2a41e Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Tue, 14 May 2024 23:12:11 +0800 Subject: [PATCH 15/19] Refactor cache clearing * Use the existing inference topological order as delete order --- 
aidb/engine/engine.py | 61 ++++++++++++------------------------------- 1 file changed, 17 insertions(+), 44 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 25658738..639360db 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -53,49 +53,22 @@ def execute(self, query: str, **kwargs): async def clear_ml_cache(self, service_name_list = None): ''' Clear the cache and output table if the ML model has changed. - For each cached inference service, build the reference graph of the tables based on fk constraints, - and then delete the tables following the graph's topological order to maintain integrity during deletion. - - service_name_list: the name of all the changed services. Should be a list of str or None. + Delete the tables following the inference services' topological order to maintain integrity during deletion. + service_name_list: the name of all the changed services. A list of str or None. If the service name list is not given, the output for all the services will be cleared. 
''' - async def clear_service_cache(inference_binding): - if isinstance(inference_binding, CachedBoundInferenceService): - async with inference_binding._engine.begin() as conn: - tables_to_delete = inference_binding.get_tables(inference_binding.binding.output_columns) + [inference_binding._cache_table_name] - fk_ref_graph = nx.DiGraph() - fk_ref_graph.add_nodes_from(tables_to_delete) - for table_name in tables_to_delete: - if table_name == inference_binding._cache_table_name: - table = inference_binding._cache_table - else: - table = inference_binding._tables[table_name]._table - for constraint in table.constraints: - if isinstance(constraint, ForeignKeyConstraint): - for fk in constraint.elements: - fk_ref_table_name = fk.column.table.name - if fk_ref_table_name in fk_ref_graph: - fk_ref_graph.add_edge(table_name,fk_ref_table_name) - - table_order = nx.topological_sort(fk_ref_graph) - for table_name in table_order: - if table_name == inference_binding._cache_table_name: - table = inference_binding._cache_table - else: - table = inference_binding._tables[table_name]._table - await conn.execute(delete(table)) - - else: - logger.debug(f"Service binding for {inference_binding.service.name} is not cached") - - if service_name_list is not None: - service_name_set = set(service_name_list) - for inference_binding in self._config.inference_bindings: - if service_name_list is None or inference_binding.service.name in service_name_set: - logger.debug(f"Clearing cache for service {inference_binding.service.name}") - await clear_service_cache(inference_binding) - if service_name_list is not None: - service_name_set.remove(inference_binding.service.name) - - if service_name_list is not None and len(service_name_set) != 0: - logger.warning(f"Service binding {service_name_set} are not found.") \ No newline at end of file + async with self._sql_engine.begin() as conn: + service_ordering = self._config.inference_topological_order + if service_name_list is None: + service_name_list 
= [bounded_service.service.name for bounded_service in service_ordering] + service_name_list = set(service_name_list) + for bounded_service in reversed(service_ordering): + if isinstance(bounded_service, CachedBoundInferenceService): + if bounded_service.service.name in service_name_list: + for input_column in bounded_service.binding.input_columns: + service_name_list.add(input_column.split('.')[0]) + asyncio_run(conn.execute(delete(bounded_service._cache_table))) + for output_column in bounded_service.binding.output_columns: + asyncio_run(conn.execute(delete(bounded_service._tables[output_column.split('.')[0]]._table))) + else: + logger.debug(f"Service binding for {bounded_service.service.name} is not cached") \ No newline at end of file From ac4eeff93fce0f02a2a231f0cfa00a5c4cdad0f7 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Wed, 15 May 2024 23:41:42 +0800 Subject: [PATCH 16/19] Fix code style and service collection * Fix how the service to clear cache are collected * Reduce redundant table deletion * Comment Fix * Remove redundant sleep --- aidb/engine/engine.py | 15 ++++++++++++--- tests/tests_caching_logic.py | 6 ++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 639360db..6f3399fc 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -62,13 +62,22 @@ async def clear_ml_cache(self, service_name_list = None): if service_name_list is None: service_name_list = [bounded_service.service.name for bounded_service in service_ordering] service_name_list = set(service_name_list) + + # Get all the services that need to be cleared because of foreign key constraints + for bounded_service in service_ordering: + if bounded_service.service.name in service_name_list: + for in_edge in self._config.table_graph.in_edges(bounded_service.service.name): + service_name_list.add(in_edge[0]) + + # Clear the services in reversed topological order for bounded_service in reversed(service_ordering): if 
isinstance(bounded_service, CachedBoundInferenceService): if bounded_service.service.name in service_name_list: - for input_column in bounded_service.binding.input_columns: - service_name_list.add(input_column.split('.')[0]) asyncio_run(conn.execute(delete(bounded_service._cache_table))) + output_tables_to_be_deleted = set() for output_column in bounded_service.binding.output_columns: - asyncio_run(conn.execute(delete(bounded_service._tables[output_column.split('.')[0]]._table))) + output_tables_to_be_deleted.add(output_column.split('.')[0]) + for table_name in output_tables_to_be_deleted: + asyncio_run(conn.execute(delete(bounded_service._tables[table_name]._table))) else: logger.debug(f"Service binding for {bounded_service.service.name} is not cached") \ No newline at end of file diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 7eb2bb3a..4b2dc325 100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -85,7 +85,6 @@ async def test_num_infer_calls(self): del aidb_engine p.terminate() p.join() - time.sleep(1) async def test_only_one_service_deleted(self): ''' @@ -131,13 +130,13 @@ async def test_only_one_service_deleted(self): # For the first two queries, all the inference is needed. # The third query need to infer again but the fourth don't. 
for index, (query_type, aidb_query, exact_query) in enumerate(queries): - # Run the query on the aidb database logger.info(f'Running query {exact_query} in ground truth database') # Run the query on the ground truth database async with gt_engine.begin() as conn: gt_res = await conn.execute(text(exact_query)) gt_res = gt_res.fetchall() logger.info(f'Running initial query {aidb_query} in aidb database') + # Run the query on the aidb database aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[0][index] @@ -145,13 +144,13 @@ async def test_only_one_service_deleted(self): asyncio_run(aidb_engine.clear_ml_cache(["lights01"])) for index, (query_type, aidb_query, exact_query) in enumerate(queries): - # Run the query on the aidb database logger.info(f'Running query {exact_query} in ground truth database') # Run the query on the ground truth database async with gt_engine.begin() as conn: gt_res = await conn.execute(text(exact_query)) gt_res = gt_res.fetchall() logger.info(f'Running query {aidb_query} in aidb database after cache deleted') + # Run the query on the aidb database aidb_res = aidb_engine.execute(aidb_query) assert len(gt_res) == len(aidb_res) assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[1][index] @@ -160,7 +159,6 @@ async def test_only_one_service_deleted(self): del aidb_engine p.terminate() p.join() - time.sleep(1) if __name__ == '__main__': unittest.main() From 3a9e94e6b7250a2a7d936337b9fef6bdb09e72b1 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Thu, 16 May 2024 00:46:40 +0800 Subject: [PATCH 17/19] Fix service collection * Use the correct graph to collect service --- aidb/engine/engine.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index 6f3399fc..ac7f0b4f 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -66,8 +66,9 @@ async 
def clear_ml_cache(self, service_name_list = None): # Get all the services that need to be cleared because of foreign key constraints for bounded_service in service_ordering: if bounded_service.service.name in service_name_list: - for in_edge in self._config.table_graph.in_edges(bounded_service.service.name): - service_name_list.add(in_edge[0]) + for input_column in bounded_service.binding.input_columns: + for in_edge in self._config.inference_graph.in_edges(input_column): + service_name_list.add(in_edge.bound_service.service.name) # Clear the services in reversed topological order for bounded_service in reversed(service_ordering): From 9a0b072c362a29ba598eb7a144b4172616c7a4b7 Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Thu, 16 May 2024 00:57:13 +0800 Subject: [PATCH 18/19] Fix the way getting edge attribute * Fix --- aidb/engine/engine.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index ac7f0b4f..e693ea4d 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -64,11 +64,12 @@ async def clear_ml_cache(self, service_name_list = None): service_name_list = set(service_name_list) # Get all the services that need to be cleared because of foreign key constraints + inference_graph = self._config.inference_graph for bounded_service in service_ordering: if bounded_service.service.name in service_name_list: for input_column in bounded_service.binding.input_columns: - for in_edge in self._config.inference_graph.in_edges(input_column): - service_name_list.add(in_edge.bound_service.service.name) + for in_edge in inference_graph.in_edges(input_column): + service_name_list.add(inference_graph.get_edge_data(*in_edge)['bound_service']) # Clear the services in reversed topological order for bounded_service in reversed(service_ordering): From 0ece539576b93799e2d791613be8dd6b71e2f9ad Mon Sep 17 00:00:00 2001 From: hjk1030 Date: Sun, 19 May 2024 20:15:09 +0800 Subject: [PATCH 19/19] Merge test stages 
& Refactor cache clearing * Merge the two stage of testing using a loop * Refactor the cache clearing using the table graph --- aidb/engine/engine.py | 93 +++++++++++++++++++++++++----------- tests/tests_caching_logic.py | 40 ++++++---------- 2 files changed, 80 insertions(+), 53 deletions(-) diff --git a/aidb/engine/engine.py b/aidb/engine/engine.py index e693ea4d..4428fbac 100644 --- a/aidb/engine/engine.py +++ b/aidb/engine/engine.py @@ -1,3 +1,5 @@ +from collections import deque + import networkx as nx from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.sql import delete @@ -50,36 +52,71 @@ def execute(self, query: str, **kwargs): finally: self.__del__() - async def clear_ml_cache(self, service_name_list = None): + async def clear_ml_cache(self, services_to_clear = None): ''' Clear the cache and output table if the ML model has changed. - Delete the tables following the inference services' topological order to maintain integrity during deletion. - service_name_list: the name of all the changed services. A list of str or None. + 1. Collect the output tables directly related to the selected services. + 2. Collect the output tables that need to be cleared considering the fk and service constraints. + 3. Delete the cache tables. + 4. Delete the output tables in the reversed topological order of table_graph. + + services_to_clear: the name of all the changed services. A list of str or None. If the service name list is not given, the output for all the services will be cleared. + Note that the output for some other services may be also cleared because of fk constraints. 
''' + if services_to_clear is None: + services_to_clear = [bound_service.service.name for bound_service in self._config.inference_bindings] + services_to_clear = set(services_to_clear) + + # The services that has output columns in the table + table_related_service = {table_name: set() for table_name in self._config.tables.keys()} + # The output tables of each service + output_tables = {service_name: set() for service_name in self._config.inference_services.keys()} + tables_to_clear = set() + + for bound_service in self._config.inference_bindings: + if isinstance(bound_service, CachedBoundInferenceService): + # Construct the table to service map and the output table list + service_name = bound_service.service.name + for output_column in bound_service.binding.output_columns: + output_tables[service_name].add(output_column.split('.')[0]) + for output_table_name in output_tables[service_name]: + table_related_service[output_table_name].add(service_name) + # Collect the output tables directly related to service_to_clear + if service_name in services_to_clear: + tables_to_clear.update(output_tables[service_name]) + else: + logger.debug(f"Service binding for {bound_service.service.name} is not cached") + + # Collect the output tables that need to be cleared considering the fk and service constraints + # Do a bfs on the reversed table graph + table_graph = self._config.table_graph + table_queue = deque(tables_to_clear) + + def add_table_to_queue(table): + if table not in tables_to_clear: + tables_to_clear.add(table) + table_queue.append(table) + + while len(table_queue) > 0: + table_name = table_queue.popleft() + # Add tables considering fk constraints + for in_edge in table_graph.in_edges(table_name): + add_table_to_queue(in_edge[0]) + # Add tables considering service constraints + services_to_clear.update(table_related_service[table_name]) + for service_name in table_related_service[table_name]: + for table_to_add in output_tables[service_name]: + 
add_table_to_queue(table_to_add) + async with self._sql_engine.begin() as conn: - service_ordering = self._config.inference_topological_order - if service_name_list is None: - service_name_list = [bounded_service.service.name for bounded_service in service_ordering] - service_name_list = set(service_name_list) - - # Get all the services that need to be cleared because of foreign key constraints - inference_graph = self._config.inference_graph - for bounded_service in service_ordering: - if bounded_service.service.name in service_name_list: - for input_column in bounded_service.binding.input_columns: - for in_edge in inference_graph.in_edges(input_column): - service_name_list.add(inference_graph.get_edge_data(*in_edge)['bound_service']) - - # Clear the services in reversed topological order - for bounded_service in reversed(service_ordering): - if isinstance(bounded_service, CachedBoundInferenceService): - if bounded_service.service.name in service_name_list: - asyncio_run(conn.execute(delete(bounded_service._cache_table))) - output_tables_to_be_deleted = set() - for output_column in bounded_service.binding.output_columns: - output_tables_to_be_deleted.add(output_column.split('.')[0]) - for table_name in output_tables_to_be_deleted: - asyncio_run(conn.execute(delete(bounded_service._tables[table_name]._table))) - else: - logger.debug(f"Service binding for {bounded_service.service.name} is not cached") \ No newline at end of file + # Delete cache tables + for bound_service in self._config.inference_bindings: + if bound_service.service.name in services_to_clear: + asyncio_run(conn.execute(delete(bound_service._cache_table))) + + # Delete output tables + table_order = nx.topological_sort(table_graph) + for table_name in table_order: + if table_name in tables_to_clear: + asyncio_run(conn.execute(delete(self._config.tables[table_name]._table))) \ No newline at end of file diff --git a/tests/tests_caching_logic.py b/tests/tests_caching_logic.py index 4b2dc325..ef7704f6 
100644 --- a/tests/tests_caching_logic.py +++ b/tests/tests_caching_logic.py @@ -129,31 +129,21 @@ async def test_only_one_service_deleted(self): # each query calls 20 inference. # For the first two queries, all the inference is needed. # The third query need to infer again but the fourth don't. - for index, (query_type, aidb_query, exact_query) in enumerate(queries): - logger.info(f'Running query {exact_query} in ground truth database') - # Run the query on the ground truth database - async with gt_engine.begin() as conn: - gt_res = await conn.execute(text(exact_query)) - gt_res = gt_res.fetchall() - logger.info(f'Running initial query {aidb_query} in aidb database') - # Run the query on the aidb database - aidb_res = aidb_engine.execute(aidb_query) - assert len(gt_res) == len(aidb_res) - assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[0][index] - - asyncio_run(aidb_engine.clear_ml_cache(["lights01"])) - - for index, (query_type, aidb_query, exact_query) in enumerate(queries): - logger.info(f'Running query {exact_query} in ground truth database') - # Run the query on the ground truth database - async with gt_engine.begin() as conn: - gt_res = await conn.execute(text(exact_query)) - gt_res = gt_res.fetchall() - logger.info(f'Running query {aidb_query} in aidb database after cache deleted') - # Run the query on the aidb database - aidb_res = aidb_engine.execute(aidb_query) - assert len(gt_res) == len(aidb_res) - assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[1][index] + for round in range(2): + for index, (query_type, aidb_query, exact_query) in enumerate(queries): + logger.info(f'Running query {exact_query} in ground truth database') + # Run the query on the ground truth database + async with gt_engine.begin() as conn: + gt_res = await conn.execute(text(exact_query)) + gt_res = gt_res.fetchall() + logger.info(f'Running query {aidb_query} in aidb database') + # Run the query on the aidb database 
+ aidb_res = aidb_engine.execute(aidb_query) + assert len(gt_res) == len(aidb_res) + # Check the call number + assert aidb_engine._config.inference_services["counts03"].infer_one.calls == calls[round][index] + # Clear the cache for one of the services and retain the other one + asyncio_run(aidb_engine.clear_ml_cache(["lights01"])) del gt_engine del aidb_engine