Skip to content

Commit 183cab6

Browse files
committed
refactor advertising.py for peripheral service solicitation; skeletal CTS (no pairing yet)
1 parent 49e0428 commit 183cab6

File tree

2 files changed

+173
-49
lines changed

2 files changed

+173
-49
lines changed

adafruit_ble/advertising.py

Lines changed: 94 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class AdvertisingPacket:
5151
"""Incomplete list of 128 bit service UUIDs."""
5252
ALL_128_BIT_SERVICE_UUIDS = 0x07
5353
"""Complete list of 128 bit service UUIDs."""
54+
SOLICITED_16_BIT_SERVICE_UUIDS = 0x14
55+
"""List of 16 bit service UUIDs solicited by a peripheral."""
56+
SOLICITED_128_BIT_SERVICE_UUIDS = 0x15
57+
"""List of 128 bit service UUIDs solicited by a peripheral."""
5458
SHORT_LOCAL_NAME = 0x08
5559
"""Short local device name (shortened to fit)."""
5660
COMPLETE_LOCAL_NAME = 0x09
@@ -131,101 +135,142 @@ def get(self, element_type, default=None):
131135
except KeyError:
132136
return default
133137

138+
@property
139+
def length(self):
140+
"""Current number of bytes in packet."""
141+
return len(self._packet_bytes)
142+
134143
@property
135144
def bytes_remaining(self):
136145
"""Number of bytes still available for use in the packet."""
137-
return self._max_length - len(self._packet_bytes)
146+
return self._max_length - self.length
138147

139148
def _check_length(self):
140-
if len(self._packet_bytes) > self._max_length:
149+
if self.length > self._max_length:
141150
raise IndexError("Advertising data too long")
142151

152+
def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
153+
"""Add advertising flags."""
154+
self.add_field(self.FLAGS, struct.pack("<B", flags))
155+
143156
def add_field(self, field_type, field_data):
144-
"""Append an advertising data field to the current packet, of the given type.
157+
"""Append byte data to the current packet, of the given type.
145158
The length field is calculated from the length of field_data."""
146159
self._packet_bytes.append(1 + len(field_data))
147160
self._packet_bytes.append(field_type)
148161
self._packet_bytes.extend(field_data)
149162
self._check_length()
150163

151-
def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
152-
"""Add default or custom advertising flags."""
153-
self.add_field(self.FLAGS, struct.pack("<B", flags))
154-
155-
def add_16_bit_uuids(self, uuids):
156-
"""Add a complete list of 16 bit service UUIDs."""
157-
for uuid in uuids:
158-
self.add_field(self.ALL_16_BIT_SERVICE_UUIDS, struct.pack("<H", uuid.uuid16))
159-
160-
def add_128_bit_uuids(self, uuids):
161-
"""Add a complete list of 128 bit service UUIDs."""
162-
for uuid in uuids:
163-
self.add_field(self.ALL_128_BIT_SERVICE_UUIDS, uuid.uuid128)
164-
165164
def add_mfr_specific_data(self, mfr_id, data):
166165
"""Add manufacturer-specific data bytes."""
167166
self.add_field(self.MANUFACTURER_SPECIFIC_DATA, struct.pack('<H', mfr_id) + data)
168167

168+
def add_tx_power(self, tx_power):
169+
"""Add transmit power value."""
170+
self.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))
169171

170-
class ServerAdvertisement:
171-
"""
172-
Data to advertise a peripheral's services.
172+
def add_appearance(self, appearance):
173+
"""Add BLE Appearance value."""
174+
self.add_field(AdvertisingPacket.APPEARANCE, struct.pack("<h", appearance))
173175

174-
The advertisement consists of an advertising data packet and an optional scan response packet,
175-
The scan response packet is created only if there is not room in the
176-
advertising data packet for the complete peripheral name.
177176

178-
:param peripheral Peripheral the Peripheral to advertise. Use its services and name
179-
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
180-
"""
181-
182-
def __init__(self, peripheral, *, tx_power=0):
183-
self._peripheral = peripheral
177+
class Advertisement:
178+
"""Superclass for common code to construct a BLE advertisement,
179+
consisting of an advertising data packet and an optional scan response packet.
184180
185-
packet = AdvertisingPacket()
186-
packet.add_flags()
181+
:param int flags: advertising flags. Default is general discovery, and BLE only (not classic)
182+
"""
183+
def __init__(self, flags=None, tx_power=None):
184+
self._packet = AdvertisingPacket()
187185
self._scan_response_packet = None
186+
if flags:
187+
self._packet.add_flags(flags)
188+
else:
189+
self._packet.add_flags()
188190

189-
# Need to check service.secondary
190-
uuids_16_bits = [service.uuid for service in peripheral.services
191-
if service.uuid.size == 16 and not service.secondary]
192-
if uuids_16_bits:
193-
packet.add_16_bit_uuids(uuids_16_bits)
194-
195-
uuids_128_bits = [service.uuid for service in peripheral.services
196-
if service.uuid.size == 128 and not service.secondary]
197-
if uuids_128_bits:
198-
packet.add_128_bit_uuids(uuids_128_bits)
199-
200-
packet.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))
191+
if tx_power is not None:
192+
self._packet.add_tx_power(tx_power)
201193

194+
def add_name(self, name):
195+
"""Add name to advertisement. If it doesn't fit, add truncated name to packet,
196+
and add complete name to scan response packet.
197+
"""
202198
# 2 bytes needed for field length and type.
203-
bytes_available = packet.bytes_remaining - 2
199+
bytes_available = self._packet.bytes_remaining - 2
204200
if bytes_available <= 0:
205201
raise IndexError("No room for name")
206202

207-
name_bytes = bytes(peripheral.name, 'utf-8')
203+
name_bytes = bytes(name, 'utf-8')
208204
if bytes_available >= len(name_bytes):
209-
packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
205+
self._packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
210206
else:
211-
packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
207+
self._packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
212208
self._scan_response_packet = AdvertisingPacket()
213209
try:
214210
self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME,
215211
name_bytes)
216212
except IndexError:
217213
raise IndexError("Name too long")
218214

219-
self._advertising_data_packet = packet
215+
def add_uuids(self, uuids, field_type_16_bit_uuids, field_type_128_bit_uuids):
216+
"""Add 16-bit and 128-bit uuids to the packet, using the given field types."""
217+
concatenated_16_bit_uuids = b''.join(
218+
struct.pack("<H", uuid.uuid16) for uuid in uuids if uuid.size == 16)
219+
if concatenated_16_bit_uuids:
220+
self._packet.add_field(field_type_16_bit_uuids, concatenated_16_bit_uuids)
221+
222+
uuids_128_bits = [uuid for uuid in uuids if uuid.size == 128]
223+
if len(uuids_128_bits) > 1:
224+
raise ValueError("Only one 128 bit UUID will fit")
225+
if uuids_128_bits:
226+
self._packet.add_field(field_type_128_bit_uuids, uuids_128_bits[0].uuid128)
220227

221228
@property
222229
def advertising_data_bytes(self):
223230
"""The raw bytes for the initial advertising data packet."""
224-
return self._advertising_data_packet.packet_bytes
231+
return self._packet.packet_bytes
225232

226233
@property
227234
def scan_response_bytes(self):
228235
"""The raw bytes for the scan response packet. None if there is no response packet."""
229236
if self._scan_response_packet:
230237
return self._scan_response_packet.packet_bytes
231238
return None
239+
240+
241+
class ServerAdvertisement(Advertisement):
242+
"""Build an advertisement for a peripheral's services.
243+
244+
There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
245+
is not yet implemented.
246+
247+
:param Peripheral peripheral: the Peripheral to advertise. Use its services and name.
248+
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
249+
"""
250+
251+
def __init__(self, peripheral, *, tx_power=0):
252+
super().__init__()
253+
uuids = [service.uuid for service in peripheral.services if not service.secondary]
254+
self.add_uuids(uuids,
255+
AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS,
256+
AdvertisingPacket.ALL_128_BIT_SERVICE_UUIDS)
257+
self.add_name(peripheral.name)
258+
259+
260+
class SolicitationAdvertisement(Advertisement):
261+
"""Build an advertisement for a peripheral to solicit one or more services from a central.
262+
263+
There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
264+
is not yet implemented.
265+
266+
:param string name: Name to use in advertisement.
267+
:param iterable service_uuids: One or more services requested from a central
268+
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm.
269+
"""
270+
271+
def __init__(self, name, service_uuids, *, tx_power=0):
272+
super().__init__()
273+
self.add_uuids(service_uuids,
274+
AdvertisingPacket.SOLICITED_16_BIT_SERVICE_UUIDS,
275+
AdvertisingPacket.SOLICITED_128_BIT_SERVICE_UUIDS)
276+
self.add_name(name)

adafruit_ble/current_time_client.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# The MIT License (MIT)
2+
#
3+
# Copyright (c) 2019 Dan Halbert for Adafruit Industries
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
# THE SOFTWARE.
22+
"""
23+
`adafruit_ble.current_time_client`
24+
====================================================
25+
26+
UART-style communication by a Central as a GATT Client
27+
28+
* Author(s): Dan Halbert for Adafruit Industries
29+
30+
"""
31+
from bleio import Peripheral, UUID
32+
from .advertising import SolicitationAdvertisement
33+
34+
class CurrentTimeClient:
35+
"""
36+
Set up a peripheral that advertises for Current Time Service,
37+
and connects if found.
38+
39+
:param str name: Name to advertise for server. If None, use default Peripheral name.
40+
41+
Example::
42+
43+
from adafruit_ble.current_time_client import SolicitationAdvertisement
44+
45+
cts_client = CurrentTimeClient()
46+
cts_client.start_advertising()
47+
while not cts_client.connected:
48+
pass
49+
print(cts_client.time)
50+
"""
51+
52+
CTS_UUID = UUID(0x1805)
53+
54+
def __init__(self, name="CTSClient", tx_power=0):
55+
self._periph = Peripheral()
56+
self._advertisement = SolicitationAdvertisement(name, (self.CTS_UUID,), tx_power=tx_power)
57+
58+
def start_advertising(self):
59+
"""Start advertising to solicit a central that supports Current Time Service."""
60+
self._periph.start_advertising(self._advertisement.advertising_data_bytes,
61+
scan_response=self._advertisement.scan_response_bytes)
62+
63+
def stop_advertising(self):
64+
"""Stop advertising the service."""
65+
self._periph.stop_advertising()
66+
67+
@property
68+
def connected(self):
69+
"""True if a central connected to this peripheral."""
70+
return self._periph.connected
71+
72+
def pair(self):
73+
"""Pair with the connected central."""
74+
pass
75+
76+
@property
77+
def time(self):
78+
"""Get the current time from the server."""
79+
return None

0 commit comments

Comments
 (0)