-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcommon.py
More file actions
183 lines (154 loc) · 6.06 KB
/
common.py
File metadata and controls
183 lines (154 loc) · 6.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
from RLTest import Env, Defaults
import json
import signal
import time
import unittest
import inspect
import os
import tempfile
from packaging import version
# Turn on RLTest's decode_responses default for this module's environments.
# NOTE(review): presumably this makes connections return str instead of bytes,
# which the string comparisons further down rely on — confirm against RLTest.
Defaults.decode_responses = True
class ShardsConnectionTimeoutException(Exception):
    """Raised by TimeLimit.handler when the armed SIGALRM timer expires."""
class TimeLimit(object):
    """
    A context manager that bounds the run time of its body using SIGALRM.

    On entry it installs ``self.handler`` for SIGALRM and arms a one-shot
    ITIMER_REAL timer for ``timeout`` seconds. If the timer fires before the
    body returns, the handler raises ShardsConnectionTimeoutException. On
    exit the timer is disarmed and the SIGALRM handler that was active
    before entry is put back.
    """

    def __init__(self, timeout):
        # Number of seconds passed to signal.setitimer() on entry.
        self.timeout = timeout
        # Handler to reinstate in __exit__; replaced by the real one on entry.
        self._previous_handler = signal.SIG_DFL

    def __enter__(self):
        previous = signal.signal(signal.SIGALRM, self.handler)
        # signal.signal() returns None when the old handler was not installed
        # from Python; it cannot be reinstated, so fall back to the default.
        self._previous_handler = signal.SIG_DFL if previous is None else previous
        signal.setitimer(signal.ITIMER_REAL, self.timeout, 0)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Disarm first so the alarm cannot fire while handlers are swapped.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, self._previous_handler)

    def handler(self, signum, frame):
        """SIGALRM callback: abort the guarded body with a timeout error."""
        raise ShardsConnectionTimeoutException()
class Colors(object):
    """Helpers that wrap a string in an ANSI escape sequence and a reset."""

    @staticmethod
    def _paint(code, data):
        # Emit ESC[<code>m before the text and ESC[0m (reset) after it.
        return '\033[' + code + 'm' + data + '\033[0m'

    @staticmethod
    def Cyan(data):
        """Wrap `data` with escape code 36."""
        return Colors._paint('36', data)

    @staticmethod
    def Yellow(data):
        """Wrap `data` with escape code 33."""
        return Colors._paint('33', data)

    @staticmethod
    def Bold(data):
        """Wrap `data` with escape code 1."""
        return Colors._paint('1', data)

    @staticmethod
    def Bred(data):
        """Wrap `data` with escape codes 31;1."""
        return Colors._paint('31;1', data)

    @staticmethod
    def Gray(data):
        """Wrap `data` with escape codes 30;1."""
        return Colors._paint('30;1', data)

    @staticmethod
    def Lgray(data):
        """Wrap `data` with escape codes 30;47."""
        return Colors._paint('30;47', data)

    @staticmethod
    def Blue(data):
        """Wrap `data` with escape code 34."""
        return Colors._paint('34', data)

    @staticmethod
    def Green(data):
        """Wrap `data` with escape code 32."""
        return Colors._paint('32', data)
# Relative path of the gears tests jar. Nothing in the visible part of this
# module reads it — presumably other test modules import it; verify before
# changing or removing.
BASE_JAR_FILE = './gears_tests/build/gears_tests.jar'
def getConnectionByEnv(env):
    """
    Return the connection matching the environment type.

    For an 'oss-cluster' environment the cluster connection of the env
    runner is returned; otherwise the environment's regular connection.
    """
    if env.env == 'oss-cluster':
        return env.envRunner.getClusterConnection()
    return env.getConnection()
def runSkipTests():
    """Return True when the RUN_SKIPED_TESTS environment variable is set to a non-empty value."""
    return bool(os.environ.get('RUN_SKIPED_TESTS', False))
def waitBeforeTestStart():
    """Return True when the HOLD environment variable is set to a non-empty value."""
    return bool(os.environ.get('HOLD', False))
def shardsConnections(env):
    """Lazily yield one connection per shard, for shard ids 1..env.shardsCount."""
    yield from (
        env.getConnection(shardId=shard_id)
        for shard_id in range(1, env.shardsCount + 1)
    )
def verifyClusterInitialized(env):
    """
    Block until every shard reports all of its cluster nodes as 'connected'.

    For each shard connection, MRTESTS.INFOCLUSTER is polled every 0.1
    seconds. The node list is read from index 4 of the reply and each
    node's status from index 17 of the node entry. The function has no
    timeout of its own, so callers that need one must wrap the call.
    """
    for shard_conn in shardsConnections(env):
        try:
            # try to promote to internal connection
            shard_conn.execute_command('debug', 'MARK-INTERNAL-CLIENT')
        except Exception:
            # Best effort: carry on with a regular connection.
            pass

        while True:
            reply = shard_conn.execute_command('MRTESTS.INFOCLUSTER')
            # Read every node's status before judging, as the plain loop did.
            statuses = [node[17] for node in reply[4]]
            if all(state == 'connected' for state in statuses):
                break
            time.sleep(0.1)
def initialiseCluster(env):
    """
    Refresh the cluster topology and wait for the shards to connect.

    MRTESTS.REFRESHCLUSTER is always broadcast. For a cluster environment
    the node timeout is raised, every shard is told to force its shard
    connections, and the function then waits (bounded by TimeLimit(2))
    until verifyClusterInitialized() returns.
    """
    # NOTE(review): nesting was reconstructed from a source whose indentation
    # was lost; confirm the loop and the wait belong to the cluster branch.
    env.broadcast('MRTESTS.REFRESHCLUSTER')
    if not env.isCluster():
        return

    # make sure cluster will not turn to failed state and we will not be
    # able to execute commands on shards, on slow envs, run with valgrind,
    # or mac, it is needed.
    env.broadcast('CONFIG', 'set', 'cluster-node-timeout', '120000')

    for shard_conn in shardsConnections(env):
        try:
            shard_conn.execute_command('debug', 'MARK-INTERNAL-CLIENT')
        except Exception as err:
            # Best effort: report the failure and keep going.
            print(err)
        shard_conn.execute_command('MRTESTS.FORCESHARDSCONNECTION')

    with TimeLimit(2):
        verifyClusterInitialized(env)
def create_config_file(content) -> str:
    """
    Create a temporary file holding `content` and return its path.

    The file is created in the current working directory with
    delete=False, so it stays on disk after it is closed. `content` is
    encoded with str.encode() defaults before being written.
    """
    config_file = tempfile.NamedTemporaryFile(delete=False, dir=os.getcwd())
    with config_file:
        config_file.write(content.encode())
    return config_file.name
def get_redis_version():
    """
    Return the redis-server version without starting the server.

    Runs ``<binary> --version``, where the binary is taken from the
    REDIS_SERVER environment variable and falls back to
    ``Defaults.binary``. The third whitespace-separated token of the
    output has its first two characters dropped (presumably a ``v=``
    prefix — confirm against the binary's output) and the remainder is
    parsed with ``version.parse``.

    Raises IndexError when the output has fewer than three tokens, e.g.
    when the command produced no output.
    """
    redis_binary = os.environ.get('REDIS_SERVER', Defaults.binary)
    # Use the pipe as a context manager so it is closed deterministically
    # instead of being left open until garbage collection.
    with os.popen('%s --version' % redis_binary) as pipe:
        version_output = pipe.read()
    version_number = version_output.split()[2][2:].strip()
    return version.parse(version_number)
def is_redis_version_is_lower_than(required_version):
    """
    Return True when the redis-server version is strictly lower than
    `required_version` (a version string understood by version.parse).
    """
    installed = get_redis_version()
    required = version.parse(required_version)
    return installed < required
def skip_if_redis_version_is_lower_than(required_version):
    """Raise unittest.SkipTest when the redis-server version is below `required_version`."""
    if not is_redis_version_is_lower_than(required_version):
        return
    raise unittest.SkipTest()
def MRTestDecorator(redisConfigFileContent=None, moduleArgs=None, skipTest=False, skipClusterInitialisation=False, skipOnVersionLowerThan=None, skipOnSingleShard=False, skipOnCluster=False, skipOnValgrind=False, envArgs=None):
    """
    Build a decorator that turns ``test_function(env, conn)`` into a
    zero-argument test which creates its own environment.

    Parameters:
        redisConfigFileContent: when truthy, written to a temporary file
            whose path is passed to Env as ``redisConfigFile``; otherwise
            ``redisConfigFile`` is None.
        moduleArgs: passed to Env as ``moduleArgs``. When falsy, the
            default is 'password' for Redis below 8.0.0 and None otherwise.
        skipTest: skip the test unless runSkipTests() is true.
        skipClusterInitialisation: do not call initialiseCluster().
        skipOnVersionLowerThan: version string; skip on an older Redis.
        skipOnSingleShard: skip when ``env.shardsCount == 1``.
        skipOnCluster: skip when 'cluster' appears in ``env.env``.
        skipOnValgrind: skip when ``env.debugger`` is not None.
        envArgs: extra keyword arguments for Env. The mapping is copied on
            every test run and is never modified.

    Skips are signalled by raising unittest.SkipTest.
    """
    def test_func_generator(test_function):
        def test_func():
            test_name = '%s:%s' % (inspect.getfile(test_function), test_function.__name__)
            if skipTest and not runSkipTests():
                raise unittest.SkipTest()
            if skipOnVersionLowerThan:
                skip_if_redis_version_is_lower_than(skipOnVersionLowerThan)

            defaultModuleArgs = 'password'
            if not is_redis_version_is_lower_than('8.0.0'):
                # We provide password only if version < 8.0.0. If version is
                # greater, we have internal command and we do not need the
                # password.
                defaultModuleArgs = None

            # Work on a private copy: writing into the shared mapping would
            # leak state between decorated tests and into the caller's dict.
            env_kwargs = dict(envArgs) if envArgs else {}
            env_kwargs['moduleArgs'] = moduleArgs or defaultModuleArgs
            env_kwargs['redisConfigFile'] = create_config_file(redisConfigFileContent) if redisConfigFileContent else None
            env = Env(**env_kwargs)
            conn = getConnectionByEnv(env)

            # These checks need a live environment, so they run after Env().
            if skipOnSingleShard and env.shardsCount == 1:
                raise unittest.SkipTest()
            if skipOnCluster and 'cluster' in env.env:
                raise unittest.SkipTest()
            if skipOnValgrind and env.debugger is not None:
                raise unittest.SkipTest()

            if not skipClusterInitialisation:
                initialiseCluster(env)
            if waitBeforeTestStart():
                input('\tpress any button to continue test %s' % test_name)
            test_function(env=env, conn=conn)
        return test_func
    return test_func_generator