|
| 1 | +import lineus |
| 2 | +import threading |
| 3 | +import json |
| 4 | +import time |
| 5 | + |
| 6 | + |
| 7 | +class Diagnostics(threading.Thread): |
| 8 | + |
| 9 | + def __init__(self): |
| 10 | + threading.Thread.__init__(self) |
| 11 | + self.my_line_us = lineus.LineUs() |
| 12 | + self.diags = {} |
| 13 | + self.status_callback = None |
| 14 | + self.complete_callback = None |
| 15 | + |
| 16 | + def on_status(self, callback): |
| 17 | + self.status_callback = callback |
| 18 | + |
| 19 | + def on_complete(self, callback): |
| 20 | + self.complete_callback = callback |
| 21 | + |
| 22 | + def run(self): |
| 23 | + self.status('Finding networks') |
| 24 | + self.diags['networks'] = self.my_line_us.get_network_list() |
| 25 | + if self.diags['networks'] is not None: |
| 26 | + self.diags['scanned'] = {} |
| 27 | + networks = self.diags['networks'] |
| 28 | + for i in range(0, len(networks)): |
| 29 | + self.status(f'Looking for Line-us on {networks[i]["name"]}') |
| 30 | + line_us_list = self.my_line_us.slow_search(network=i, return_first=False) |
| 31 | + self.diags['scanned'][networks[i]['name']] = line_us_list |
| 32 | + |
| 33 | + self.status('Looking for mdns Line-us') |
| 34 | + self.diags['mdns'] = self.my_line_us.get_line_us_list() |
| 35 | + |
| 36 | + self.status('Checking scanned Line-us') |
| 37 | + self.diags['connections_scanned'] = {} |
| 38 | + for network in self.diags['scanned']: |
| 39 | + for line_us in self.diags['scanned'][network]: |
| 40 | + self.status(f'Trying to contact {line_us[0]} by all methods') |
| 41 | + check_result = self.check_line_us(line_us) |
| 42 | + self.diags['connections_scanned'][line_us[0]] = check_result |
| 43 | + |
| 44 | + self.status('Checking for Line-us found by mdns') |
| 45 | + self.diags['connections_mdns'] = {} |
| 46 | + for line_us in self.diags['mdns']: |
| 47 | + self.status(f'Trying to contact {line_us[0]} by all methods') |
| 48 | + check_result = self.check_line_us(line_us) |
| 49 | + self.diags['connections_mdns'][line_us[0]] = check_result |
| 50 | + |
| 51 | + if self.complete_callback is not None: |
| 52 | + self.complete_callback(self.diags) |
| 53 | + |
| 54 | + def check_line_us(self, line_us): |
| 55 | + connection_result = {'info': line_us} |
| 56 | + types = ('DNS', 'mDNS', 'IP') |
| 57 | + for connection_type in range(0, 3): |
| 58 | + success, hello = self.connect_line_us(line_us[connection_type]) |
| 59 | + connection_type_name = types[connection_type] |
| 60 | + # print(f'{line_us[0]} via {connection_type_name} is {success}. Hello={str(hello)}') |
| 61 | + connection_result[connection_type_name] = (success, hello) |
| 62 | + time.sleep(2) |
| 63 | + ping = self.my_line_us.ping(line_us[2]) |
| 64 | + connection_result['ping'] = ping |
| 65 | + return connection_result |
| 66 | + |
| 67 | + def connect_line_us(self, line_us): |
| 68 | + success = self.my_line_us.connect(line_us) |
| 69 | + if success: |
| 70 | + return True, self.my_line_us.get_hello_string() |
| 71 | + else: |
| 72 | + return False, {} |
| 73 | + |
| 74 | + def status(self, message): |
| 75 | + if self.status_callback is not None: |
| 76 | + self.status_callback(message) |
| 77 | + |
| 78 | + def get_results(self): |
| 79 | + return self.diags |
| 80 | + |
| 81 | + |
| 82 | +if __name__ == '__main__': |
| 83 | + |
| 84 | + def done(info): |
| 85 | + print(f'Finished:') |
| 86 | + print(json.dumps(info, indent=4)) |
| 87 | + |
| 88 | + def status(info): |
| 89 | + print(f'Status: {info}') |
| 90 | + |
| 91 | + d = Diagnostics() |
| 92 | + d.on_complete(done) |
| 93 | + d.on_status(status) |
| 94 | + d.start() |
| 95 | + d.join() |
0 commit comments