11import threading
22import time
33
4+ import psycopg
45import pytest
6+ from django .conf import settings
57from django .db import connection
68from django .db .utils import OperationalError
9+ from django .test import override_settings
710
8- from ansible_base .lib .utils .db import advisory_lock , migrations_are_complete
11+ from ansible_base .lib .utils .db import (
12+ advisory_lock ,
13+ get_pg_notify_params ,
14+ migrations_are_complete ,
15+ psycopg_conn_string_from_settings_dict ,
16+ psycopg_kwargs_from_settings_dict ,
17+ )
918
1019
1120@pytest .mark .django_db
@@ -14,6 +23,115 @@ def test_migrations_are_complete():
1423 assert migrations_are_complete ()
1524
1625
26+ class TestPGNotifyConnection :
27+ TEST_DATABASE_DICT = {
28+ "default" : {
29+ "ENGINE" : "django.db.backends.postgresql" ,
30+ "HOST" : "https://foo.invalid" ,
31+ "PORT" : 55434 ,
32+ "USER" : "dab_user" ,
33+ "PASSWORD" : "dabbing" ,
34+ "NAME" : "dab_db" ,
35+ }
36+ }
37+ PSYCOPG_KWARGS = {
38+ 'dbname' : 'dab_db' ,
39+ 'client_encoding' : 'UTF8' ,
40+ # kwargs containing objects can not be compared so they will be ignored
41+ # 'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
42+ 'user' : 'dab_user' ,
43+ 'password' : 'dabbing' ,
44+ 'host' : 'https://foo.invalid' ,
45+ 'port' : 55434 ,
46+ # 'context': <psycopg.adapt.AdaptersMap object at 0x7f537f2d9f70>,
47+ 'prepare_threshold' : None ,
48+ 'autocommit' : True ,
49+ }
50+
51+ @pytest .fixture
52+ def mock_settings (self ):
53+ with override_settings (DATABASES = self .TEST_DATABASE_DICT , USE_TZ = False ):
54+ yield
55+
56+ def _trim_python_objects (self , psycopg_params ):
57+ # These remove those commented-out kwargs in PSYCOPG_KWARGS
58+ psycopg_params .pop ('cursor_factory' )
59+ psycopg_params .pop ('context' )
60+ return psycopg_params
61+
62+ def test_default_behavior (self , mock_settings ):
63+ params = self ._trim_python_objects (get_pg_notify_params ())
64+ assert params == self .PSYCOPG_KWARGS
65+
66+ def test_pg_notify_extra_options (self , mock_settings ):
67+ params = self ._trim_python_objects (get_pg_notify_params (application_name = 'joe_connection' ))
68+ expected = self .PSYCOPG_KWARGS .copy ()
69+ expected ['application_name' ] = 'joe_connection'
70+ assert params == expected
71+
72+ def test_lister_databases (self , mock_settings ):
73+ LISTENER_DATABASES = {"default" : {"HOST" : "https://foo.anotherhost.invalid" }}
74+ with override_settings (LISTENER_DATABASES = LISTENER_DATABASES ):
75+ params = self ._trim_python_objects (get_pg_notify_params ())
76+ assert params ['host' ] == "https://foo.anotherhost.invalid"
77+
78+ def test_pg_notify_databases (self , mock_settings ):
79+ PG_NOTIFY_DATABASES = {"default" : {"HOST" : "https://foo.anotherhost2.invalid" }}
80+ with override_settings (PG_NOTIFY_DATABASES = PG_NOTIFY_DATABASES ):
81+ params = self ._trim_python_objects (get_pg_notify_params ())
82+ assert params ['host' ] == "https://foo.anotherhost2.invalid"
83+
84+ def test_psycopg_kwargs_from_settings_dict (self ):
85+ "More of a unit test, doing the same thing"
86+ test_dict = self .TEST_DATABASE_DICT ["default" ].copy ()
87+ test_dict ['OPTIONS' ] = {'autocommit' : True }
88+ with override_settings (USE_TZ = False ):
89+ psycopg_params = self ._trim_python_objects (psycopg_kwargs_from_settings_dict (test_dict ))
90+ assert psycopg_params == self .PSYCOPG_KWARGS
91+
92+ def test_psycopg_kwargs_use (self ):
93+ "This assures that the data we get for the kwargs are usable, and demos how to use"
94+ if connection .vendor == 'sqlite' :
95+ pytest .skip ('Test needs to connect to postgres which is not running' )
96+
97+ test_dict = settings .DATABASES ['default' ].copy ()
98+ test_dict ['OPTIONS' ] = {'autocommit' : True }
99+ with override_settings (USE_TZ = False ):
100+ psycopg_params = self ._trim_python_objects (psycopg_kwargs_from_settings_dict (test_dict ))
101+
102+ psycopg .connect (** psycopg_params )
103+
104+ def test_listener_string_production (self ):
105+ "This is a test to correspond to PG_NOTIFY_DSN_SERVER type settings in eda-server"
106+ with override_settings (USE_TZ = False ):
107+ args = psycopg_conn_string_from_settings_dict (
108+ {
109+ "ENGINE" : "django.db.backends.postgresql" ,
110+ "HOST" : "127.0.0.1" ,
111+ "PORT" : 5432 ,
112+ "USER" : "postgres" ,
113+ "PASSWORD" : "DB_PASSWORD" ,
114+ "NAME" : "eda" ,
115+ "OPTIONS" : {
116+ "sslmode" : "allow" ,
117+ "sslcert" : "" ,
118+ "sslkey" : "" ,
119+ "sslrootcert" : "" ,
120+ },
121+ }
122+ )
123+ assert args == (
124+ "dbname=eda sslmode=allow sslcert='' sslkey='' sslrootcert='' client_encoding=UTF8 user=postgres password=DB_PASSWORD host=127.0.0.1 port=5432"
125+ )
126+
127+ def test_listener_string_production_use (self ):
128+ "This assures that the data we get for the connection string is usable, and demos how to use"
129+ if connection .vendor == 'sqlite' :
130+ pytest .skip ('Test needs to connect to postgres which is not running' )
131+ args = psycopg_conn_string_from_settings_dict (settings .DATABASES ['default' ])
132+ psycopg .connect (conninfo = args )
133+
134+
17135class TestAdvisoryLock :
18136 @pytest .fixture (autouse = True )
19137 def skip_if_sqlite (self ):
@@ -36,7 +154,7 @@ def background_task(django_db_blocker):
36154 def test_determine_lock_is_held (self , django_db_blocker ):
37155 thread = threading .Thread (target = TestAdvisoryLock .background_task , args = (django_db_blocker ,))
38156 thread .start ()
39- for _ in range (5 ):
157+ for _ in range (20 ):
40158 with advisory_lock ('background_task_lock' , wait = False ) as held :
41159 if held is False :
42160 break
0 commit comments