diff --git a/.github/workflows/pytest-dbutils-influxdbfuncts.yml b/.github/workflows/pytest-dbutils-influxdbfuncts.yml deleted file mode 100644 index c8e7eaf32a..0000000000 --- a/.github/workflows/pytest-dbutils-influxdbfuncts.yml +++ /dev/null @@ -1,72 +0,0 @@ ---- -# This workflow is meant as a foundational workflow for running integration/unit tests on the -# platform. -# This workflow also shows the caching mechanisms available for storage -# and retrieval of cache for quicker setup of test environments. - - -name: Testing influxdbutils -on: - workflow_dispatch: - push: - branches: - - develop - - releases/** - pull_request: - branches: - - main - - develop - - releases/** -concurrency: - group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} - cancel-in-progress: true - -jobs: - build: - env: - TEST_FILE: volttrontesting/platform/dbutils/test_influxdbutils.py - - # The strategy allows customization of the build and allows matrixing the version of os and software - # https://docs.github.com/en/free-pro-team@l.atest/actions/reference/workflow-syntax-for-github-actions#jobsjob_idstrategy - strategy: - fail-fast: false - matrix: - # Each entry in the os and python-version matrix will be run so for the 3 x 4 there will be 12 jobs run - os: [ ubuntu-22.04 ] - python-version: [ '3.10' ] - - runs-on: ${{ matrix.os }} - - steps: - # checkout the volttron repository and set current directory to it - - uses: actions/checkout@v4 - - # Attempt to restore the cache from the build-dependency-cache workflow if present then - # the output value steps.check_files.outputs.files_exists will be set (see the next step for usage) - - name: Set up Python ${{matrix.os}} ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Create output suffix - run: | - echo "OUTPUT_SUFFIX=$(basename $TEST_FILE)" >> $GITHUB_ENV - - # Run the specified tests and save the results to a unique file that can be archived for later analysis. - - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }} - uses: volttron/volttron-build-action@no_rmq - timeout-minutes: 600 - with: - python_version: ${{ matrix.python-version }} - os: ${{ matrix.os }} - test_path: ${{ env.TEST_FILE }} - test_output_suffix: ${{ env.OUTPUT_SUFFIX }} - -# Archive the results from the pytest to storage. - - name: Archive test results - uses: actions/upload-artifact@v4 - if: always() - with: - name: pytest-report - # should match test-- ... 
-          path: output/test-${{ env.OUTPUT_SUFFIX }}-${{matrix.os}}-${{ matrix.python-version }}-results.xml
diff --git a/.github/workflows/pytest-dbutils-postgresqlfuncts.yml b/.github/workflows/pytest-dbutils-postgresqlfuncts.yml
index 3483215824..4f8458bcde 100644
--- a/.github/workflows/pytest-dbutils-postgresqlfuncts.yml
+++ b/.github/workflows/pytest-dbutils-postgresqlfuncts.yml
@@ -37,6 +37,25 @@ jobs:
 
     runs-on: ${{ matrix.os }}
 
+    services:
+      # Label used to access the service container
+      postgres:
+        # Docker Hub image
+        image: postgres
+        # Provide the password for postgres
+        env:
+          POSTGRES_PASSWORD: postgres
+        ports:
+          # Maps tcp port 5432 on the service container to the host
+          - 5432:5432
+      timescale:
+        image: timescale/timescaledb:latest-pg15
+        env:
+          POSTGRES_PASSWORD: postgres
+        ports:
+          # Maps tcp port 5433 on the host to tcp port 5432 on the service container
+          - 5433:5432
+
     steps:
       # checkout the volttron repository and set current directory to it
       - uses: actions/checkout@v4
@@ -51,7 +70,16 @@ jobs:
       - name: Create output suffix
         run: |
          echo "OUTPUT_SUFFIX=$(basename $TEST_FILE)" >> $GITHUB_ENV
-
+
+      - name: Create test historian database
+        run: |
+          PGPASSWORD="postgres" psql -U postgres -h localhost -p 5432 -c "CREATE DATABASE test_historian;"
+          PGPASSWORD="postgres" psql -U postgres -h localhost -p 5433 -c "CREATE DATABASE test_historian;"
+        env:
+          # The hostname used to communicate with the PostgreSQL service containers
+          POSTGRES_HOST: localhost
+          POSTGRES_PASSWORD: postgres
+
       # Run the specified tests and save the results to a unique file that can be archived for later analysis.
       - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }}
         uses: volttron/volttron-build-action@no_rmq
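The `services` block above replaces the throwaway Docker containers the dbutils tests previously started for themselves (see the removal of the `docker_wrapper`-based fixtures further down). To mirror the `Create test historian database` step outside of CI, a minimal sketch, assuming Postgres and TimescaleDB containers reachable on localhost:5432 and localhost:5433 with the password `postgres` as configured above:

```python
# Sketch of the CI database-bootstrap step done from Python instead of psql.
# Ports and credentials mirror the workflow above; nothing here is mandated
# by VOLTTRON itself.
import psycopg2

for port in (5432, 5433):
    conn = psycopg2.connect(dbname="postgres", user="postgres",
                            password="postgres", host="localhost", port=port)
    conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction block
    with conn.cursor() as cur:
        cur.execute("SELECT 1 FROM pg_database WHERE datname = 'test_historian'")
        if cur.fetchone() is None:
            cur.execute("CREATE DATABASE test_historian")
    conn.close()
```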
diff --git a/.github/workflows/pytest-dbutils-timescaldbfuncts.yml b/.github/workflows/pytest-dbutils-timescaldbfuncts.yml
deleted file mode 100644
index b4140384f0..0000000000
--- a/.github/workflows/pytest-dbutils-timescaldbfuncts.yml
+++ /dev/null
@@ -1,71 +0,0 @@
----
-# This workflow is meant as a foundational workflow for running integration/unit tests on the
-# platform.
-# This workflow also shows the caching mechanisms available for storage
-# and retrieval of cache for quicker setup of test environments.
-
-
-name: Testing postgresql_timescaledb_functs
-on:
-  workflow_dispatch:
-  push:
-    branches:
-      - develop
-      - releases/**
-  pull_request:
-    branches:
-      - main
-      - develop
-      - releases/**
-concurrency:
-  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
-  cancel-in-progress: true
-jobs:
-  build:
-    env:
-      TEST_FILE: volttrontesting/platform/dbutils/test_postgresql_timescaledb.py
-
-    # The strategy allows customization of the build and allows matrixing the version of os and software
-    # https://docs.github.com/en/free-pro-team@l.atest/actions/reference/workflow-syntax-for-github-actions#jobsjob_idstrategy
-    strategy:
-      fail-fast: false
-      matrix:
-        # Each entry in the os and python-version matrix will be run so for the 3 x 4 there will be 12 jobs run
-        os: [ ubuntu-22.04 ]
-        python-version: [ '3.10' ]
-
-    runs-on: ${{ matrix.os }}
-
-    steps:
-      # checkout the volttron repository and set current directory to it
-      - uses: actions/checkout@v4
-
-      # Attempt to restore the cache from the build-dependency-cache workflow if present then
-      # the output value steps.check_files.outputs.files_exists will be set (see the next step for usage)
-      - name: Set up Python ${{matrix.os}} ${{ matrix.python-version }}
-        uses: actions/setup-python@v5
-        with:
-          python-version: ${{ matrix.python-version }}
-
-      - name: Create output suffix
-        run: |
-          echo "OUTPUT_SUFFIX=$(basename $TEST_FILE)" >> $GITHUB_ENV
-
-      # Run the specified tests and save the results to a unique file that can be archived for later analysis.
-      - name: Run pytest on ${{ matrix.python-version }}, ${{ matrix.os }}
-        uses: volttron/volttron-build-action@no_rmq
-        timeout-minutes: 600
-        with:
-          python_version: ${{ matrix.python-version }}
-          os: ${{ matrix.os }}
-          test_path: ${{ env.TEST_FILE }}
-          test_output_suffix: ${{ env.OUTPUT_SUFFIX }}
-
-# Archive the results from the pytest to storage.
-      - name: Archive test results
-        uses: actions/upload-artifact@v4
-        if: always()
-        with:
-          name: pytest-report
-          # should match test-- ...
-          path: output/test-${{ env.OUTPUT_SUFFIX }}-${{matrix.os}}-${{ matrix.python-version }}-results.xml
diff --git a/README.md b/README.md
index bea8985a36..6b2139b8ed 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,7 @@ devices and provides an environment for developing applications which interact
 with that data.
 
 ## Upgrading Pre-8 to VOLTTRON 9.x
+VOLTTRON 9.0.4 requires Python 3.10 and was tested on Ubuntu 22.04.
 VOLTTRON 8 introduces four changes that require an explicit upgrade step when upgrading from an earlier VOLTTRON version
@@ -72,16 +73,16 @@ SQLAggregateHistorian source directory
 
 ## Installation
 
-VOLTTRON is written in Python 3.6+ and runs on Linux Operating Systems. For
+VOLTTRON is written in Python 3.10 and runs on Linux Operating Systems. For
 users unfamiliar with those technologies, the following resources are recommended:
 
-- 
+- 
 - 
 
 ### 1. Install prerequisites
 
 [Requirements Reference](https://volttron.readthedocs.io/en/latest/introduction/platform-install.html#step-1-install-prerequisites)
 
-
+From version 9.0.4, VOLTTRON requires Python 3.10; it was tested on Ubuntu 22.04.
 From version 7.0, VOLTTRON requires python 3 with a minimum version of 3.6; it is tested only on systems supporting that as a native package.
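Since the platform now pins Python 3.10, it is worth confirming the interpreter before bootstrapping. A minimal sketch of such a check (illustrative only; the message text is not from `bootstrap.py`):

```python
import sys

# VOLTTRON 9.0.4 targets Python 3.10; fail fast on older interpreters.
if sys.version_info < (3, 10):
    raise SystemExit(
        f"Python 3.10 or newer is required, found {sys.version.split()[0]}"
    )
```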
 On Debian-based systems (Ubuntu bionic, debian buster, raspbian buster), these can all be installed with the following commands:
@@ -94,7 +95,7 @@ sudo apt-get install build-essential libffi-dev python3-dev python3-venv openssl
 On Redhat or CENTOS systems, these can all be installed with the following command:
 ```sh
 sudo yum update
-sudo yum install make automake gcc gcc-c++ kernel-devel python3.6-devel pythone3.6-venv openssl openssl-devel libevent-devel git
+sudo yum install make automake gcc gcc-c++ kernel-devel python3.10-devel python3.10-venv openssl openssl-devel libevent-devel git
 ```
 
 ### 2. Clone VOLTTRON code
diff --git a/bootstrap.py b/bootstrap.py
index 8121b03c4c..520b030bd6 100644
--- a/bootstrap.py
+++ b/bootstrap.py
@@ -239,7 +239,7 @@ def main(argv=sys.argv):
     parser = argparse.ArgumentParser(
         description='Bootstrap and update a virtual Python environment '
         'for VOLTTRON development.',
-        usage='\n bootstrap: python3.6 %(prog)s [options]'
+        usage='\n bootstrap: python3.10 %(prog)s [options]'
         '\n update: {} %(prog)s [options]'.format(python),
         prog=os.path.basename(argv[0]),
         epilog="""
diff --git a/volttrontesting/platform/dbutils/test_mongoutils.py b/deprecated/MongodbHistorian/tests/test_mongoutils.py
similarity index 100%
rename from volttrontesting/platform/dbutils/test_mongoutils.py
rename to deprecated/MongodbHistorian/tests/test_mongoutils.py
diff --git a/docs/source/conf.py b/docs/source/conf.py
index e9acb22963..9e8a8d8dd9 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -238,7 +238,7 @@ def __getattr__(cls, name):
 
 # Example configuration for intersphinx: refer to the Python standard library.
 intersphinx_mapping = {
-    'https://docs.python.org/3.6':
+    'https://docs.python.org/3.10':
         None,
     'volttron-ansible': ('https://volttron.readthedocs.io/projects/volttron-ansible/en/main/',
diff --git a/docs/source/introduction/platform-install.rst b/docs/source/introduction/platform-install.rst
index 5b77d8ae82..05c2cdb7ea 100644
--- a/docs/source/introduction/platform-install.rst
+++ b/docs/source/introduction/platform-install.rst
@@ -7,10 +7,10 @@ Installing the Platform
 =======================
 
-VOLTTRON is written in Python 3.6+ and runs on Linux Operating Systems. For users unfamiliar with those technologies,
+VOLTTRON is written in Python 3.10 and runs on Linux Operating Systems. For users unfamiliar with those technologies,
 the following resources are recommended:
 
-- `Python 3.6 Tutorial `_
+- `Python 3.10 Tutorial `_
 - `Linux Tutorial `_
 
 This guide will specify commands to use to successfully install the platform on supported Linux distributions, but a
@@ -18,7 +18,7 @@ working knowledge of Linux will be helpful for troubleshooting and may improve y
 deployment.
 
 .. note::
-
+    VOLTTRON version 9.0.4 was tested on Ubuntu 22.04 with Python 3.10.
     Volttron version 7.0rc1 is currently tested for Ubuntu versions 18.04 and 18.10 as well as Linux Mint version 19.3.
     Version 6.x is tested for Ubuntu versions 16.04 and 18.04 as well as Linux Mint version 19.1.
@@ -32,8 +32,8 @@ The following packages will need to be installed on the system:
 
 * git
 * build-essential
-* python3.6-dev
-* python3.6-venv
+* python3.10-dev
+* python3.10-venv
 * openssl
 * libssl-dev
 * libevent-dev
@@ -43,14 +43,14 @@ On **Debian-based systems**, these can all be installed with the following comma
 
 .. code-block:: bash
 
     sudo apt-get update
-    sudo apt-get install build-essential python3-dev python3-venv openssl libssl-dev libevent-dev git
+    sudo apt-get install build-essential python3.10-dev python3.10-venv openssl libssl-dev libevent-dev git
 
-On Ubuntu-based systems, available packages allow you to specify the Python3 version, 3.6 or greater is required
+On Ubuntu-based systems, available packages allow you to specify the Python3 version; 3.10 or greater is required
 (Debian itself does not provide those packages).
 
 .. code-block:: bash
 
-    sudo apt-get install build-essential python3.6-dev python3.6-venv openssl libssl-dev libevent-dev git
+    sudo apt-get install build-essential python3.10-dev python3.10-venv openssl libssl-dev libevent-dev git
 
 On arm-based systems (including, but not limited to, Raspbian), you must also install libffi-dev, you can do this with:
@@ -74,7 +74,7 @@ command:
 
     sudo yum install make automake gcc gcc-c++ kernel-devel python3-devel openssl openssl-devel libevent-devel git
 
 .. warning::
-    Python 3.6 or greater is required, please ensure you have installed a supported version with :bash:`python3 --version`
+    Python 3.10 or greater is required; please ensure you have installed a supported version with :bash:`python3 --version`
 
 If you have an agent which requires the pyodbc package, install the following additional requirements:
diff --git a/volttrontesting/platform/dbutils/test_influxdbutils.py b/services/contrib/InfluxdbHistorian/tests/test_influxdbutils.py
similarity index 100%
rename from volttrontesting/platform/dbutils/test_influxdbutils.py
rename to services/contrib/InfluxdbHistorian/tests/test_influxdbutils.py
diff --git a/volttron/platform/keystore.py b/volttron/platform/keystore.py
index 6bf7ee9de7..832f6554e4 100644
--- a/volttron/platform/keystore.py
+++ b/volttron/platform/keystore.py
@@ -196,6 +196,8 @@ def serverkey(self, addr):
 
     @staticmethod
     def _parse_addr(addr):
+        if "://" not in addr:  # without a scheme, urlparse misparses the address and returns the port number as the path
+            addr = "//" + addr
         url = urllib.parse.urlparse(addr)
         if url.netloc:
             return url.netloc
diff --git a/volttrontesting/platform/auth_tests/test_auth_control.py b/volttrontesting/platform/auth_tests/test_auth_control.py
index a17c6bf1b0..8531ac8f76 100644
--- a/volttrontesting/platform/auth_tests/test_auth_control.py
+++ b/volttrontesting/platform/auth_tests/test_auth_control.py
@@ -333,7 +333,7 @@ def test_auth_rpc_method_add(auth_instance):
     assert entries[-1]['rpc_method_authorizations'] == {'test_method': ["test_auth"]}
 
-
+@pytest.mark.xfail(reason="Known issue. ToDo - https://github.com/VOLTTRON/volttron/issues/3215")
 @pytest.mark.control
 def test_auth_rpc_method_remove(auth_instance):
     """Add an entry then update it with a different entry"""
diff --git a/volttrontesting/platform/dbutils/test_postgresql_timescaledb.py b/volttrontesting/platform/dbutils/test_postgresql_timescaledb.py
deleted file mode 100644
index c84cab34dd..0000000000
--- a/volttrontesting/platform/dbutils/test_postgresql_timescaledb.py
+++ /dev/null
@@ -1,716 +0,0 @@
-import datetime
-import itertools
-import os
-import logging
-import pytest
-from time import time, sleep
-
-try:
-    import psycopg2
-    from psycopg2.sql import SQL, Identifier
-except ImportError:
-    pytest.skip(
-        "Required imports for testing are not installed; thus, not running tests. 
" - "Install imports with: python bootstrap.py --postgres", - allow_module_level=True - ) - -from volttron.platform import jsonapi -from volttron.platform.dbutils.postgresqlfuncts import PostgreSqlFuncts -from volttrontesting.fixtures.docker_wrapper import create_container -from volttrontesting.utils.utils import get_rand_port - -logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) -pytestmark = [ - pytest.mark.postgresqlfuncts_timescaledb, - pytest.mark.dbutils, - pytest.mark.unit, -] - - -IMAGES = ["timescale/timescaledb:latest-pg12"] -if "CI" in os.environ: - IMAGES.extend( - ["timescale/timescaledb:latest-pg11", "timescale/timescaledb:latest-pg10"] - ) - -CONNECTION_HOST = "localhost" -ALLOW_CONNECTION_TIME = 3 -TEST_DATABASE = "test_historian" -ROOT_USER = "postgres" -ROOT_PASSWORD = "password" -ENV_POSTGRESQL = { - "POSTGRES_USER": ROOT_USER, # defining user not necessary but added to be explicit - "POSTGRES_PASSWORD": ROOT_PASSWORD, - "POSTGRES_DB": TEST_DATABASE, -} -DATA_TABLE = "data" -TOPICS_TABLE = "topics" -META_TABLE = "meta" -AGG_TOPICS_TABLE = "aggregate_topics" -AGG_META_TABLE = "aggregate_meta" -METADATA_TABLE = "metadata" - - -def test_insert_meta_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - if historian_version != "<4.0.0": - pytest.skip("insert_meta() is called by historian only for schema <4.0.0") - topic_id = "44" - metadata = "foobar44" - expected_data = (44, '"foobar44"') - - res = postgresqlfuncts.insert_meta(topic_id, metadata) - - assert res is True - assert get_data_in_table(port_on_host, "meta")[0] == expected_data - - -def test_update_meta_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - metadata = {"units": "count"} - metadata_s = jsonapi.dumps(metadata) - topic = "foobar" - - id = sqlfuncts.insert_topic(topic) - sqlfuncts.insert_meta(id, {"fdjlj": "XXXX"}) - assert metadata_s not in get_data_in_table(connection_port, TOPICS_TABLE)[0] - - res = sqlfuncts.update_meta(id, metadata) - - expected_lt_4 = [(1, metadata_s)] - expected_gteq_4 = [(1, topic, metadata_s)] - assert res is True - if historian_version == "<4.0.0": - assert get_data_in_table(connection_port, META_TABLE) == expected_lt_4 - else: - assert get_data_in_table(connection_port, TOPICS_TABLE) == expected_gteq_4 - - -def test_setup_historian_tables_should_create_tables(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - # get_container initializes db and sqlfuncts - # to test setup explicitly drop tables and see if tables get created correctly - drop_all_tables(connection_port) - - assert get_tables(connection_port) == set() - expected_tables = set(["data", "topics"]) - sqlfuncts.setup_historian_tables() - actual_tables = get_tables(connection_port) - assert actual_tables == expected_tables - - -def test_setup_aggregate_historian_tables_should_create_aggregate_tables( - get_container_func, -): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - # get_container initializes db and sqlfuncts to test setup explicitly drop tables and see if tables get created - drop_all_tables(port_on_host) - create_historian_tables(container, historian_version) - agg_topic_table = "aggregate_topics" - agg_meta_table = "aggregate_meta" - - original_tables = get_tables(port_on_host) - assert agg_topic_table not in original_tables - assert agg_meta_table not in original_tables 
- - expected_agg_topic_fields = { - "agg_topic_id", - "agg_topic_name", - "agg_time_period", - "agg_type", - } - expected_agg_meta_fields = {"agg_topic_id", "metadata"} - - postgresqlfuncts.setup_aggregate_historian_tables() - - updated_tables = get_tables(port_on_host) - assert agg_topic_table in updated_tables - assert agg_meta_table in updated_tables - assert describe_table(port_on_host, agg_topic_table) == expected_agg_topic_fields - assert describe_table(port_on_host, agg_meta_table) == expected_agg_meta_fields - assert postgresqlfuncts.agg_topics_table == agg_topic_table - assert postgresqlfuncts.agg_meta_table == agg_meta_table - assert postgresqlfuncts.data_table == DATA_TABLE - assert postgresqlfuncts.topics_table == TOPICS_TABLE - if postgresqlfuncts.meta_table != TOPICS_TABLE: - assert postgresqlfuncts.meta_table == META_TABLE - - -@pytest.mark.parametrize( - "topic_ids, id_name_map, expected_values", - [ - ([42], {42: "topic42"}, {"topic42": []}), - ( - [43], - {43: "topic43"}, - {"topic43": [("2020-06-01T12:30:59.000000+00:00", [2, 3])]}, - ), - ], -) -def test_query_should_return_data( - get_container_func, topic_ids, id_name_map, expected_values -): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - query = f""" - INSERT INTO {DATA_TABLE} VALUES ('2020-06-01 12:30:59', 43, '[2,3]') - """ - seed_database(container, query) - actual_values = postgresqlfuncts.query(topic_ids, id_name_map) - assert actual_values == expected_values - - -def test_insert_topic_should_return_topic_id(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - topic = "football" - expected_topic_id = 1 - actual_topic_id = postgresqlfuncts.insert_topic(topic) - assert actual_topic_id == expected_topic_id - - -def test_insert_topic_and_meta_query_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - if historian_version == "<4.0.0": - pytest.skip("Not relevant for historian schema before 4.0.0") - topic = "football" - metadata = {"units": "count"} - actual_id = sqlfuncts.insert_topic(topic, metadata=metadata) - assert isinstance(actual_id, int) - result = get_data_in_table(connection_port, "topics")[0] - assert (actual_id, topic) == result[0:2] - assert metadata == jsonapi.loads(result[2]) - - -def test_insert_agg_topic_should_return_agg_topic_id(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - - topic = "some_agg_topic" - agg_type = "AVG" - agg_time_period = "2019" - expected_data = (1, "some_agg_topic", "AVG", "2019") - - actual_id = postgresqlfuncts.insert_agg_topic(topic, agg_type, agg_time_period) - - assert isinstance(actual_id, int) - assert get_data_in_table(port_on_host, AGG_TOPICS_TABLE)[0] == expected_data - - -def test_insert_data_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - - ts = "2001-09-11 08:46:00" - topic_id = "11" - data = "1wtc" - expected_data = [(datetime.datetime(2001, 9, 11, 8, 46), 11, '"1wtc"')] - - res = postgresqlfuncts.insert_data(ts, topic_id, data) - - assert res is True - assert get_data_in_table(port_on_host, "data") == expected_data - - -def test_update_topic_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - topic = "football" - - actual_id = postgresqlfuncts.insert_topic(topic) - - assert isinstance(actual_id, int) 
- - result = postgresqlfuncts.update_topic("soccer", actual_id) - - assert result is True - assert (actual_id, "soccer") == get_data_in_table(port_on_host, "topics")[0][0:2] - - -def test_update_topic_and_metadata_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - if historian_version == "<4.0.0": - pytest.skip("Not relevant for historian schema before 4.0.0") - topic = "football" - actual_id = sqlfuncts.insert_topic(topic) - - assert isinstance(actual_id, int) - - result = sqlfuncts.update_topic("soccer", actual_id, metadata={"test": "test value"}) - - assert result is True - assert (actual_id, "soccer", '{"test": "test value"}') == get_data_in_table(connection_port, "topics")[0] - - - -def test_get_aggregation_list_should_return_list(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - expected_list = [ - "AVG", - "MIN", - "MAX", - "COUNT", - "SUM", - "BIT_AND", - "BIT_OR", - "BOOL_AND", - "BOOL_OR", - "MEDIAN", - "STDDEV", - "STDDEV_POP", - "STDDEV_SAMP", - "VAR_POP", - "VAR_SAMP", - "VARIANCE", - ] - - assert postgresqlfuncts.get_aggregation_list() == expected_list - - -def test_insert_agg_topic_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - - topic = "some_agg_topic" - agg_type = "AVG" - agg_time_period = "2019" - expected_data = (1, "some_agg_topic", "AVG", "2019") - - actual_id = postgresqlfuncts.insert_agg_topic(topic, agg_type, agg_time_period) - - assert isinstance(actual_id, int) - assert get_data_in_table(port_on_host, AGG_TOPICS_TABLE)[0] == expected_data - - -def test_update_agg_topic_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - - topic = "cars" - agg_type = "SUM" - agg_time_period = "2100ZULU" - expected_data = (1, "cars", "SUM", "2100ZULU") - - actual_id = postgresqlfuncts.insert_agg_topic(topic, agg_type, agg_time_period) - - assert isinstance(actual_id, int) - assert get_data_in_table(port_on_host, AGG_TOPICS_TABLE)[0] == expected_data - - new_agg_topic_name = "boats" - expected_data = (1, "boats", "SUM", "2100ZULU") - - result = postgresqlfuncts.update_agg_topic(actual_id, new_agg_topic_name) - - assert result is True - assert get_data_in_table(port_on_host, AGG_TOPICS_TABLE)[0] == expected_data - - -def test_insert_agg_meta_should_return_true(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - topic_id = 42 - # metadata must be in the following convention because aggregation methods, i.e. 
get_agg_topics, rely on metadata having a key called "configured_topics" - metadata = {"configured_topics": "meaning of life"} - expected_data = (42, '{"configured_topics": "meaning of life"}') - - result = postgresqlfuncts.insert_agg_meta(topic_id, metadata) - - assert result is True - assert get_data_in_table(port_on_host, AGG_META_TABLE)[0] == expected_data - - -def test_get_topic_map_should_return_maps(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - query = """ - INSERT INTO topics (topic_name) - VALUES ('football'); - INSERT INTO topics (topic_name) - VALUES ('baseball'); - """ - seed_database(container, query) - expected = ( - {"baseball": 2, "football": 1}, - {"baseball": "baseball", "football": "football"}, - ) - - actual = postgresqlfuncts.get_topic_map() - - assert actual == expected - - -def test_get_topic_meta_map_should_return_maps(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - if historian_version == "<4.0.0": - pytest.skip("method applied only to version >=4.0.0") - else: - query = """ - INSERT INTO topics (topic_name) - VALUES ('football'); - INSERT INTO topics (topic_name, metadata) - VALUES ('baseball', '{\\"meta\\":\\"value\\"}'); - """ - seed_database(container, query) - expected = {1: None, 2: {"meta": "value"}} - actual = sqlfuncts.get_topic_meta_map() - assert actual == expected - -def test_get_agg_topics_should_return_list(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - topic = "some_agg_topic" - agg_type = "AVG" - agg_time_period = "2019" - topic_id = postgresqlfuncts.insert_agg_topic(topic, agg_type, agg_time_period) - metadata = {"configured_topics": "meaning of life"} - postgresqlfuncts.insert_agg_meta(topic_id, metadata) - expected_list = [("some_agg_topic", "AVG", "2019", "meaning of life")] - - actual_list = postgresqlfuncts.get_agg_topics() - - assert actual_list == expected_list - - -def test_get_agg_topic_map_should_return_dict(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - query = f""" - INSERT INTO {AGG_TOPICS_TABLE} - (agg_topic_name, agg_type, agg_time_period) - VALUES ('topic_name', 'AVG', '2001'); - """ - seed_database(container, query) - - expected = {("topic_name", "AVG", "2001"): 1} - - actual = postgresqlfuncts.get_agg_topic_map() - - assert actual == expected - - -def test_query_topics_by_pattern_should_return_matching_results(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - - query = f""" - INSERT INTO {TOPICS_TABLE} (topic_name) - VALUES ('football'); - INSERT INTO {TOPICS_TABLE} (topic_name) - VALUES ('foobar'); - INSERT INTO {TOPICS_TABLE} (topic_name) - VALUES ('xyzzzzzzzz'); - """ - seed_database(container, query) - expected = {"football": 1, "foobar": 2} - topic_pattern = "foo" - - actual = postgresqlfuncts.query_topics_by_pattern(topic_pattern) - - assert actual == expected - - -def test_create_aggregate_store_should_succeed(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - agg_type = "AVG" - agg_time_period = "1984" - expected_aggregate_table = "AVG_1984" - expected_fields = {"agg_value", "topics_list", "topic_id", "ts"} - - postgresqlfuncts.create_aggregate_store(agg_type, agg_time_period) - - assert expected_aggregate_table in get_tables(port_on_host) - assert describe_table(port_on_host, 
expected_aggregate_table) == expected_fields - - -def test_insert_aggregate_stmt_should_succeed(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - # be aware that Postgresql will automatically fold unquoted names into lower case - # From : https://www.postgresql.org/docs/current/sql-syntax-lexical.html - # Quoting an identifier also makes it case-sensitive, whereas unquoted names are always folded to lower case. - # For example, the identifiers FOO, foo, and "foo" are considered the same by PostgreSQL, - # but "Foo" and "FOO" are different from these three and each other. - # (The folding of unquoted names to lower case in PostgreSQL is incompatible with the SQL standard, - # which says that unquoted names should be folded to upper case. - # Thus, foo should be equivalent to "FOO" not "foo" according to the standard. - # If you want to write portable applications you are advised to always quote a particular name or never quote it.) - query = """ - CREATE TABLE AVG_1776 ( - ts timestamp NOT NULL, - topic_id INTEGER NOT NULL, - agg_value DOUBLE PRECISION NOT NULL, - topics_list TEXT, - UNIQUE(ts, topic_id)); - CREATE INDEX IF NOT EXISTS idx_avg_1776 ON avg_1776 (ts ASC); - """ - seed_database(container, query) - - agg_topic_id = 42 - agg_type = "avg" - period = "1776" - ts = "2020-06-01 12:30:59" - data = 42.44 - topic_ids = [12, 54, 65] - expected_data = ( - datetime.datetime(2020, 6, 1, 12, 30, 59), - 42, - 42.44, - "[12, 54, 65]", - ) - - res = postgresqlfuncts.insert_aggregate( - agg_topic_id, agg_type, period, ts, data, topic_ids - ) - - assert res is True - assert get_data_in_table(port_on_host, "avg_1776")[0] == expected_data - - -def test_collect_aggregate_stmt_should_return_rows(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - query = f""" - INSERT INTO {DATA_TABLE} - VALUES ('2020-06-01 12:30:59', 42, '2'); - INSERT INTO {DATA_TABLE} - VALUES ('2020-06-01 12:31:59', 43, '8') - """ - seed_database(container, query) - - topic_ids = [42, 43] - agg_type = "avg" - expected_aggregate = (5.0, 2) - - actual_aggregate = postgresqlfuncts.collect_aggregate(topic_ids, agg_type) - - assert actual_aggregate == expected_aggregate - - -def test_collect_aggregate_stmt_should_raise_value_error(get_container_func): - container, postgresqlfuncts, port_on_host, historian_version = get_container_func - with pytest.raises(ValueError): - postgresqlfuncts.collect_aggregate("dfdfadfdadf", "Invalid agg type") - - -def get_postgresqlfuncts(port): - connect_params = { - "dbname": TEST_DATABASE, - "user": ROOT_USER, - "password": ROOT_PASSWORD, - "host": CONNECTION_HOST, - "port": port, - "timescale_dialect": True, - } - - table_names = { - "data_table": DATA_TABLE, - "topics_table": TOPICS_TABLE, - "meta_table": META_TABLE, - "agg_topics_table": AGG_TOPICS_TABLE, - "agg_meta_table": AGG_META_TABLE, - } - - return PostgreSqlFuncts(connect_params, table_names) - - -@pytest.fixture(scope="module", params=itertools.product(IMAGES, ["<4.0.0", ">=4.0.0"])) -def get_container_func(request): - global CONNECTION_HOST - historian_version = request.param[1] - kwargs = {"env": ENV_POSTGRESQL} - if os.path.exists("/.dockerenv"): - print("Running test within docker container.") - connection_port = 5432 - CONNECTION_HOST = "postgresql_test" - kwargs["hostname"] = CONNECTION_HOST - else: - ports_dict = ports_config() - kwargs["ports"] = ports_dict["ports"] - connection_port = ports_dict["port_on_host"] - 
CONNECTION_HOST = "localhost" - - with create_container(request.param[0], **kwargs) as container: - wait_for_connection(container, connection_port) - create_all_tables(container, historian_version) - postgresqlfuncts = get_postgresqlfuncts(connection_port) - sleep(1) - postgresqlfuncts.setup_historian_tables() - yield container, postgresqlfuncts, connection_port, historian_version - - -def ports_config(): - port_on_host = get_rand_port(ip="5432") - return {"port_on_host": port_on_host, "ports": {"5432/tcp": port_on_host}} - - -def create_all_tables(container, historian_version): - create_historian_tables(container, historian_version) - create_aggregate_tables(container, historian_version) - - -def create_historian_tables(container, historian_version): - if historian_version == "<4.0.0": - query = f""" - CREATE TABLE IF NOT EXISTS {DATA_TABLE} ( - ts TIMESTAMP NOT NULL, - topic_id INTEGER NOT NULL, - value_string TEXT NOT NULL, - UNIQUE (topic_id, ts)); - CREATE TABLE IF NOT EXISTS {TOPICS_TABLE} ( - topic_id SERIAL PRIMARY KEY NOT NULL, - topic_name VARCHAR(512) NOT NULL, - UNIQUE (topic_name)); - CREATE TABLE IF NOT EXISTS {META_TABLE} ( - topic_id INTEGER PRIMARY KEY NOT NULL, - metadata TEXT NOT NULL); - """ - else: - query = f""" - CREATE TABLE IF NOT EXISTS {DATA_TABLE} ( - ts TIMESTAMP NOT NULL, - topic_id INTEGER NOT NULL, - value_string TEXT NOT NULL, - UNIQUE (topic_id, ts)); - CREATE TABLE IF NOT EXISTS {TOPICS_TABLE} ( - topic_id SERIAL PRIMARY KEY NOT NULL, - topic_name VARCHAR(512) NOT NULL, - metadata TEXT, - UNIQUE (topic_name)); - """ - - seed_database(container, query) - - -def create_aggregate_tables(container, historian_version): - if historian_version == "<4.0.0": - query = f""" - CREATE TABLE IF NOT EXISTS {AGG_TOPICS_TABLE} ( - agg_topic_id SERIAL PRIMARY KEY NOT NULL, - agg_topic_name VARCHAR(512) NOT NULL, - agg_type VARCHAR(512) NOT NULL, - agg_time_period VARCHAR(512) NOT NULL, - UNIQUE (agg_topic_name, agg_type, agg_time_period)); - CREATE TABLE IF NOT EXISTS {AGG_META_TABLE} ( - agg_topic_id INTEGER PRIMARY KEY NOT NULL, - metadata TEXT NOT NULL); - """ - else: - query = f""" - CREATE TABLE IF NOT EXISTS {AGG_TOPICS_TABLE} ( - agg_topic_id SERIAL PRIMARY KEY NOT NULL, - agg_topic_name VARCHAR(512) NOT NULL, - agg_type VARCHAR(20) NOT NULL, - agg_time_period VARCHAR(20) NOT NULL, - UNIQUE (agg_topic_name, agg_type, agg_time_period)); - CREATE TABLE IF NOT EXISTS {AGG_META_TABLE} ( - agg_topic_id INTEGER PRIMARY KEY NOT NULL, - metadata TEXT NOT NULL); - """ - seed_database(container, query) - - -def seed_database(container, query): - command = ( - f'psql --username="{ROOT_USER}" --dbname="{TEST_DATABASE}" --command="{query}"' - ) - r = container.exec_run(cmd=command, tty=True) - sleep(1) - if r[0] == 1: - raise RuntimeError( - f"SQL query did not successfully complete on the container: \n {r}" - ) - return - - -def get_tables(port): - cnx, cursor = get_cnx_cursor(port) - # unlike MYSQL, Postgresql does not have a "SHOW TABLES" shortcut - # we have to create the query ourselves - query = SQL( - "SELECT table_name " - "FROM information_schema.tables " - "WHERE table_type = 'BASE TABLE' and " - "table_schema not in ('pg_catalog', 'information_schema', '_timescaledb_catalog', '_timescaledb_config', '_timescaledb_internal', '_timescaledb_cache')" - ) - results = execute_statement(cnx, cursor, query) - - return {t[0] for t in results} - - -def describe_table(port, table): - cnx, cursor = get_cnx_cursor(port) - query = SQL( - "SELECT column_name " "FROM 
information_schema.columns " "WHERE table_name = %s" - ) - - results = execute_statement(cnx, cursor, query, args=[table]) - - return {t[0] for t in results} - - -def get_data_in_table(port, table): - cnx, cursor = get_cnx_cursor(port) - query = SQL("SELECT * " "FROM {table_name}").format(table_name=Identifier(table)) - - results = execute_statement(cnx, cursor, query) - - return results - - -def execute_statement(cnx, cursor, query, args=None): - cursor.execute(query, vars=args) - - results = cursor.fetchall() - - cursor.close() - cnx.close() - - return results - - -def get_cnx_cursor(port): - global CONNECTION_HOST - connect_params = { - "database": TEST_DATABASE, - "user": ROOT_USER, - "password": ROOT_PASSWORD, - "host": CONNECTION_HOST, - "port": port, - } - - cnx = psycopg2.connect(**connect_params) - - return cnx, cnx.cursor() - - -def wait_for_connection(container, port): - start_time = time() - while time() - start_time < ALLOW_CONNECTION_TIME: - command = f"psql --user={ROOT_USER} --dbname={TEST_DATABASE} --port={port}" - response = container.exec_run(command, tty=True) - # https://www.postgresql.org/docs/10/app-psql.html#id-1.9.4.18.7 - # psql returns 0 to the shell if it finished normally, - # 1 if a fatal error of its own occurs (e.g. out of memory, file not found), - # 2 if the connection to the server went bad and the session was not interactive, - # and 3 if an error occurred in a script and the variable ON_ERROR_STOP was set. - exit_code = response[0] - - if exit_code == 0: - return - elif exit_code == 1: - raise RuntimeError(response) - elif exit_code == 2: - continue - elif exit_code == 3: - raise RuntimeError(response) - - # if we break out of the loop, we assume that connection has been verified given enough sleep time - return - - -def drop_all_tables(port): - tables = get_tables(port) - cnx, cursor = get_cnx_cursor(port) - try: - for t in tables: - cursor.execute(SQL(f"DROP TABLE {t}")) - cnx.commit() - except Exception as e: - print("Error deleting tables {}".format(e)) - finally: - if cursor: - cursor.close() - - -@pytest.fixture(autouse=True) -def cleanup_tables(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - drop_all_tables(connection_port) - create_all_tables(container, historian_version) diff --git a/volttrontesting/platform/dbutils/test_postgresqlfuncts.py b/volttrontesting/platform/dbutils/test_postgresqlfuncts.py index 360da93f41..99724a4799 100644 --- a/volttrontesting/platform/dbutils/test_postgresqlfuncts.py +++ b/volttrontesting/platform/dbutils/test_postgresqlfuncts.py @@ -1,10 +1,9 @@ import datetime -import itertools import os import logging -import pytest -from time import time +import gevent +import pytest try: import psycopg2 @@ -17,38 +16,40 @@ ) from volttron.platform import jsonapi from volttron.platform.dbutils.postgresqlfuncts import PostgreSqlFuncts -from volttrontesting.fixtures.docker_wrapper import create_container -from volttrontesting.utils.utils import get_rand_port logging.getLogger("urllib3.connectionpool").setLevel(logging.INFO) pytestmark = [pytest.mark.postgresqlfuncts, pytest.mark.dbutils, pytest.mark.unit] - -IMAGES = ["postgres:13"] -if "CI" in os.environ: - IMAGES.extend( - ["postgres:12", "postgres:11"] - ) - -ALLOW_CONNECTION_TIME = 10 -CONNECTION_HOST = "localhost" -TEST_DATABASE = "test_historian" -ROOT_USER = "postgres" -ROOT_PASSWORD = "password" -ENV_POSTGRESQL = { - "POSTGRES_USER": ROOT_USER, # defining user not necessary but added to be explicit - 
"POSTGRES_PASSWORD": ROOT_PASSWORD, - "POSTGRES_DB": TEST_DATABASE, -} DATA_TABLE = "data" TOPICS_TABLE = "topics" META_TABLE = "meta" AGG_TOPICS_TABLE = "aggregate_topics" AGG_META_TABLE = "aggregate_meta" +db_connection = None +user = 'postgres' +password = 'postgres' +historian_config = { + "connection": { + "type": "postgresql", + "params": { + 'dbname': 'test_historian', + 'port': os.environ.get("POSTGRES_PORT", 5432), + 'host': 'localhost', + 'user': os.environ.get("POSTGRES_USER", user), + 'password': os.environ.get("POSTGRES_PASSWORD", password) + } + } +} +table_names = { + "data_table": DATA_TABLE, + "topics_table": TOPICS_TABLE, + "meta_table": META_TABLE, + "agg_topics_table": AGG_TOPICS_TABLE, + "agg_meta_table": AGG_META_TABLE, +} - -def test_insert_meta_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_insert_meta_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs if historian_version != "<4.0.0": pytest.skip("insert_meta() is called by historian only for schema <4.0.0") topic_id = "44" @@ -56,20 +57,21 @@ def test_insert_meta_should_return_true(get_container_func): expected_data = (44, '"foobar44"') res = sqlfuncts.insert_meta(topic_id, metadata) - + #db_connection.commit() + gevent.sleep(1) assert res is True - assert get_data_in_table(connection_port, "meta")[0] == expected_data + assert get_data_in_table("meta")[0] == expected_data + cleanup_tables(truncate_tables=["meta"], drop_tables=False) - -def test_update_meta_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_update_meta_should_succeed(setup_functs): + sqlfuncts, historian_version = setup_functs metadata = {"units": "count"} metadata_s = jsonapi.dumps(metadata) topic = "foobar" id = sqlfuncts.insert_topic(topic) sqlfuncts.insert_meta(id, {"fdjlj": "XXXX"}) - assert metadata_s not in get_data_in_table(connection_port, TOPICS_TABLE)[0] + assert metadata_s not in get_data_in_table(TOPICS_TABLE)[0] res = sqlfuncts.update_meta(id, metadata) @@ -77,34 +79,35 @@ def test_update_meta_should_succeed(get_container_func): expected_gteq_4 = [(1, topic, metadata_s)] assert res is True if historian_version == "<4.0.0": - assert get_data_in_table(connection_port, META_TABLE) == expected_lt_4 + assert get_data_in_table(META_TABLE) == expected_lt_4 + cleanup_tables(truncate_tables=[META_TABLE], drop_tables=False) else: - assert get_data_in_table(connection_port, TOPICS_TABLE) == expected_gteq_4 - + assert get_data_in_table(TOPICS_TABLE) == expected_gteq_4 + cleanup_tables(truncate_tables=[TOPICS_TABLE], drop_tables=False) -def test_setup_historian_tables_should_create_tables(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_setup_historian_tables_should_create_tables(setup_functs): + sqlfuncts, historian_version = setup_functs # get_container initializes db and sqlfuncts # to test setup explicitly drop tables and see if tables get created correctly - drop_all_tables(connection_port) + cleanup_tables(None, drop_tables=True) - tables_before_setup = get_tables(connection_port) + tables_before_setup = select_all_historian_tables() assert tables_before_setup == set() expected_tables = set(["data", "topics"]) sqlfuncts.setup_historian_tables() - actual_tables = get_tables(connection_port) + actual_tables = select_all_historian_tables() assert actual_tables == expected_tables -def 
test_setup_aggregate_historian_tables_should_create_aggregate_tables(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_setup_aggregate_historian_tables_should_create_aggregate_tables(setup_functs): + sqlfuncts, historian_version = setup_functs # get_container initializes db and sqlfuncts to test setup explicitly drop tables and see if tables get created - drop_all_tables(connection_port) - create_historian_tables(container, historian_version) + cleanup_tables(None, drop_tables=True) + create_historian_tables(historian_version) agg_topic_table = "aggregate_topics" agg_meta_table = "aggregate_meta" - original_tables = get_tables(connection_port) + original_tables = select_all_historian_tables() assert agg_topic_table not in original_tables assert agg_meta_table not in original_tables @@ -118,15 +121,15 @@ def test_setup_aggregate_historian_tables_should_create_aggregate_tables(get_con sqlfuncts.setup_aggregate_historian_tables() - updated_tables = get_tables(connection_port) + updated_tables = select_all_historian_tables() assert agg_topic_table in updated_tables assert agg_meta_table in updated_tables assert ( - describe_table(connection_port, agg_topic_table) + describe_table(agg_topic_table) == expected_agg_topic_fields ) assert ( - describe_table(connection_port, agg_meta_table) == expected_agg_meta_fields + describe_table(agg_meta_table) == expected_agg_meta_fields ) assert sqlfuncts.agg_topics_table == agg_topic_table assert sqlfuncts.agg_meta_table == agg_meta_table @@ -147,28 +150,30 @@ def test_setup_aggregate_historian_tables_should_create_aggregate_tables(get_con ), ], ) -def test_query_should_return_data(get_container_func, topic_ids, id_name_map, expected_values): - container, sqlfuncts, connection_port, historian_version = get_container_func - - query = f""" - INSERT INTO {DATA_TABLE} VALUES ('2020-06-01 12:30:59', 43, '[2,3]') - """ - seed_database(container, query) +def test_query_should_return_data(setup_functs, topic_ids, id_name_map, expected_values): + global db_connection + sqlfuncts, historian_version = setup_functs + # explicit drop and recreate as the test is repeated it multiple times (number of params * historian version) + create_all_tables(historian_version, sqlfuncts) + db_connection.commit() + query = f"""INSERT INTO {DATA_TABLE} VALUES ('2020-06-01 12:30:59', 43, '[2,3]')""" + seed_database(query) actual_values = sqlfuncts.query(topic_ids, id_name_map) assert actual_values == expected_values -def test_insert_topic_should_return_topic_id(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_insert_topic_should_return_topic_id(setup_functs): + sqlfuncts, historian_version = setup_functs topic = "football" expected_topic_id = 1 actual_topic_id = sqlfuncts.insert_topic(topic) assert actual_topic_id == expected_topic_id + cleanup_tables(truncate_tables=[TOPICS_TABLE], drop_tables=False) -def test_insert_topic_and_meta_query_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_insert_topic_and_meta_query_should_succeed(setup_functs): + sqlfuncts, historian_version = setup_functs if historian_version == "<4.0.0": pytest.skip("Not relevant for historian schema before 4.0.0") topic = "football" @@ -176,13 +181,14 @@ def test_insert_topic_and_meta_query_should_succeed(get_container_func): actual_id = sqlfuncts.insert_topic(topic, metadata=metadata) assert isinstance(actual_id, int) - 
result = get_data_in_table(connection_port, "topics")[0] + result = get_data_in_table("topics")[0] assert (actual_id, topic) == result[0:2] assert metadata == jsonapi.loads(result[2]) + cleanup_tables(truncate_tables=[TOPICS_TABLE], drop_tables=False) -def test_insert_agg_topic_should_return_agg_topic_id(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_insert_agg_topic_should_return_agg_topic_id(setup_functs): + sqlfuncts, historian_version = setup_functs topic = "some_agg_topic" agg_type = "AVG" @@ -194,12 +200,13 @@ def test_insert_agg_topic_should_return_agg_topic_id(get_container_func): ) assert isinstance(actual_id, int) - assert get_data_in_table(connection_port, AGG_TOPICS_TABLE)[0] == expected_data + assert get_data_in_table(AGG_TOPICS_TABLE)[0] == expected_data + cleanup_tables(truncate_tables=[AGG_TOPICS_TABLE], drop_tables=False) -def test_insert_data_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - +def test_insert_data_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs + cleanup_tables(truncate_tables=[DATA_TABLE], drop_tables=False) ts = "2001-09-11 08:46:00" topic_id = "11" data = "1wtc" @@ -208,11 +215,12 @@ def test_insert_data_should_return_true(get_container_func): res = sqlfuncts.insert_data(ts, topic_id, data) assert res is True - assert get_data_in_table(connection_port, "data") == expected_data + assert get_data_in_table(DATA_TABLE) == expected_data + cleanup_tables(truncate_tables=[DATA_TABLE], drop_tables=False) -def test_update_topic_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_update_topic_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs topic = "football" actual_id = sqlfuncts.insert_topic(topic) @@ -220,11 +228,12 @@ def test_update_topic_should_return_true(get_container_func): result = sqlfuncts.update_topic("soccer", actual_id) assert result is True - assert (actual_id, "soccer") == get_data_in_table(connection_port, "topics")[0][0:2] + assert (actual_id, "soccer") == get_data_in_table(TOPICS_TABLE)[0][0:2] + cleanup_tables(truncate_tables=[TOPICS_TABLE], drop_tables=False) -def test_update_topic_and_metadata_should_succeed(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_update_topic_and_metadata_should_succeed(setup_functs): + sqlfuncts, historian_version = setup_functs if historian_version == "<4.0.0": pytest.skip("Not relevant for historian schema before 4.0.0") topic = "football" @@ -235,12 +244,12 @@ def test_update_topic_and_metadata_should_succeed(get_container_func): result = sqlfuncts.update_topic("soccer", actual_id, metadata={"test": "test value"}) assert result is True - assert (actual_id, "soccer", '{"test": "test value"}') == get_data_in_table(connection_port, "topics")[0] - + assert (actual_id, "soccer", '{"test": "test value"}') == get_data_in_table("topics")[0] + cleanup_tables(truncate_tables=[TOPICS_TABLE], drop_tables=False) -def test_get_aggregation_list_should_return_list(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_get_aggregation_list_should_return_list(setup_functs): + sqlfuncts, historian_version = setup_functs expected_list = [ "AVG", @@ -264,49 +273,49 @@ def test_get_aggregation_list_should_return_list(get_container_func): assert 
sqlfuncts.get_aggregation_list() == expected_list -def test_insert_agg_topic_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - +def test_insert_agg_topic_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs + cleanup_tables(truncate_tables=[AGG_TOPICS_TABLE], drop_tables=False) topic = "some_agg_topic" agg_type = "AVG" agg_time_period = "2019" - expected_data = (1, "some_agg_topic", "AVG", "2019") + expected_data = ("some_agg_topic", "AVG", "2019") actual_id = sqlfuncts.insert_agg_topic( topic, agg_type, agg_time_period ) assert isinstance(actual_id, int) - assert get_data_in_table(connection_port, AGG_TOPICS_TABLE)[0] == expected_data - + assert get_data_in_table(AGG_TOPICS_TABLE)[0][1:] == expected_data -def test_update_agg_topic_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_update_agg_topic_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs + cleanup_tables(truncate_tables=[AGG_TOPICS_TABLE], drop_tables=False) topic = "cars" agg_type = "SUM" agg_time_period = "2100ZULU" - expected_data = (1, "cars", "SUM", "2100ZULU") + expected_data = ("cars", "SUM", "2100ZULU") actual_id = sqlfuncts.insert_agg_topic( topic, agg_type, agg_time_period ) assert isinstance(actual_id, int) - assert get_data_in_table(connection_port, AGG_TOPICS_TABLE)[0] == expected_data + assert get_data_in_table(AGG_TOPICS_TABLE)[0][1:] == expected_data new_agg_topic_name = "boats" - expected_data = (1, "boats", "SUM", "2100ZULU") + expected_data = ("boats", "SUM", "2100ZULU") result = sqlfuncts.update_agg_topic(actual_id, new_agg_topic_name) assert result is True - assert get_data_in_table(connection_port, AGG_TOPICS_TABLE)[0] == expected_data + assert get_data_in_table(AGG_TOPICS_TABLE)[0][1:] == expected_data -def test_insert_agg_meta_should_return_true(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - +def test_insert_agg_meta_should_return_true(setup_functs): + sqlfuncts, historian_version = setup_functs + cleanup_tables(truncate_tables=[AGG_META_TABLE], drop_tables=False) topic_id = 42 # metadata must be in the following convention because aggregation methods, i.e. 
get_agg_topics, rely on metadata having a key called "configured_topics" metadata = {"configured_topics": "meaning of life"} @@ -315,19 +324,22 @@ def test_insert_agg_meta_should_return_true(get_container_func): result = sqlfuncts.insert_agg_meta(topic_id, metadata) assert result is True - assert get_data_in_table(connection_port, AGG_META_TABLE)[0] == expected_data - + assert get_data_in_table(AGG_META_TABLE)[0] == expected_data -def test_get_topic_map_should_return_maps(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_get_topic_map_should_return_maps(setup_functs): + global db_connection + sqlfuncts, historian_version = setup_functs + create_all_tables(historian_version, sqlfuncts) + db_connection.commit() + gevent.sleep(0.5) query = """ INSERT INTO topics (topic_name) VALUES ('football'); INSERT INTO topics (topic_name) VALUES ('baseball'); """ - seed_database(container, query) + seed_database(query) expected = ( {"baseball": 2, "football": 1}, {"baseball": "baseball", "football": "football"}, @@ -338,25 +350,28 @@ def test_get_topic_map_should_return_maps(get_container_func): assert actual == expected -def test_get_topic_meta_map_should_return_maps(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_get_topic_meta_map_should_return_maps(setup_functs): + sqlfuncts, historian_version = setup_functs + if historian_version == "<4.0.0": pytest.skip("method applied only to version >=4.0.0") else: + create_all_tables(historian_version, sqlfuncts) + gevent.sleep(1) query = """ INSERT INTO topics (topic_name) VALUES ('football'); INSERT INTO topics (topic_name, metadata) - VALUES ('baseball', '{\\"meta\\":\\"value\\"}'); + VALUES ('baseball', '{"meta":"value"}'); """ - seed_database(container, query) + seed_database(query) expected = {1: None, 2: {"meta": "value"}} actual = sqlfuncts.get_topic_meta_map() assert actual == expected -def test_get_agg_topics_should_return_list(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func +def test_get_agg_topics_should_return_list(setup_functs): + sqlfuncts, historian_version = setup_functs topic = "some_agg_topic" agg_type = "AVG" @@ -373,19 +388,17 @@ def test_get_agg_topics_should_return_list(get_container_func): assert actual_list == expected_list -def test_get_agg_topic_map_should_return_dict(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - +def test_get_agg_topic_map_should_return_dict(setup_functs): + sqlfuncts, historian_version = setup_functs + create_all_tables(historian_version, sqlfuncts) query = f""" INSERT INTO {AGG_TOPICS_TABLE} (agg_topic_name, agg_type, agg_time_period) VALUES ('topic_name', 'AVG', '2001'); """ - seed_database(container, query) + seed_database(query) expected = {("topic_name", "AVG", "2001"): 1} - actual = sqlfuncts.get_agg_topic_map() - assert actual == expected @@ -407,15 +420,17 @@ def test_get_agg_topic_map_should_return_dict(get_container_func): ], ) def test_query_topics_by_pattern_should_return_matching_results( - get_container_func, + setup_functs, topic_1, topic_2, topic_3, topic_pattern, expected_result ): - container, sqlfuncts, connection_port, historian_version = get_container_func + sqlfuncts, historian_version = setup_functs + create_all_tables(historian_version, sqlfuncts) + gevent.sleep(2) query = f""" INSERT INTO {TOPICS_TABLE} (topic_name) VALUES ({topic_1}); @@ -424,14 +439,13 @@ def 
test_query_topics_by_pattern_should_return_matching_results(
     INSERT INTO {TOPICS_TABLE} (topic_name)
     VALUES ({topic_2});
     INSERT INTO {TOPICS_TABLE} (topic_name)
     VALUES ({topic_3});
     """
-    seed_database(container, query)
-
+    seed_database(query)
     actual_result = sqlfuncts.query_topics_by_pattern(topic_pattern)
 
     assert actual_result == expected_result
 
 
-def test_create_aggregate_store_should_succeed(get_container_func):
-    container, sqlfuncts, connection_port, historian_version = get_container_func
+def test_create_aggregate_store_should_succeed(setup_functs):
+    sqlfuncts, historian_version = setup_functs
 
     agg_type = "AVG"
     agg_time_period = "1984"
@@ -440,12 +454,12 @@ def test_create_aggregate_store_should_succeed(get_container_func):
 
     sqlfuncts.create_aggregate_store(agg_type, agg_time_period)
 
-    assert expected_aggregate_table in get_tables(connection_port)
-    assert describe_table(connection_port, expected_aggregate_table) == expected_fields
+    assert expected_aggregate_table in select_all_historian_tables()
+    assert describe_table(expected_aggregate_table) == expected_fields
 
 
-def test_insert_aggregate_stmt_should_succeed(get_container_func):
-    container, sqlfuncts, connection_port, historian_version = get_container_func
+def test_insert_aggregate_stmt_should_succeed(setup_functs):
+    sqlfuncts, historian_version = setup_functs
 
     # be aware that Postgresql will automatically fold unquoted names into lower case
     # From : https://www.postgresql.org/docs/current/sql-syntax-lexical.html
@@ -465,7 +479,7 @@ def test_insert_aggregate_stmt_should_succeed(get_container_func):
         UNIQUE(ts, topic_id));
         CREATE INDEX IF NOT EXISTS idx_avg_1776 ON avg_1776 (ts ASC);
     """
-    seed_database(container, query)
+    seed_database(query)
 
     agg_topic_id = 42
     agg_type = "avg"
@@ -485,11 +499,11 @@ def test_insert_aggregate_stmt_should_succeed(get_container_func):
     )
 
     assert res is True
-    assert get_data_in_table(connection_port, "avg_1776")[0] == expected_data
+    assert get_data_in_table("avg_1776")[0] == expected_data
 
 
-def test_collect_aggregate_stmt_should_return_rows(get_container_func):
-    container, sqlfuncts, connection_port, historian_version = get_container_func
+def test_collect_aggregate_stmt_should_return_rows(setup_functs):
+    sqlfuncts, historian_version = setup_functs
 
     query = f"""
         INSERT INTO {DATA_TABLE}
@@ -497,7 +511,7 @@ def test_collect_aggregate_stmt_should_return_rows(get_container_func):
         INSERT INTO {DATA_TABLE}
         VALUES ('2020-06-01 12:31:59', 43, '8')
     """
-    seed_database(container, query)
+    seed_database(query)
 
     topic_ids = [42, 43]
     agg_type = "avg"
@@ -508,106 +522,81 @@ def test_collect_aggregate_stmt_should_return_rows(get_container_func):
 
     assert actual_aggregate == expected_aggregate
 
 
-def test_collect_aggregate_stmt_should_raise_value_error(get_container_func):
-    container, sqlfuncts, connection_port, historian_version = get_container_func
+def test_collect_aggregate_stmt_should_raise_value_error(setup_functs):
+    sqlfuncts, historian_version = setup_functs
 
     with pytest.raises(ValueError):
         sqlfuncts.collect_aggregate("dfdfadfdadf", "Invalid agg type")
 
 
-def get_postgresqlfuncts(port):
-    connect_params = {
-        "dbname": TEST_DATABASE,
-        "user": ROOT_USER,
-        "password": ROOT_PASSWORD,
-        "host": "localhost",
-        "port": port,
-    }
+@pytest.fixture(scope="module", params=[
+    ('<4.0.0', os.environ.get("POSTGRES_PORT", 5432)),
+    ('<4.0.0', os.environ.get("TIMESCALE_PORT", 5433)),
+    ('>=4.0.0', os.environ.get("POSTGRES_PORT", 5432)),
+    ('>=4.0.0', os.environ.get("TIMESCALE_PORT", 5433))
+    ])
+def setup_functs(request):
+    global db_connection, historian_config, table_names
+
+    historian_version = request.param[0]
+    port = request.param[1]
+    historian_config["connection"]["params"]["port"] = port
 
-    table_names = {
-        "data_table": DATA_TABLE,
-        "topics_table": TOPICS_TABLE,
-        "meta_table": META_TABLE,
-        "agg_topics_table": AGG_TOPICS_TABLE,
-        "agg_meta_table": AGG_META_TABLE,
-    }
+    db_connection = psycopg2.connect(**historian_config["connection"]["params"])
+    db_connection.autocommit = True
+    create_all_tables(historian_version)
+    postgresfuncts = PostgreSqlFuncts(historian_config["connection"]["params"], table_names)
+    postgresfuncts.setup_historian_tables()
+    yield postgresfuncts, historian_version
+    db_connection.close()
 
-    return PostgreSqlFuncts(connect_params, table_names)
-
 
-@pytest.fixture(scope="module", params=itertools.product(
-    IMAGES,
-    [
-        '<4.0.0',
-        '>=4.0.0'
-    ]))
-def get_container_func(request):
-    global CONNECTION_HOST
-    historian_version = request.param[1]
-    kwargs = {'env': ENV_POSTGRESQL}
-    if os.path.exists("/.dockerenv"):
-        print("Running test within docker container.")
-        connection_port = 5432
-        CONNECTION_HOST = 'postgresql_test'
-        kwargs['hostname'] = CONNECTION_HOST
-    else:
-        ports_dict = ports_config()
-        kwargs['ports'] = ports_dict["ports"]
-        connection_port = ports_dict["port_on_host"]
-        CONNECTION_HOST = 'localhost'
-
-    with create_container(request.param[0], **kwargs) as container:
-        wait_for_connection(container, connection_port)
-        create_all_tables(container, historian_version)
-        postgresfuncts = get_postgresqlfuncts(connection_port)
-        postgresfuncts.setup_historian_tables()
-        yield container, postgresfuncts, connection_port, historian_version
-
-
-def ports_config():
-    port_on_host = get_rand_port(ip="5432")
-    return {"port_on_host": port_on_host, "ports": {"5432/tcp": port_on_host}}
-
-
-def create_all_tables(container, historian_version):
-    create_historian_tables(container, historian_version)
-    create_aggregate_tables(container, historian_version)
+def create_all_tables(historian_version, sqlfuncts=None):
+    try:
+        cleanup_tables(table_names.values(), drop_tables=True)
+    except Exception as exc:
+        print('Error cleaning up existing tables: {}'.format(exc))
+    create_historian_tables(historian_version, sqlfuncts)
+    create_aggregate_tables(historian_version)
 
 
-def create_historian_tables(container, historian_version):
+def create_historian_tables(historian_version, sqlfuncts=None):
+    global db_connection, historian_config, table_names
     if historian_version == "<4.0.0":
-        query = f"""
-            CREATE TABLE IF NOT EXISTS {DATA_TABLE} (
-                ts TIMESTAMP NOT NULL,
-                topic_id INTEGER NOT NULL,
-                value_string TEXT NOT NULL,
-                UNIQUE (topic_id, ts));
-            CREATE TABLE IF NOT EXISTS {TOPICS_TABLE} (
-                topic_id SERIAL PRIMARY KEY NOT NULL,
-                topic_name VARCHAR(512) NOT NULL,
-                UNIQUE (topic_name));
-            CREATE TABLE IF NOT EXISTS {META_TABLE} (
-                topic_id INTEGER PRIMARY KEY NOT NULL,
-                metadata TEXT NOT NULL);
-        """
-    else:
-        query = f"""
-            CREATE TABLE IF NOT EXISTS {DATA_TABLE} (
-                ts TIMESTAMP NOT NULL,
-                topic_id INTEGER NOT NULL,
-                value_string TEXT NOT NULL,
-                UNIQUE (topic_id, ts));
-            CREATE TABLE IF NOT EXISTS {TOPICS_TABLE} (
-                topic_id SERIAL PRIMARY KEY NOT NULL,
-                topic_name VARCHAR(512) NOT NULL,
-                metadata TEXT,
-                UNIQUE (topic_name));
-        """
-    seed_database(container, query)
+        print("Setting up for version <4.0.0")
+        cursor = db_connection.cursor()
+        cursor.execute(SQL(
+            'CREATE TABLE IF NOT EXISTS {} ('
+            'ts TIMESTAMP NOT NULL, '
+            'topic_id INTEGER NOT NULL, '
+            'value_string TEXT NOT NULL, '
+            'UNIQUE (topic_id, ts)'
+            ')').format(Identifier(table_names['data_table'])))
+        cursor.execute(SQL(
+            'CREATE INDEX IF NOT EXISTS {} ON {} (ts ASC)').format(
+                Identifier('idx_' + table_names['data_table']),
+                Identifier(table_names['data_table'])))
+        cursor.execute(SQL(
+            'CREATE TABLE IF NOT EXISTS {} ('
+            'topic_id SERIAL PRIMARY KEY NOT NULL, '
+            'topic_name VARCHAR(512) NOT NULL, '
+            'UNIQUE (topic_name)'
+            ')').format(Identifier(table_names['topics_table'])))
+        cursor.execute(SQL(
+            'CREATE TABLE IF NOT EXISTS {} ('
+            'topic_id INTEGER PRIMARY KEY NOT NULL, '
+            'metadata TEXT NOT NULL'
+            ')').format(Identifier(table_names['meta_table'])))
+        db_connection.commit()
+        cursor.close()
+    elif sqlfuncts:
+        sqlfuncts.setup_historian_tables()
+        gevent.sleep(5)
     return
 
 
-def create_aggregate_tables(container, historian_version):
+def create_aggregate_tables(historian_version):
+    global db_connection
+    cursor = db_connection.cursor()
     if historian_version == "<4.0.0":
         query = f"""
             CREATE TABLE IF NOT EXISTS {AGG_TOPICS_TABLE} (
@@ -632,125 +621,73 @@ def create_aggregate_tables(container, historian_version):
                 agg_topic_id INTEGER PRIMARY KEY NOT NULL,
                 metadata TEXT NOT NULL);
         """
-    seed_database(container, query)
-    return
-
-
-def seed_database(container, query):
-    command = (
-        f'psql --username="{ROOT_USER}" --dbname="{TEST_DATABASE}" --command="{query}"'
-    )
-    r = container.exec_run(cmd=command, tty=True)
-    print(r)
-    if r[0] == 1:
-        raise RuntimeError(
-            f"SQL query did not successfully complete on the container: \n {r}"
-        )
+    cursor.execute(SQL(query))
+    db_connection.commit()
+    cursor.close()
     return
 
 
-def get_tables(port):
-    cnx, cursor = get_cnx_cursor(port)
-    # unlike MYSQL, Postgresql does not have a "SHOW TABLES" shortcut
-    # we have to create the query ourselves
-    query = SQL(
-        "SELECT table_name "
-        "FROM information_schema.tables "
-        "WHERE table_type = 'BASE TABLE' and "
-        "table_schema not in ('pg_catalog', 'information_schema')"
-    )
-    results = execute_statement(cnx, cursor, query)
-
-    return {t[0] for t in results}
-
-
-def describe_table(port, table):
-    cnx, cursor = get_cnx_cursor(port)
-    query = SQL(
-        "SELECT column_name " "FROM information_schema.columns " "WHERE table_name = %s"
-    )
+def select_all_historian_tables():
+    global db_connection
+    cursor = db_connection.cursor()
+    tables = []
+    try:
+        cursor.execute("""SELECT table_name FROM information_schema.tables
+                       WHERE table_catalog = 'test_historian' and table_schema = 'public'""")
+        rows = cursor.fetchall()
+        print(f"table names {rows}")
+        tables = [columns[0] for columns in rows]
+    except Exception as e:
+        print("Error getting list of tables: {}".format(e))
+    finally:
+        if cursor:
+            cursor.close()
+    return set(tables)
 
-    results = execute_statement(cnx, cursor, query, args=[table])
 
+def describe_table(table):
+    global db_connection
+    cursor = db_connection.cursor()
+    query = SQL("SELECT column_name FROM information_schema.columns WHERE table_name = %s")
+    cursor.execute(query, (table,))
+    results = cursor.fetchall()
+    cursor.close()
    return {t[0] for t in results}
 
 
-def get_data_in_table(port, table):
-    cnx, cursor = get_cnx_cursor(port)
+def get_data_in_table(table):
+    global db_connection
+    cursor = db_connection.cursor()
     query = SQL("SELECT * " "FROM {table_name}").format(table_name=Identifier(table))
-
-    results = execute_statement(cnx, cursor, query)
-
-    return results
-
-
-def execute_statement(cnx, cursor, query, args=None):
-    cursor.execute(query, vars=args)
-
+    cursor.execute(query)
     results = cursor.fetchall()
-
     cursor.close()
-    cnx.close()
- return results +def cleanup_tables(truncate_tables, drop_tables=False): + global db_connection + cursor = db_connection.cursor() + if truncate_tables is None: + truncate_tables = select_all_historian_tables() -def get_cnx_cursor(port): - connect_params = { - "database": TEST_DATABASE, - "user": ROOT_USER, - "password": ROOT_PASSWORD, - "host": "localhost", - "port": port, - } - - cnx = psycopg2.connect(**connect_params) - cursor = cnx.cursor() - - return cnx, cursor - - -def wait_for_connection(container, port): - start_time = time() - while time() - start_time < ALLOW_CONNECTION_TIME: - command = f"psql --user={ROOT_USER} --dbname={TEST_DATABASE} --port={port}" - response = container.exec_run(command, tty=True) - # https://www.postgresql.org/docs/10/app-psql.html#id-1.9.4.18.7 - # psql returns 0 to the shell if it finished normally, - # 1 if a fatal error of its own occurs (e.g. out of memory, file not found), - # 2 if the connection to the server went bad and the session was not interactive, - # and 3 if an error occurred in a script and the variable ON_ERROR_STOP was set. - exit_code = response[0] - - if exit_code == 0: - return - elif exit_code == 1: - raise RuntimeError(response) - elif exit_code == 2: - continue - elif exit_code == 3: - raise RuntimeError(response) - - # if we break out of the loop, we assume that connection has been verified given enough sleep time - return + if drop_tables: + for table in truncate_tables: + if table: + cursor.execute(SQL('DROP TABLE IF EXISTS {}').format(Identifier(table))) + else: + for table in truncate_tables: + if table: + cursor.execute(SQL('TRUNCATE TABLE {}').format(Identifier(table))) + db_connection.commit() + cursor.close() -def drop_all_tables(port): - tables = get_tables(port) - cnx, cursor = get_cnx_cursor(port) +def seed_database(sql): + global db_connection + cursor = db_connection.cursor() try: - for t in tables: - cursor.execute(SQL(f'DROP TABLE {t}')) - cnx.commit() - except Exception as e: - print("Error deleting tables {}".format(e)) - finally: - if cursor: - cursor.close() - - -@pytest.fixture(autouse=True) -def cleanup_tables(get_container_func): - container, sqlfuncts, connection_port, historian_version = get_container_func - drop_all_tables(connection_port) - create_all_tables(container, historian_version) + cursor.execute(sql) + except psycopg2.errors.UndefinedTable as e: + print(e) + cursor.close() + db_connection.commit() diff --git a/volttrontesting/testutils/test_docker_wrapper.py b/volttrontesting/testutils/test_docker_wrapper.py deleted file mode 100644 index 67fa4a7d56..0000000000 --- a/volttrontesting/testutils/test_docker_wrapper.py +++ /dev/null @@ -1,45 +0,0 @@ -import pytest - - -try: - SKIP_DOCKER = False - from volttrontesting.fixtures.docker_wrapper import create_container -except ImportError: - SKIP_DOCKER = True - -SKIP_REASON = "No docker available in api (install pip install docker) for availability" - - -@pytest.mark.skipif(SKIP_DOCKER, reason=SKIP_REASON) -def test_docker_wrapper(): - with create_container("mysql", ports={"3306/tcp": 3306}, env={"MYSQL_ROOT_PASSWORD": "12345"}) as container: - print(f"\nStatus: {container.status}") - print(f"\nLogs: {container.logs()}") - assert container.status == 'running' - - -@pytest.mark.skipif(SKIP_DOCKER, reason=SKIP_REASON) -def test_docker_run_crate_latest(): - with create_container("crate", ports={"4200/tcp": 4200}) as container: - assert container.status == 'running' - - -@pytest.mark.skipif(SKIP_DOCKER, reason=SKIP_REASON) -def 
test_docker_wrapper_should_throw_runtime_error_on_false_image_when_pull(): - with pytest.raises(RuntimeError) as execinfo: - with create_container("not_a_real_image", ports={"4200/tcp": 4200}) as container: - container.logs() - - assert "404 Client Error" in str(execinfo.value) - - -@pytest.mark.skipif(SKIP_DOCKER, reason=SKIP_REASON) -def test_docker_wrapper_should_throw_runtime_error_when_ports_clash(): - port = 4200 - with pytest.raises(RuntimeError) as execinfo: - with create_container("crate", ports={"4200/tcp": port}): - with create_container("crate", ports={"4200/tcp": port}) as container2: - assert container2.status == 'running' - - assert "500 Server Error" in str(execinfo.value) -
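
Reviewer note on the refactor above: the tests now connect directly to the Postgres
and TimescaleDB service containers defined in the workflow, rather than to containers
started through the deleted docker_wrapper fixture. Below is a minimal, self-contained
sketch of that connection pattern. The dbname/user/password values and the
POSTGRES_PORT environment variable mirror the workflow and fixture above; treat them
as assumptions for illustration, not as part of the diff.

import os

import psycopg2
from psycopg2.sql import SQL

# Connection parameters matching the workflow's service containers:
# the postgres service maps host port 5432, the timescale service 5433.
connect_params = {
    "dbname": "test_historian",
    "user": "postgres",
    "password": "postgres",
    "host": "localhost",
    "port": int(os.environ.get("POSTGRES_PORT", 5432)),
}

conn = psycopg2.connect(**connect_params)
conn.autocommit = True
with conn.cursor() as cursor:
    # A %s placeholder binds the value server-side, the same placeholder
    # style describe_table() above relies on instead of interpolating the
    # table name into the query string.
    cursor.execute(
        SQL("SELECT column_name FROM information_schema.columns "
            "WHERE table_name = %s"),
        ("topics",),
    )
    print(cursor.fetchall())
conn.close()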