This repository was archived by the owner on Apr 18, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy path_core_testing.py
More file actions
113 lines (86 loc) · 4.8 KB
/
_core_testing.py
File metadata and controls
113 lines (86 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import collections
import random
import unittest
import gearman.util
from gearman.command_handler import GearmanCommandHandler
from gearman.connection import GearmanConnection
from gearman.connection_manager import GearmanConnectionManager, NoopEncoder
from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, DEFAULT_GEARMAN_PORT, JOB_UNKNOWN, JOB_CREATED
from gearman.errors import ConnectionError
from gearman.job import GearmanJob, GearmanJobRequest
from gearman.protocol import get_command_name
class MockGearmanConnection(GearmanConnection):
    """GearmanConnection variant whose socket-level hooks can be told to fail.

    Three boolean switches (``_fail_on_bind``, ``_fail_on_read`` and
    ``_fail_on_write``) start out False.  When a test sets one of them, the
    matching hook calls ``self.throw_exception`` with a "mock ... failure"
    message; otherwise the hook does nothing.
    """

    def __init__(self, host=None, port=DEFAULT_GEARMAN_PORT):
        """Initialise the base connection, using a placeholder host if none is given."""
        if not host:
            host = '__testing_host__'
        super(MockGearmanConnection, self).__init__(host=host, port=port)

        # Failure switches -- all disabled until a test turns one on
        self._fail_on_bind = self._fail_on_read = self._fail_on_write = False

    def _create_client_socket(self):
        """Create no socket; signal a mock bind failure when ``_fail_on_bind`` is set."""
        if not self._fail_on_bind:
            return
        self.throw_exception(message='mock bind failure')

    def read_data_from_socket(self):
        """Read nothing; signal a mock read failure when ``_fail_on_read`` is set."""
        if not self._fail_on_read:
            return
        self.throw_exception(message='mock read failure')

    def send_data_to_socket(self):
        """Send nothing; signal a mock write failure when ``_fail_on_write`` is set."""
        if not self._fail_on_write:
            return
        self.throw_exception(message='mock write failure')

    def fileno(self):
        """Return a fixed, arbitrarily chosen descriptor number."""
        # The value itself carries no meaning; it only has to be constant
        return 73

    def __repr__(self):
        """Show host, port, connected state and the object's id()."""
        details = (self.gearman_host, self.gearman_port, self.connected, id(self))
        return '<GearmanConnection %s:%d connected=%s> (%s)' % details
class MockGearmanConnectionManager(GearmanConnectionManager):
    """Mock client base for Worker/Client/abstract ClientBase tests.

    Both poller-related methods are stubbed out so they never touch a poller.
    """

    def _register_connections_with_poller(self, connections, poller):
        """Do nothing: no connection is registered with the poller."""
        return None

    def poll_connections_once(self, poller, connection_map, timeout=None):
        """Return three fresh empty sets, whatever the arguments are."""
        # NOTE(review): presumably (readable, writable, errored) -- confirm
        # against GearmanConnectionManager.poll_connections_once
        nothing_ready = (set(), set(), set())
        return nothing_ready
class _GearmanAbstractTest(unittest.TestCase):
    """Shared TestCase base that wires up a mock manager, connection and handler.

    The class attributes below select which classes the fixture instantiates;
    ``command_handler_class`` is None here and is expected to be supplied by
    subclasses.
    """
    connection_class = MockGearmanConnection
    connection_manager_class = MockGearmanConnectionManager
    command_handler_class = None
    job_class = GearmanJob
    job_request_class = GearmanJobRequest

    def setUp(self):
        """Build the manager, then the connection, then the command handler."""
        # Order matters: the connection is attached to the manager, and the
        # handler lookup needs both of them
        self.setup_connection_manager()
        self.setup_connection()
        self.setup_command_handler()

    def setup_connection_manager(self):
        """Create an ad-hoc manager subclass bound to this test's classes and instantiate it."""
        class_attrs = {
            'command_handler_class': self.command_handler_class,
            'connection_class': self.connection_class,
        }
        manager_bases = (self.connection_manager_class, )
        mock_client_class = type('MockGearmanTestingClient', manager_bases, class_attrs)
        self.connection_manager = mock_client_class()

    def setup_connection(self):
        """Instantiate the connection and make it the manager's only connection."""
        mock_connection = self.connection_class()
        self.connection = mock_connection
        self.connection_manager.connection_list = [mock_connection]

    def setup_command_handler(self):
        """Establish the connection via the manager and fetch its handler from the manager's map."""
        manager = self.connection_manager
        manager.establish_connection(self.connection)
        self.command_handler = manager.connection_to_handler_map[self.connection]

    def generate_job(self):
        """Return a ``job_class`` instance with random handle, unique and data strings."""
        handle = str(random.random())
        unique = str(random.random())
        data = str(random.random())
        return self.job_class(
            self.connection,
            handle=handle,
            task='__test_ability__',
            unique=unique,
            data=data,
            when_to_run=None,
        )

    def generate_job_dict(self):
        """Return ``to_dict()`` of a freshly generated job."""
        return self.generate_job().to_dict()

    def generate_job_request(self, priority=PRIORITY_NONE, when_to_run=None, background=False):
        """Wrap a random job in a ``job_request_class`` and check it starts as JOB_UNKNOWN."""
        handle = str(random.random())
        unique = str(random.random())
        data = str(random.random())
        new_job = self.job_class(
            connection=self.connection,
            handle=handle,
            task='__test_ability__',
            unique=unique,
            data=data,
            when_to_run=when_to_run,
        )
        new_request = self.job_request_class(new_job, initial_priority=priority, background=background)
        self.assertEqual(new_request.state, JOB_UNKNOWN)
        return new_request

    def assert_jobs_equal(self, job_actual, job_expected):
        """Assert two jobs agree on handle, task, unique and data (checked in that order)."""
        for field in ('handle', 'task', 'unique', 'data'):
            self.assertEqual(getattr(job_actual, field), getattr(job_expected, field))

    def assert_sent_command(self, expected_cmd_type, **expected_cmd_args):
        """Pop the next entry off the connection's outgoing queue and check its type and args."""
        next_command = self.connection._outgoing_commands.popleft()
        sent_cmd_type, sent_cmd_args = next_command
        self.assert_commands_equal(sent_cmd_type, expected_cmd_type)
        self.assertEqual(sent_cmd_args, expected_cmd_args)

    def assert_no_pending_commands(self):
        """Assert the connection's outgoing command queue equals an empty deque."""
        no_commands = collections.deque()
        self.assertEqual(self.connection._outgoing_commands, no_commands)

    def assert_commands_equal(self, cmd_type_actual, cmd_type_expected):
        """Compare two command types by the names ``get_command_name`` gives them."""
        actual_name = get_command_name(cmd_type_actual)
        expected_name = get_command_name(cmd_type_expected)
        self.assertEqual(actual_name, expected_name)