Skip to content

Commit e5c1ef8

Browse files
authored
DRIVERS-2949 Test happy eyeballs behavior (#1233)
1 parent 288d796 commit e5c1ef8

File tree

9 files changed

+344
-12
lines changed

9 files changed

+344
-12
lines changed

.config/nextest.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
[profile.default]
22
test-threads = 1
3+
default-filter = 'not test(test::happy_eyeballs)'
34

45
[profile.ci]
56
failure-output = "final"
67
test-threads = 1
78
fail-fast = false
9+
default-filter = 'not test(test::happy_eyeballs)'
810

911
[profile.ci.junit]
1012
path = "junit.xml"

.evergreen/config.yml

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,20 @@ buildvariants:
380380
tasks:
381381
- name: test-aws-lambda-task-group
382382

383+
- name: happy-eyeballs-macos
384+
display_name: "Happy Eyeballs (MacOS)"
385+
run_on:
386+
- macos-1100
387+
tasks:
388+
- happy-eyeballs-task-group
389+
390+
- name: happy-eyeballs-windows
391+
display_name: "Happy Eyeballs (Windows)"
392+
run_on:
393+
- windows-64-vs2017-small
394+
tasks:
395+
- happy-eyeballs-task-group
396+
383397
###############
384398
# Task Groups #
385399
###############
@@ -711,6 +725,26 @@ task_groups:
711725
tasks:
712726
- oidc-auth-test-gcp-latest
713727

728+
- name: happy-eyeballs-task-group
729+
setup_group_can_fail_task: true
730+
setup_group_timeout_secs: 1800
731+
setup_group:
732+
- func: "fetch source"
733+
- func: "create expansions"
734+
- func: "prepare resources"
735+
- func: "windows fix"
736+
- func: "fix absolute paths"
737+
- func: "init test-results"
738+
- func: "make files executable"
739+
- func: "install rust"
740+
- func: "install junit dependencies"
741+
- func: "start happy eyeballs server"
742+
tasks:
743+
- test-happy-eyeballs
744+
teardown_task:
745+
- func: "stop happy eyeballs server"
746+
- func: "upload test results"
747+
714748
#########
715749
# Tasks #
716750
#########
@@ -1184,6 +1218,18 @@ tasks:
11841218
export GCPOIDC_TEST_CMD="ls -la && PROJECT_DIRECTORY='.' OIDC_ENV=gcp OIDC=oidc TEST_FILE=./$TEST_FILE ./.evergreen/run-mongodb-oidc-test.sh"
11851219
bash $DRIVERS_TOOLS/.evergreen/auth_oidc/gcp/run-driver-test.sh
11861220
1221+
- name: "test-happy-eyeballs"
1222+
commands:
1223+
- command: subprocess.exec
1224+
type: test
1225+
params:
1226+
working_dir: src
1227+
binary: bash
1228+
args:
1229+
- .evergreen/run-happy-eyeballs-tests.sh
1230+
include_expansions_in_env:
1231+
- PROJECT_DIRECTORY
1232+
11871233
#############
11881234
# Functions #
11891235
#############
@@ -1378,7 +1424,17 @@ functions:
13781424
binary: bash
13791425
args:
13801426
- .evergreen/fetch-drivers-tools.sh
1381-
1427+
- command: subprocess.exec
1428+
params:
1429+
working_dir: src
1430+
include_expansions_in_env:
1431+
- DRIVERS_TOOLS
1432+
binary: bash
1433+
args:
1434+
- .evergreen/find-python3.sh
1435+
- command: expansions.update
1436+
params:
1437+
file: src/python3.yml
13821438
- command: shell.exec
13831439
params:
13841440
script: |
@@ -1673,6 +1729,31 @@ functions:
16731729
export OIDC="oidc"
16741730
.evergreen/run-mongodb-oidc-test.sh
16751731
1732+
"start happy eyeballs server":
1733+
- command: subprocess.exec
1734+
params:
1735+
working_dir: src
1736+
background: true
1737+
binary: ${PYTHON3}
1738+
args:
1739+
- .evergreen/happy-eyeballs-server.py
1740+
- command: subprocess.exec
1741+
params:
1742+
working_dir: src
1743+
binary: ${PYTHON3}
1744+
args:
1745+
- .evergreen/happy-eyeballs-server.py
1746+
- --wait
1747+
1748+
"stop happy eyeballs server":
1749+
- command: subprocess.exec
1750+
params:
1751+
working_dir: src
1752+
binary: ${PYTHON3}
1753+
args:
1754+
- .evergreen/happy-eyeballs-server.py
1755+
- --stop
1756+
16761757
"compile only":
16771758
- command: shell.exec
16781759
type: test

.evergreen/find-python3.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
source ${DRIVERS_TOOLS}/.evergreen/find-python3.sh
2+
PYTHON3=$(find_python3)
3+
4+
cat <<EOT >python3.yml
5+
PYTHON3: "${PYTHON3}"
6+
EOT
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import argparse
2+
import asyncio
3+
import socket
4+
5+
parser = argparse.ArgumentParser(
6+
prog='happy-eyeballs-client',
7+
description='client for testing the happy eyeballs test server',
8+
)
9+
parser.add_argument('-c', '--control', default=10036, type=int, metavar='PORT', help='control port')
10+
parser.add_argument('-d', '--delay', default=4, type=int)
11+
args = parser.parse_args()
12+
13+
async def main():
14+
print('connecting to control')
15+
control_r, control_w = await asyncio.open_connection('localhost', args.control)
16+
control_w.write(args.delay.to_bytes(1, 'big'))
17+
await control_w.drain()
18+
data = await control_r.read(1)
19+
if data != b'\x01':
20+
raise Exception(f'Expected byte 1, got {data}')
21+
ipv4_port = int.from_bytes(await control_r.read(2), 'big')
22+
ipv6_port = int.from_bytes(await control_r.read(2), 'big')
23+
connect_tasks = [
24+
asyncio.create_task(connect('IPv4', ipv4_port, socket.AF_INET, b'\x04')),
25+
asyncio.create_task(connect('IPv6', ipv6_port, socket.AF_INET6, b'\x06')),
26+
]
27+
await asyncio.wait(connect_tasks)
28+
29+
async def connect(name: str, port: int, family: socket.AddressFamily, payload: bytes):
30+
print(f'{name}: connecting')
31+
try:
32+
reader, writer = await asyncio.open_connection('localhost', port, family=family)
33+
except Exception as e:
34+
print(f'{name}: failed ({e})')
35+
return
36+
print(f'{name}: connected')
37+
data = await reader.readexactly(1)
38+
if data != payload:
39+
raise Exception(f'Expected {payload}, got {data}')
40+
writer.close()
41+
await writer.wait_closed()
42+
print(f'{name}: done')
43+
44+
asyncio.run(main())
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import argparse
2+
import asyncio
3+
import socket
4+
import sys
5+
6+
parser = argparse.ArgumentParser(
7+
prog='happy-eyeballs-server',
8+
description='Fake server for testing happy eyeballs',
9+
)
10+
parser.add_argument('-c', '--control', default=10036, type=int, metavar='PORT', help='control port')
11+
parser.add_argument('--stop', action='store_true', help='stop a currently-running server')
12+
parser.add_argument('--wait', action='store_true', help='wait for a server to be ready')
13+
args = parser.parse_args()
14+
15+
PREFIX='happy eyeballs server'
16+
17+
async def control_server():
18+
shutdown = asyncio.Event()
19+
srv = await asyncio.start_server(lambda reader, writer: on_control_connected(reader, writer, shutdown), 'localhost', args.control)
20+
print(f'{PREFIX}: listening for control connections on {args.control}', file=sys.stderr)
21+
async with srv:
22+
await shutdown.wait()
23+
print(f'{PREFIX}: all done', file=sys.stderr)
24+
25+
async def on_control_connected(reader: asyncio.StreamReader, writer: asyncio.StreamWriter, shutdown: asyncio.Event):
26+
# Read the control request byte
27+
data = await reader.readexactly(1)
28+
if data == b'\x04':
29+
print(f'{PREFIX}: ========================', file=sys.stderr)
30+
print(f'{PREFIX}: request for delayed IPv4', file=sys.stderr)
31+
slow = 'IPv4'
32+
elif data == b'\x06':
33+
print(f'{PREFIX}: ========================', file=sys.stderr)
34+
print(f'{PREFIX}: request for delayed IPv6', file=sys.stderr)
35+
slow = 'IPv6'
36+
elif data == b'\xF0':
37+
writer.write(b'\x01')
38+
await writer.drain()
39+
writer.close()
40+
await writer.wait_closed()
41+
return
42+
elif data == b'\xFF':
43+
print(f'{PREFIX}: shutting down', file=sys.stderr)
44+
writer.close()
45+
await writer.wait_closed()
46+
shutdown.set()
47+
return
48+
else:
49+
print(f'Unexpected control byte: {data}', file=sys.stderr)
50+
exit(1)
51+
52+
# Bind the test ports but do not yet start accepting connections
53+
connected = asyncio.Event()
54+
on_ipv4_connected = lambda reader, writer: on_test_connected('IPv4', writer, b'\x04', connected, slow)
55+
on_ipv6_connected = lambda reader, writer: on_test_connected('IPv6', writer, b'\x06', connected, slow)
56+
# port 0: pick random unused port
57+
srv4 = await asyncio.start_server(on_ipv4_connected, 'localhost', 0, family=socket.AF_INET, start_serving=False)
58+
srv6 = await asyncio.start_server(on_ipv6_connected, 'localhost', 0, family=socket.AF_INET6, start_serving=False)
59+
ipv4_port = srv4.sockets[0].getsockname()[1]
60+
ipv6_port = srv6.sockets[0].getsockname()[1]
61+
print(f'{PREFIX}: [slow {slow}] open for IPv4 on {ipv4_port}', file=sys.stderr)
62+
print(f'{PREFIX}: [slow {slow}] open for IPv6 on {ipv6_port}', file=sys.stderr)
63+
64+
# Reply to control request with success byte and test server ports
65+
writer.write(b'\x01')
66+
writer.write(ipv4_port.to_bytes(2, 'big'))
67+
writer.write(ipv6_port.to_bytes(2, 'big'))
68+
await writer.drain()
69+
writer.close()
70+
await writer.wait_closed()
71+
72+
# Start test servers listening in parallel
73+
# Hold a reference to the tasks so they aren't GC'd
74+
test_tasks = [
75+
asyncio.create_task(test_listen('IPv4', srv4, data == b'\x04', connected, slow)),
76+
asyncio.create_task(test_listen('IPv6', srv6, data == b'\x06', connected, slow)),
77+
]
78+
await asyncio.wait(test_tasks)
79+
80+
# Wait for the test servers to shut down
81+
srv4.close()
82+
srv6.close()
83+
close_tasks = [
84+
asyncio.create_task(srv4.wait_closed()),
85+
asyncio.create_task(srv6.wait_closed()),
86+
]
87+
await asyncio.wait(close_tasks)
88+
89+
print(f'{PREFIX}: [slow {slow}] connection complete, test ports closed', file=sys.stderr)
90+
print(f'{PREFIX}: ========================', file=sys.stderr)
91+
92+
async def test_listen(name: str, srv, delay: bool, connected: asyncio.Event, slow: str):
93+
# Both connections are delayed; the slow one is delayed by more than the fast one; this
94+
# ensures that the client is comparing timing and not simply choosing an immediate success
95+
# over a connection denied.
96+
if delay:
97+
print(f'{PREFIX}: [slow {slow}] delaying {name} connections', file=sys.stderr)
98+
await asyncio.sleep(2.0)
99+
else:
100+
await asyncio.sleep(1.0)
101+
async with srv:
102+
await srv.start_serving()
103+
print(f'{PREFIX}: [slow {slow}] accepting {name} connections', file=sys.stderr)
104+
# Terminate this test server when either test server has handled a request
105+
await connected.wait()
106+
107+
async def on_test_connected(name: str, writer: asyncio.StreamWriter, payload: bytes, connected: asyncio.Event, slow: str):
108+
print(f'{PREFIX}: [slow {slow}] connected on {name}', file=sys.stderr)
109+
writer.write(payload)
110+
await writer.drain()
111+
writer.close()
112+
await writer.wait_closed()
113+
connected.set()
114+
115+
async def stop_server():
116+
control_r, control_w = await asyncio.open_connection('localhost', args.control)
117+
control_w.write(b'\xFF')
118+
await control_w.drain()
119+
control_w.close()
120+
await control_w.wait_closed()
121+
122+
async def wait_for_server():
123+
while True:
124+
try:
125+
control_r, control_w = await asyncio.open_connection('localhost', args.control)
126+
except OSError as e:
127+
print(f'{PREFIX}: failed ({e}), will retry', file=sys.stderr)
128+
await asyncio.sleep(1)
129+
continue
130+
break
131+
control_w.write(b'\xF0')
132+
await control_w.drain()
133+
data = await control_r.read(1)
134+
if data != b'\x01':
135+
print(f'{PREFIX}: expected byte 1, got {data}', file=sys.stderr)
136+
exit(1)
137+
print(f'{PREFIX}: happy eyeballs server ready on port {args.control}', file=sys.stderr)
138+
139+
140+
if args.stop:
141+
asyncio.run(stop_server())
142+
elif args.wait:
143+
asyncio.run(wait_for_server())
144+
else:
145+
asyncio.run(control_server())
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/bin/bash
2+
3+
set -o errexit
4+
set -o pipefail
5+
6+
source .evergreen/env.sh
7+
source .evergreen/cargo-test.sh
8+
9+
CARGO_OPTIONS+=("--ignore-default-filter")
10+
11+
set +o errexit
12+
13+
cargo_test "test::happy_eyeballs"
14+
exit $CARGO_RESULT

src/runtime/stream.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,14 @@ impl AsyncStream {
4747
) -> Result<Self> {
4848
match &address {
4949
ServerAddress::Tcp { host, .. } => {
50-
let inner = tcp_connect(&address).await?;
50+
let resolved: Vec<_> = runtime::resolve_address(&address).await?.collect();
51+
if resolved.is_empty() {
52+
return Err(ErrorKind::DnsResolve {
53+
message: format!("No DNS results for domain {}", address),
54+
}
55+
.into());
56+
}
57+
let inner = tcp_connect(resolved).await?;
5158

5259
// If there are TLS options, wrap the inner stream in an AsyncTlsStream.
5360
match tls_cfg {
@@ -77,21 +84,14 @@ async fn tcp_try_connect(address: &SocketAddr) -> Result<TcpStream> {
7784
Ok(TcpStream::from_std(std_stream)?)
7885
}
7986

80-
async fn tcp_connect(address: &ServerAddress) -> Result<TcpStream> {
87+
pub(crate) async fn tcp_connect(resolved: Vec<SocketAddr>) -> Result<TcpStream> {
8188
// "Happy Eyeballs": try addresses in parallel, interleaving IPv6 and IPv4, preferring IPv6.
8289
// Based on the implementation in https://codeberg.org/KMK/happy-eyeballs.
83-
let (addrs_v6, addrs_v4): (Vec<_>, Vec<_>) = runtime::resolve_address(address)
84-
.await?
90+
let (addrs_v6, addrs_v4): (Vec<_>, Vec<_>) = resolved
91+
.into_iter()
8592
.partition(|a| matches!(a, SocketAddr::V6(_)));
8693
let socket_addrs = interleave(addrs_v6, addrs_v4);
8794

88-
if socket_addrs.is_empty() {
89-
return Err(ErrorKind::DnsResolve {
90-
message: format!("No DNS results for domain {}", address),
91-
}
92-
.into());
93-
}
94-
9595
fn handle_join(
9696
result: std::result::Result<Result<TcpStream>, tokio::task::JoinError>,
9797
) -> Result<TcpStream> {

src/test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub(crate) mod csfle;
2121
mod cursor;
2222
mod db;
2323
mod documentation_examples;
24+
mod happy_eyeballs;
2425
mod index_management;
2526
mod lambda_examples;
2627
pub(crate) mod spec;

0 commit comments

Comments
 (0)