Skip to content

Commit 0f8591d

Browse files
committed
tests: Bluetooth: Classic: Add test suites gap_discovery
Add test cases gap_central.test_01_gap_central_general_discovery and gap_central.test_02_gap_central_limited_discovery. Add test cases gap_peripheral.test_01_gap_peripheral_general_discovery and gap_peripheral.test_02_gap_peripheral_limited_discovery. Signed-off-by: Lyle Zhu <[email protected]>
1 parent d14a547 commit 0f8591d

File tree

13 files changed

+1812
-0
lines changed

13 files changed

+1812
-0
lines changed
Lines changed: 349 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,349 @@
1+
# Copyright 2025 NXP
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import asyncio
6+
import logging
7+
import struct
8+
from collections.abc import Callable
9+
10+
import bt_sim_hci as sim_hci
11+
import bt_sim_ll as sim_ll
12+
from bumble import hci
13+
from bumble.controller import Connection, ScoLink
14+
from bumble.controller import Controller as controller
15+
from bumble.core import PhysicalTransport
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
class bt_classic_discovery:
21+
def __init__(
22+
self,
23+
iac_lap,
24+
count,
25+
interval=1.25,
26+
on_inquiry: Callable | None = None,
27+
on_complete: Callable | None = None,
28+
):
29+
self.flag = False
30+
self.count = count
31+
self.interval = interval
32+
self._task = None
33+
self.on_inquiry = on_inquiry
34+
self.on_complete = on_complete
35+
self._is_running = False
36+
self.iac_lap = iac_lap
37+
38+
def start(
39+
self,
40+
count,
41+
interval=1.25,
42+
on_inquiry: Callable | None = None,
43+
on_complete: Callable | None = None,
44+
):
45+
"""Start periodic timeout callback"""
46+
self.flag = True
47+
self._is_running = True
48+
self.count = count
49+
self.interval = interval
50+
if on_inquiry:
51+
self.on_complete = on_inquiry
52+
if on_complete:
53+
self.on_complete = on_complete
54+
if self._task:
55+
self._task.cancel()
56+
self._task = asyncio.create_task(self._periodic_callback(interval))
57+
58+
async def _periodic_callback(self, interval):
59+
"""Execute callback periodically"""
60+
try:
61+
while self._is_running:
62+
await asyncio.sleep(interval)
63+
if self._is_running and self.on_inquiry:
64+
if asyncio.iscoroutinefunction(self.on_inquiry):
65+
await self.on_inquiry()
66+
else:
67+
self.on_inquiry()
68+
if self.count > 0:
69+
self.count = self.count - 1
70+
if self.count == 0 and self.on_complete:
71+
if asyncio.iscoroutinefunction(self.on_complete):
72+
await self.on_complete()
73+
else:
74+
self.on_complete()
75+
self.stop()
76+
except asyncio.CancelledError:
77+
pass
78+
79+
def stop(self):
80+
"""Stop periodic execution"""
81+
self.flag = False
82+
self._is_running = False
83+
if self._task:
84+
self._task.cancel()
85+
self._task = None
86+
87+
88+
class bt_sim_controller(controller):
89+
# Enable BR/EDR feature
90+
lmp_features: bytes = bytes.fromhex(
91+
'0000000040000000'
92+
) # BR/EDR Supported, LE Supported (Controller)
93+
94+
inquiry_mode: int = 0
95+
page_timeout: int = 0x2000
96+
default_link_policy_settings: int = 0
97+
discovery: bt_classic_discovery = None
98+
iac_lap: list = [hci.HCI_GENERAL_INQUIRY_LAP]
99+
class_of_device: int = 0
100+
101+
def start_inquiry(self, lap) -> None:
102+
logger.debug("[%s] >>> Inquiry LAP %s", self.name, lap)
103+
if self.link:
104+
self.link.start_inquiry(self, lap)
105+
106+
def on_inquiry_complete(self) -> None:
107+
logger.debug("Inquiry complete")
108+
self.send_hci_packet(hci.HCI_Inquiry_Complete_Event(status=hci.HCI_SUCCESS))
109+
110+
def on_inquiry_timeout(self) -> None:
111+
logger.debug("Inquiry timeout")
112+
self.start_inquiry(self.discovery.iac_lap)
113+
114+
def on_hci_write_page_timeout_command(
115+
self, command: hci.HCI_Write_Page_Timeout_Command
116+
) -> bytes | None:
117+
'''
118+
See Bluetooth spec Vol 4, Part E - 7.3.18 Write Page Timeout Command
119+
'''
120+
self.page_timeout = command.page_timeout
121+
return bytes([hci.HCI_SUCCESS])
122+
123+
def on_hci_write_inquiry_mode_command(
124+
self, command: hci.HCI_Write_Inquiry_Mode_Command
125+
) -> bytes | None:
126+
'''
127+
See Bluetooth spec Vol 4, Part E - 7.3.18 Write Inquiry Mode Command
128+
'''
129+
self.inquiry_mode = command.inquiry_mode
130+
return bytes([hci.HCI_SUCCESS])
131+
132+
def on_hci_read_default_link_policy_settings_command(
133+
self, _command: sim_hci.HCI_Read_Default_Link_Policy_Settings_Command
134+
) -> bytes | None:
135+
'''
136+
See Bluetooth spec 7.2.11 Read Default Link Policy Settings command
137+
'''
138+
return struct.pack(
139+
'<BH',
140+
hci.HCI_SUCCESS,
141+
self.default_link_policy_settings,
142+
)
143+
144+
def on_inquiry_result_ind(self, packet: sim_ll.InquiryInd) -> None:
145+
self.send_hci_packet(
146+
hci.HCI_Inquiry_Result_Event(
147+
bd_addr=packet.bd_address,
148+
page_scan_repetition_mode=packet.Page_Scan_Repetition_Mode,
149+
reserved_0=packet.Reserved,
150+
reserved_1=packet.Reserved,
151+
class_of_device=packet.Class_Of_Device,
152+
clock_offset=packet.Clock_Offset,
153+
)
154+
)
155+
156+
def on_inquiry_result_with_rssi_ind(self, packet: sim_ll.InquiryRssiInd) -> None:
157+
self.send_hci_packet(
158+
hci.HCI_Inquiry_Result_With_RSSI_Event(
159+
bd_addr=packet.bd_address,
160+
page_scan_repetition_mode=packet.Page_Scan_Repetition_Mode,
161+
reserved=packet.Reserved,
162+
class_of_device=packet.Class_Of_Device,
163+
clock_offset=packet.Clock_Offset,
164+
rssi=packet.RSSI,
165+
)
166+
)
167+
168+
def on_extended_inquiry_result_ind(self, packet: sim_ll.ExtendedInquiryInd) -> None:
169+
self.send_hci_packet(
170+
hci.HCI_Extended_Inquiry_Result_Event(
171+
num_responses=packet.num_responses,
172+
bd_addr=packet.bd_address,
173+
page_scan_repetition_mode=packet.Page_Scan_Repetition_Mode,
174+
reserved=packet.Reserved,
175+
class_of_device=packet.Class_Of_Device,
176+
clock_offset=packet.Clock_Offset,
177+
rssi=packet.RSSI,
178+
extended_inquiry_response=packet.Extended_Inquiry_Response,
179+
)
180+
)
181+
182+
def on_inquiry_result_pdu(self, packet: sim_ll.InquiryPdu) -> None:
183+
logger.debug("[%s] <<< Inquiry PDU: %s", self.name, packet)
184+
if isinstance(packet, sim_ll.InquiryInd):
185+
self.on_inquiry_result_ind(packet)
186+
elif isinstance(packet, sim_ll.InquiryRssiInd):
187+
self.on_inquiry_result_with_rssi_ind(packet)
188+
elif isinstance(packet, sim_ll.ExtendedInquiryInd):
189+
self.on_extended_inquiry_result_ind(packet)
190+
else:
191+
logger.warning("[%s] Unknown inquiry PDU type: %s", self.name, type(packet))
192+
193+
def on_hci_inquiry_command(self, command: hci.HCI_Inquiry_Command) -> None:
194+
'''
195+
See Bluetooth spec 7.1.1 Inquiry command
196+
'''
197+
logger.debug(
198+
f'Inqury LAP {command.lap} length {command.inquiry_length} '
199+
f'num_responses {command.num_responses}'
200+
)
201+
202+
self.discovery = bt_classic_discovery(
203+
command.lap,
204+
command.inquiry_length,
205+
on_inquiry=self.on_inquiry_timeout,
206+
on_complete=self.on_inquiry_complete,
207+
)
208+
self.discovery.start(command.inquiry_length)
209+
210+
self.send_hci_packet(
211+
hci.HCI_Command_Status_Event(
212+
status=hci.HCI_SUCCESS,
213+
num_hci_command_packets=1,
214+
command_opcode=command.op_code,
215+
)
216+
)
217+
218+
self.start_inquiry(command.lap)
219+
220+
def on_hci_inquiry_cancel_command(
221+
self, _command: hci.HCI_Inquiry_Cancel_Command
222+
) -> bytes | None:
223+
'''
224+
See Bluetooth spec 7.1.2 Inquiry Cancel command
225+
'''
226+
if self.discovery is not None:
227+
self.discovery.stop()
228+
229+
return bytes([hci.HCI_SUCCESS])
230+
231+
def on_hci_write_current_iac_lap_command(
232+
self, command: sim_hci.HCI_Write_Current_Iac_Lap_Command
233+
) -> bytes | None:
234+
'''
235+
See Bluetooth spec 7.3.45 Write Current IAC LAP command
236+
'''
237+
self.iac_lap = []
238+
239+
for iac in command.iac_lap:
240+
self.iac_lap.append(iac)
241+
return bytes([hci.HCI_SUCCESS])
242+
243+
def on_hci_read_class_of_device_command(
244+
self, _command: hci.HCI_Read_Class_Of_Device_Command
245+
) -> bytes | None:
246+
'''
247+
See Bluetooth spec Vol 4, Part E - 7.3.25 Read Class of Device Command
248+
'''
249+
return struct.pack('<BI', hci.HCI_SUCCESS, self.class_of_device)[:4]
250+
251+
def on_hci_write_class_of_device_command(
252+
self, command: hci.HCI_Write_Class_Of_Device_Command
253+
) -> bytes | None:
254+
'''
255+
See Bluetooth spec Vol 4, Part E - 7.3.26 Write Class of Device Command
256+
'''
257+
self.class_of_device = command.class_of_device
258+
return bytes([hci.HCI_SUCCESS])
259+
260+
def on_classic_connection_request(self, peer_address: hci.Address, link_type: int) -> None:
261+
if link_type == hci.HCI_Connection_Complete_Event.LinkType.ACL:
262+
self.classic_connections[peer_address] = Connection(
263+
controller=self,
264+
handle=0,
265+
role=hci.Role.PERIPHERAL,
266+
peer_address=peer_address,
267+
link=self.link,
268+
transport=PhysicalTransport.BR_EDR,
269+
link_type=link_type,
270+
classic_allow_role_switch=self.classic_allow_role_switch,
271+
)
272+
else:
273+
self.sco_links[peer_address] = ScoLink(
274+
handle=0,
275+
link_type=link_type,
276+
peer_address=peer_address,
277+
)
278+
self.send_hci_packet(
279+
hci.HCI_Connection_Request_Event(
280+
bd_addr=peer_address,
281+
class_of_device=self.class_of_device,
282+
link_type=link_type,
283+
)
284+
)
285+
286+
def on_read_remote_supported_features_pdu(
287+
self, lmp_features: bytes, connection_handle: int
288+
) -> None:
289+
logger.debug("[%s] <<< Read Remote Supported Features PDU %s", self.name, lmp_features)
290+
self.send_hci_packet(
291+
hci.HCI_Read_Remote_Supported_Features_Complete_Event(
292+
status=hci.HCI_SUCCESS,
293+
connection_handle=connection_handle,
294+
lmp_features=lmp_features,
295+
)
296+
)
297+
298+
def read_remote_supported_features(
299+
self, peer_address: hci.Address, connection_handle: int
300+
) -> None:
301+
logger.debug("[%s] >>> Peer Address %s", self.name, peer_address)
302+
if self.link:
303+
self.link.read_remote_supported_features(self, peer_address, connection_handle)
304+
else:
305+
logger.warning("[%s] No link available", self.name)
306+
307+
def on_hci_read_remote_supported_features_command(
308+
self, command: hci.HCI_Read_Remote_Supported_Features_Command
309+
) -> None:
310+
'''
311+
See Bluetooth spec 7.1.1 Inquiry command
312+
'''
313+
logger.debug(f'Connect handle {command.connection_handle}')
314+
315+
peer_address = None
316+
317+
for address, connection in self.classic_connections.items():
318+
if connection.handle == command.connection_handle:
319+
peer_address = address
320+
break
321+
322+
if peer_address is None:
323+
status = hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
324+
else:
325+
status = hci.HCI_SUCCESS
326+
327+
self.send_hci_packet(
328+
hci.HCI_Command_Status_Event(
329+
status=status,
330+
num_hci_command_packets=1,
331+
command_opcode=command.op_code,
332+
)
333+
)
334+
335+
if peer_address is not None:
336+
self.read_remote_supported_features(peer_address, command.connection_handle)
337+
else:
338+
logger.warning("[%s] Unknown connection handle", self.name)
339+
340+
def on_hci_host_number_of_completed_packets_command(
341+
self, command: sim_hci.HCI_Host_Number_Of_Completed_Packets_Command
342+
) -> None:
343+
'''
344+
See Bluetooth spec 7.1.1 Inquiry command
345+
'''
346+
logger.debug(
347+
f'Connect handle {command.connection_handle}'
348+
f'num_of_completed_packets {command.num_of_completed_packets}'
349+
)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright 2025 NXP
2+
#
3+
# SPDX-License-Identifier: Apache-2.0
4+
5+
import dataclasses
6+
from collections.abc import Sequence
7+
from dataclasses import field
8+
9+
from bumble.hci import STATUS_SPEC, HCI_Command, metadata
10+
11+
12+
@HCI_Command.command
13+
@dataclasses.dataclass
14+
class HCI_Read_Default_Link_Policy_Settings_Command(HCI_Command):
15+
'''
16+
See Bluetooth spec @ 7.2.11 Read Default Link Policy Settings command
17+
'''
18+
19+
return_parameters_fields = [
20+
('status', STATUS_SPEC),
21+
('Default_Link_Policy_Settings', 2),
22+
]
23+
24+
25+
@HCI_Command.command
26+
@dataclasses.dataclass
27+
class HCI_Write_Current_Iac_Lap_Command(HCI_Command):
28+
'''
29+
See Bluetooth spec @ 7.3.45 Write Current IAC LAP command
30+
'''
31+
32+
iac_lap: Sequence[int] = field(metadata=metadata(3, list_begin=True, list_end=True))
33+
34+
35+
@HCI_Command.command
36+
@dataclasses.dataclass
37+
class HCI_Host_Number_Of_Completed_Packets_Command(HCI_Command):
38+
'''
39+
See Bluetooth spec @ 7.3.40 Host Number Of Completed Packets command
40+
'''
41+
42+
connection_handle: Sequence[int] = field(metadata=metadata(2, list_begin=True, list_end=True))
43+
num_of_completed_packets: int = field(metadata=metadata(2))

0 commit comments

Comments
 (0)