Skip to content

Commit fb7df2c

Browse files
committed
Migrated the load balancer tests and updated the SGW topology to include a matching load balancer entry.
1 parent 137c880 commit fb7df2c

File tree

2 files changed

+223
-0
lines changed

2 files changed

+223
-0
lines changed

jenkins/pipelines/QE/sgw/topology.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,10 @@
1717
"cluster": 0
1818
}
1919
],
20+
"load_balancers": [
21+
{
22+
"sync_gateways": [0, 1, 2]
23+
}
24+
],
2025
"logslurp": true
2126
}

tests/QE/test_load_balancer.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
import asyncio
2+
from pathlib import Path
3+
4+
import pytest
5+
from cbltest import CBLPyTest
6+
from cbltest.api.cbltestclass import CBLTestClass
7+
from cbltest.api.syncgateway import (
8+
DocumentUpdateEntry,
9+
PutDatabasePayload,
10+
SyncGatewayUserClient,
11+
)
12+
13+
14+
@pytest.mark.sgw
@pytest.mark.min_sync_gateways(3)
@pytest.mark.min_couchbase_servers(1)
@pytest.mark.min_load_balancers(1)
class TestLoadBalancer(CBLTestClass):
    """End-to-end tests for Sync Gateway nodes behind a load balancer.

    Requires three Sync Gateway nodes sharing a single Couchbase Server
    bucket, fronted by one load balancer (see the matching entry in
    topology.json). All document reads/writes go through either the load
    balancer (public API) or individual SG nodes (admin API).
    """

    async def _configure_shared_database(
        self, sgs, sg_db: str, bucket_name: str
    ) -> PutDatabasePayload:
        """Create (or reset) database ``sg_db`` on every SG node in *sgs*.

        Each node is pointed at the same shared *bucket_name* so writes on
        one node are visible from the others. Any pre-existing database is
        deleted first so each test starts from a clean state.

        Returns the payload used, so callers can re-create the database
        later (e.g. after taking a node offline).
        """
        db_config = {
            "bucket": bucket_name,
            "num_index_replicas": 0,
            "scopes": {"_default": {"collections": {"_default": {}}}},
        }
        db_payload = PutDatabasePayload(db_config)
        for sg in sgs:
            # Drop any stale database left over from a previous run.
            if await sg.get_database_status(sg_db) is not None:
                await sg.delete_database(sg_db)
            await sg.put_database(sg_db, db_payload)
        return db_payload

    @pytest.mark.asyncio(loop_scope="session")
    async def test_load_balance_sanity(
        self, cblpytest: CBLPyTest, dataset_path: Path
    ) -> None:
        """Write docs through the load balancer and verify visibility.

        Documents written via the LB's public API must appear both through
        the LB and on every individual SG node's admin API, with identical
        counts across nodes.
        """
        sgs = cblpytest.sync_gateways[0:3]
        cbs = cblpytest.couchbase_servers[0]
        lb_url = cblpytest.load_balancers[0]
        sg_db = "db"
        bucket_name = "data-bucket"
        num_docs = 100
        channels = ["ABC", "CBS"]
        username = "vipul"
        password = "pass"

        self.mark_test_step("Create shared bucket for all SGW nodes")
        cbs.drop_bucket(bucket_name)
        cbs.create_bucket(bucket_name)

        self.mark_test_step("Configure database on all SGW nodes")
        await self._configure_shared_database(sgs, sg_db, bucket_name)

        self.mark_test_step(
            f"Create user '{username}' with access to channels {channels}"
        )
        # Creating the user on one node is sufficient: user records live in
        # the shared bucket, so all nodes (and the LB) see it.
        await sgs[0].create_user_client(sg_db, username, password, channels)

        self.mark_test_step(f"Create user client via load balancer ({lb_url})")
        lb_user = SyncGatewayUserClient(
            lb_url, username, password, port=4984, secure=False
        )

        try:
            self.mark_test_step(f"Add {num_docs} documents via load balancer")
            docs = [
                DocumentUpdateEntry(
                    id=f"lb_doc_{i}",
                    revid=None,  # new documents, no prior revision
                    body={"type": "lb_test", "index": i, "channels": channels},
                )
                for i in range(num_docs)
            ]
            await lb_user.update_documents(sg_db, docs)
            all_doc_ids = [d.id for d in docs]
            # Brief settle time so all SG nodes have processed the writes.
            await asyncio.sleep(5)

            self.mark_test_step("Verify all documents are visible via load balancer")
            lb_docs = await lb_user.get_all_documents(sg_db)
            lb_doc_ids = {row.id for row in lb_docs.rows}
            missing = set(all_doc_ids) - lb_doc_ids
            assert len(missing) == 0, (
                f"LB missing {len(missing)} docs: {list(missing)[:5]}..."
            )
            assert len(lb_doc_ids) == num_docs, (
                f"LB sees {len(lb_doc_ids)} docs, expected {num_docs}"
            )

            self.mark_test_step(
                "Verify documents are visible from each SG node (admin API)"
            )
            for i, sg in enumerate(sgs):
                sg_docs = await sg.get_all_documents(sg_db)
                sg_doc_ids = {row.id for row in sg_docs.rows}
                missing = set(all_doc_ids) - sg_doc_ids
                assert len(missing) == 0, (
                    f"SG{i} missing {len(missing)} docs: {list(missing)[:5]}..."
                )

            self.mark_test_step("Verify document count consistency across all nodes")
            doc_counts = []
            for sg in sgs:
                sg_docs = await sg.get_all_documents(sg_db)
                doc_counts.append(len(sg_docs.rows))
            assert all(c == doc_counts[0] for c in doc_counts), (
                f"Document counts differ across SG nodes: {doc_counts}"
            )
        finally:
            # Always tear down, even when an assertion above failed, so a
            # later test does not inherit this test's database/bucket state.
            await lb_user.close()
            for sg in sgs:
                await sg.delete_database(sg_db)
            cbs.drop_bucket(bucket_name)

    @pytest.mark.asyncio(loop_scope="session")
    async def test_sgw_down_with_load_balancer(
        self, cblpytest: CBLPyTest, dataset_path: Path
    ) -> None:
        """Take one SG node offline during SDK writes, then bring it back.

        While SDK writes flow into the shared bucket, SG2's database is
        deleted (simulating an outage). SG1 and SG3 must still import all
        documents; once SG2's database is re-created it must catch up, and
        all three nodes must end up with the same document set.
        """
        sgs = cblpytest.sync_gateways[0:3]
        cbs = cblpytest.couchbase_servers[0]
        sg1, sg2, sg3 = sgs[0], sgs[1], sgs[2]
        sg_db = "db"
        bucket_name = "data-bucket"
        num_docs = 100

        self.mark_test_step("Create shared bucket for all SGW nodes")
        cbs.drop_bucket(bucket_name)
        cbs.create_bucket(bucket_name)

        self.mark_test_step("Configure database on all SGW nodes")
        db_payload = await self._configure_shared_database(sgs, sg_db, bucket_name)
        await asyncio.sleep(3)

        self.mark_test_step("Start concurrent SDK writes in background")

        async def write_docs_via_sdk() -> None:
            # Writes bypass SGW entirely; SG nodes import them from the bucket.
            for i in range(num_docs):
                doc_id = f"sdk_doc_{i}"
                doc_body = {
                    "type": "sdk_doc",
                    "index": i,
                    "content": f"Document {i} written via SDK",
                }
                cbs.upsert_document(
                    bucket_name, doc_id, doc_body, "_default", "_default"
                )

        write_task = asyncio.create_task(write_docs_via_sdk())

        try:
            self.mark_test_step("Wait for some documents to be written")
            await asyncio.sleep(2)

            self.mark_test_step("Take SG2 offline by deleting its database")
            await sg2.delete_database(sg_db)

            self.mark_test_step("Verify SG2 database is offline")
            sg2_status = await sg2.get_database_status(sg_db)
            assert sg2_status is None, (
                f"SG2 database should be offline, got: {sg2_status}"
            )

            self.mark_test_step("Wait for SDK writes to complete")
            await write_task

            self.mark_test_step(
                "Verify documents are visible on SG1 and SG3 (with retry)"
            )
            max_retries = 30
            retry_delay = 2
            sg1_doc_ids = set()
            sg3_doc_ids = set()
            for _ in range(max_retries):
                sg1_docs = await sg1.get_all_documents(sg_db)
                sg3_docs = await sg3.get_all_documents(sg_db)
                sg1_doc_ids = {row.id for row in sg1_docs.rows}
                sg3_doc_ids = {row.id for row in sg3_docs.rows}
                if len(sg1_doc_ids) >= num_docs and len(sg3_doc_ids) >= num_docs:
                    break
                await asyncio.sleep(retry_delay)
            assert len(sg1_doc_ids) >= num_docs, (
                f"SG1 has {len(sg1_doc_ids)} docs, expected {num_docs}"
            )
            assert len(sg3_doc_ids) >= num_docs, (
                f"SG3 has {len(sg3_doc_ids)} docs, expected {num_docs}"
            )

            self.mark_test_step("Bring SG2 back online")
            # Defensive: clear any half-created database before re-creating.
            if await sg2.get_database_status(sg_db) is not None:
                await sg2.delete_database(sg_db)
                await asyncio.sleep(2)
            await sg2.put_database(sg_db, db_payload)

            self.mark_test_step("Verify SG2 catches up with all documents")
            sg2_doc_ids = set()
            for _ in range(max_retries):
                sg2_docs = await sg2.get_all_documents(sg_db)
                sg2_doc_ids = {row.id for row in sg2_docs.rows}
                if len(sg2_doc_ids) >= num_docs:
                    break
                await asyncio.sleep(retry_delay)
            assert len(sg2_doc_ids) >= num_docs, (
                f"SG2 did not catch up. Has {len(sg2_doc_ids)} docs, expected {num_docs}"
            )

            self.mark_test_step("Verify document consistency across all nodes")
            expected_ids = {f"sdk_doc_{i}" for i in range(num_docs)}
            for sg_name, sg_ids in [
                ("SG1", sg1_doc_ids),
                ("SG2", sg2_doc_ids),
                ("SG3", sg3_doc_ids),
            ]:
                missing = expected_ids - sg_ids
                assert len(missing) == 0, (
                    f"{sg_name} missing docs: {list(missing)[:5]}..."
                )
        finally:
            # If an assertion fired before the writer finished, don't leak
            # the background task: cancel it and swallow the cancellation.
            if not write_task.done():
                write_task.cancel()
                try:
                    await write_task
                except asyncio.CancelledError:
                    pass
            # Always tear down shared state so later tests start fresh.
            for sg in sgs:
                await sg.delete_database(sg_db)
            cbs.drop_bucket(bucket_name)

0 commit comments

Comments
 (0)