|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | | - |
15 | | -import unittest |
16 | | - |
17 | | -from urllib.request import build_opener, Request, HTTPHandler |
18 | | -import re |
19 | | -import os |
20 | | -import time |
21 | | -from os.path import expanduser |
22 | | - |
23 | | -from ccmlib import common |
24 | | - |
25 | | -from tests.integration import get_server_versions, BasicKeyspaceUnitTestCase, \ |
26 | | - drop_keyspace_shutdown_cluster, get_node, USE_CASS_EXTERNAL, TestCluster |
27 | | -from tests.integration import use_singledc, use_single_node, wait_for_node_socket, CASSANDRA_IP |
28 | | - |
29 | | -home = expanduser('~') |
30 | | - |
31 | | -# Home directory of the Embedded Apache Directory Server to use |
32 | | -ADS_HOME = os.getenv('ADS_HOME', home) |
33 | | - |
34 | | - |
35 | | -def find_spark_master(session): |
36 | | - |
37 | | - # Iterate over the nodes the one with port 7080 open is the spark master |
38 | | - for host in session.hosts: |
39 | | - ip = host.address |
40 | | - port = 7077 |
41 | | - spark_master = (ip, port) |
42 | | - if common.check_socket_listening(spark_master, timeout=3): |
43 | | - return spark_master[0] |
44 | | - return None |
45 | | - |
46 | | - |
47 | | -def wait_for_spark_workers(num_of_expected_workers, timeout): |
48 | | - """ |
49 | | - This queries the spark master and checks for the expected number of workers |
50 | | - """ |
51 | | - start_time = time.time() |
52 | | - while True: |
53 | | - opener = build_opener(HTTPHandler) |
54 | | - request = Request("http://{0}:7080".format(CASSANDRA_IP)) |
55 | | - request.get_method = lambda: 'GET' |
56 | | - connection = opener.open(request) |
57 | | - match = re.search('Alive Workers:.*(\d+)</li>', connection.read().decode('utf-8')) |
58 | | - num_workers = int(match.group(1)) |
59 | | - if num_workers == num_of_expected_workers: |
60 | | - match = True |
61 | | - break |
62 | | - elif time.time() - start_time > timeout: |
63 | | - match = True |
64 | | - break |
65 | | - time.sleep(1) |
66 | | - return match |
67 | | - |
68 | | - |
69 | | -def use_single_node_with_graph(start=True, options={}, dse_options={}): |
70 | | - use_single_node(start=start, workloads=['graph'], configuration_options=options, dse_options=dse_options) |
71 | | - |
72 | | - |
73 | | -def use_single_node_with_graph_and_spark(start=True, options={}): |
74 | | - use_single_node(start=start, workloads=['graph', 'spark'], configuration_options=options) |
75 | | - |
76 | | - |
77 | | -def use_single_node_with_graph_and_solr(start=True, options={}): |
78 | | - use_single_node(start=start, workloads=['graph', 'solr'], configuration_options=options) |
79 | | - |
80 | | - |
81 | | -def use_singledc_wth_graph(start=True): |
82 | | - use_singledc(start=start, workloads=['graph']) |
83 | | - |
84 | | - |
85 | | -def use_singledc_wth_graph_and_spark(start=True): |
86 | | - use_cluster_with_graph(3) |
87 | | - |
88 | | - |
89 | | -def use_cluster_with_graph(num_nodes): |
90 | | - """ |
91 | | - This is a work around to account for the fact that spark nodes will conflict over master assignment |
92 | | - when started all at once. |
93 | | - """ |
94 | | - if USE_CASS_EXTERNAL: |
95 | | - return |
96 | | - |
97 | | - # Create the cluster but don't start it. |
98 | | - use_singledc(start=False, workloads=['graph', 'spark']) |
99 | | - # Start first node. |
100 | | - get_node(1).start(wait_for_binary_proto=True) |
101 | | - # Wait binary protocol port to open |
102 | | - wait_for_node_socket(get_node(1), 120) |
103 | | - # Wait for spark master to start up |
104 | | - spark_master_http = ("localhost", 7080) |
105 | | - common.check_socket_listening(spark_master_http, timeout=60) |
106 | | - tmp_cluster = TestCluster() |
107 | | - |
108 | | - # Start up remaining nodes. |
109 | | - try: |
110 | | - session = tmp_cluster.connect() |
111 | | - statement = "ALTER KEYSPACE dse_leases WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'dc1': '%d'}" % (num_nodes) |
112 | | - session.execute(statement) |
113 | | - finally: |
114 | | - tmp_cluster.shutdown() |
115 | | - |
116 | | - for i in range(1, num_nodes+1): |
117 | | - if i is not 1: |
118 | | - node = get_node(i) |
119 | | - node.start(wait_for_binary_proto=True) |
120 | | - wait_for_node_socket(node, 120) |
121 | | - |
122 | | - # Wait for workers to show up as Alive on master |
123 | | - wait_for_spark_workers(3, 120) |
124 | | - |
125 | | - |
126 | | -class BasicGeometricUnitTestCase(BasicKeyspaceUnitTestCase): |
127 | | - """ |
128 | | - This base test class is used by all the geomteric tests. It contains class level teardown and setup |
129 | | - methods. It also contains the test fixtures used by those tests |
130 | | - """ |
131 | | - |
132 | | - @classmethod |
133 | | - def common_dse_setup(cls, rf, keyspace_creation=True): |
134 | | - cls.cluster = TestCluster() |
135 | | - cls.session = cls.cluster.connect() |
136 | | - cls.ks_name = cls.__name__.lower() |
137 | | - if keyspace_creation: |
138 | | - cls.create_keyspace(rf) |
139 | | - cls.cass_version, cls.cql_version = get_server_versions() |
140 | | - cls.session.set_keyspace(cls.ks_name) |
141 | | - |
142 | | - @classmethod |
143 | | - def setUpClass(cls): |
144 | | - cls.common_dse_setup(1) |
145 | | - cls.initalizeTables() |
146 | | - |
147 | | - @classmethod |
148 | | - def tearDownClass(cls): |
149 | | - drop_keyspace_shutdown_cluster(cls.ks_name, cls.session, cls.cluster) |
150 | | - |
151 | | - @classmethod |
152 | | - def initalizeTables(cls): |
153 | | - udt_type = "CREATE TYPE udt1 (g {0})".format(cls.cql_type_name) |
154 | | - large_table = "CREATE TABLE tbl (k uuid PRIMARY KEY, g {0}, l list<{0}>, s set<{0}>, m0 map<{0},int>, m1 map<int,{0}>, t tuple<{0},{0},{0}>, u frozen<udt1>)".format( |
155 | | - cls.cql_type_name) |
156 | | - simple_table = "CREATE TABLE tblpk (k {0} primary key, v int)".format(cls.cql_type_name) |
157 | | - cluster_table = "CREATE TABLE tblclustering (k0 int, k1 {0}, v int, primary key (k0, k1))".format( |
158 | | - cls.cql_type_name) |
159 | | - cls.session.execute(udt_type) |
160 | | - cls.session.execute(large_table) |
161 | | - cls.session.execute(simple_table) |
162 | | - cls.session.execute(cluster_table) |
0 commit comments