|
1 | | -from bleak import BleakScanner, BleakClient |
2 | | -from bleak.backends.service import BleakGATTServiceCollection |
3 | | -from bleak.backends.device import BLEDevice |
4 | | -from bleak.exc import BleakError |
5 | | -import argparse, asyncio |
6 | | - |
7 | | - |
8 | | -async def scan(args): |
9 | | - print("Started BLE scan\n") |
10 | | - |
11 | | - base_kwargs = dict(adapter=args.adapter, timeout=args.sec) |
12 | | - |
13 | | - if args.service_uuid: |
14 | | - devices = await BleakScanner.discover(**base_kwargs, service_uuids=[args.service_uuid]) |
15 | | - else: |
16 | | - devices = await BleakScanner.discover(**base_kwargs) |
17 | | - |
18 | | - await general_scan(devices) |
19 | | - |
20 | | - if args.addr: |
21 | | - await deep_scan(args.addr, devices) |
22 | | - |
23 | | - print("\nFinished BLE scan") |
24 | | - |
25 | | -async def general_scan(devices: [BLEDevice]): |
26 | | - sorted_devices = sorted(devices, key=lambda dev: dev.rssi, reverse=True) |
27 | | - |
28 | | - for d in sorted_devices: |
29 | | - print(f'{d.address} (RSSI={d.rssi}): {d.name}') |
30 | | - |
31 | | - |
32 | | -async def deep_scan(addr: str, devices: [BLEDevice]): |
33 | | - print(f"\nStarted deep scan of {addr}\n") |
34 | | - |
35 | | - devices = filter(lambda dev: dev.address == addr, devices) |
36 | | - devices_list = list(devices) |
37 | | - if len(devices_list) > 0: |
38 | | - device = devices_list[0] |
39 | | - print(f'Found device {device} (out of {len(devices_list)})') |
40 | | - else: |
41 | | - print('Found no device with matching address') |
42 | | - return |
43 | | - |
44 | | - async with BleakClient(device) as client: |
45 | | - print_details(await client.get_services()) |
46 | | - |
47 | | - print(f"\nCompleted deep scan of {addr}") |
48 | | - |
49 | | -def print_details(serv: BleakGATTServiceCollection): |
50 | | - INDENT = ' ' |
51 | | - for s in serv: |
52 | | - print('SERVICE', s) |
53 | | - for char in s.characteristics: |
54 | | - print(INDENT, 'CHARACTERISTIC', char, char.properties) |
55 | | - for desc in char.descriptors: |
56 | | - print(INDENT*2, 'DESCRIPTOR', desc) |
57 | | - |
58 | | - |
59 | | -# Extra function for console scripts |
60 | | -def main(): |
61 | | - parser = argparse.ArgumentParser( |
62 | | - description='Scanner for BLE devices and service/characteristics.', |
63 | | - formatter_class=argparse.ArgumentDefaultsHelpFormatter) |
64 | | - parser.add_argument('-t', '--scan-time', dest='sec', default=5.0, type=float, |
65 | | - help='Duration of the scan in seconds') |
66 | | - parser.add_argument('-i', '--interface', dest='adapter', required=False, default='hci0', |
67 | | - help='BLE host adapter number to use') |
68 | | - parser.add_argument('-d', '--deep-scan', dest='addr', type=str, |
69 | | - help='Try to connect to device and read out service/characteristic UUIDs') |
70 | | - parser.add_argument('-s', '--service-uuid', dest='service_uuid', required=False, |
71 | | - help='The service used for scanning of potential devices') |
72 | | - args = parser.parse_args() |
73 | | - |
74 | | - try: |
75 | | - asyncio.run(scan(args)) |
76 | | - except BleakError as be: |
77 | | - print('ERROR:', be) |
78 | | - |
0 commit comments