11
11
# License for the specific language governing permissions and limitations
12
12
# under the License.
13
13
import os
14
+ import re
15
+ from datetime import UTC , datetime
16
+ from time import sleep
14
17
from typing import Optional
18
+
19
+ from testcontainers .core .config import MAX_TRIES , SLEEP_TIME
15
20
from testcontainers .core .generic import DbContainer
16
21
from testcontainers .core .utils import raise_for_deprecated_parameter
22
+ from testcontainers .core .waiting_utils import wait_container_is_ready
17
23
18
24
19
25
class PostgresContainer (DbContainer ):
@@ -63,3 +69,27 @@ def get_connection_url(self, host=None) -> str:
63
69
password = self .password , dbname = self .dbname , host = host ,
64
70
port = self .port ,
65
71
)
72
+
73
+ @wait_container_is_ready ()
74
+ def _connect (self ) -> None :
75
+ count = 0
76
+
77
+ # ALTERNATE IMPLEMENTATION based on comments from @HofmeisterAn
78
+ # expr = re.compile(r'(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{9}Z).*')
79
+ # timestamp = datetime.now()
80
+ while count < MAX_TRIES :
81
+ status , _ = self .exec (f"pg_isready -hlocalhost -p{ self .port } -U{ self .username } " )
82
+ if status == 0 :
83
+ return
84
+
85
+ # ALTERNATE IMPLEMENTATION based on comments from @HofmeisterAn
86
+ # stdout = self._container.logs(stderr = False, timestamps = True, since = timestamp)
87
+ # lines = stdout.decode("utf-8").split("\n")
88
+ # for line in lines:
89
+ # if m:= re.match(expr, line):
90
+ # timestamp = datetime.fromisoformat(m.groupdict()["ts"]).replace(tzinfo=None)
91
+ # if "database system is ready to accept connections" in line:
92
+ # return
93
+
94
+ sleep (SLEEP_TIME )
95
+ count += 1
0 commit comments