Skip to content

Commit 1bf70e6

Browse files
donaldhPaolo Abeni
authored andcommitted
tools/net/ynl: improve async notification handling
The notification handling in ynl is currently very simple, using sleep() to wait a period of time and then handling all the buffered messages in a single batch. This patch changes the notification handling so that messages are processed as they are received. This makes it possible to use ynl as a library that supplies notifications in a timely manner. - Change check_ntf() to be a generator that yields 1 notification at a time and blocks until a notification is available. - Use the --sleep parameter to set an alarm and exit when it fires. This means that the CLI has the same interface, but notifications get printed as they are received: ./tools/net/ynl/cli.py --spec <SPEC> --subscribe <TOPIC> [ --sleep <SECS> ] Here is an example python snippet that shows how to use ynl as a library for receiving notifications: ynl = YnlFamily(f"{dir}/rt_route.yaml") ynl.ntf_subscribe('rtnlgrp-ipv4-route') for event in ynl.check_ntf(): handle(event) Signed-off-by: Donald Hunter <[email protected]> Tested-by: Kory Maincent <[email protected]> Link: https://patch.msgid.link/[email protected] Signed-off-by: Paolo Abeni <[email protected]>
1 parent d05596f commit 1bf70e6

File tree

2 files changed

+36
-23
lines changed

2 files changed

+36
-23
lines changed

tools/net/ynl/cli.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import json
66
import pprint
77
import time
8+
import signal
89

910
from lib import YnlFamily, Netlink, NlError
1011

@@ -17,6 +18,8 @@ def default(self, obj):
1718
return list(obj)
1819
return json.JSONEncoder.default(self, obj)
1920

21+
def handle_timeout(sig, frame):
22+
exit(0)
2023

2124
def main():
2225
description = """
@@ -81,7 +84,8 @@ def output(msg):
8184
ynl.ntf_subscribe(args.ntf)
8285

8386
if args.sleep:
84-
time.sleep(args.sleep)
87+
signal.signal(signal.SIGALRM, handle_timeout)
88+
signal.alarm(args.sleep)
8589

8690
if args.list_ops:
8791
for op_name, op in ynl.ops.items():
@@ -106,8 +110,8 @@ def output(msg):
106110
exit(1)
107111

108112
if args.ntf:
109-
ynl.check_ntf()
110-
output(ynl.async_msg_queue)
113+
for msg in ynl.check_ntf():
114+
output(msg)
111115

112116

113117
if __name__ == "__main__":

tools/net/ynl/lib/ynl.py

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import yaml
1313
import ipaddress
1414
import uuid
15+
import queue
16+
import time
1517

1618
from .nlspec import SpecFamily
1719

@@ -489,7 +491,7 @@ def __init__(self, def_path, schema=None, process_unknown=False,
489491
self.sock.setsockopt(Netlink.SOL_NETLINK, Netlink.NETLINK_GET_STRICT_CHK, 1)
490492

491493
self.async_msg_ids = set()
492-
self.async_msg_queue = []
494+
self.async_msg_queue = queue.Queue()
493495

494496
for msg in self.msgs.values():
495497
if msg.is_async:
@@ -903,32 +905,39 @@ def handle_ntf(self, decoded):
903905

904906
msg['name'] = op['name']
905907
msg['msg'] = attrs
906-
self.async_msg_queue.append(msg)
908+
self.async_msg_queue.put(msg)
907909

908-
def check_ntf(self):
910+
def check_ntf(self, interval=0.1):
909911
while True:
910912
try:
911913
reply = self.sock.recv(self._recv_size, socket.MSG_DONTWAIT)
912-
except BlockingIOError:
913-
return
914+
nms = NlMsgs(reply)
915+
self._recv_dbg_print(reply, nms)
916+
for nl_msg in nms:
917+
if nl_msg.error:
918+
print("Netlink error in ntf!?", os.strerror(-nl_msg.error))
919+
print(nl_msg)
920+
continue
921+
if nl_msg.done:
922+
print("Netlink done while checking for ntf!?")
923+
continue
914924

915-
nms = NlMsgs(reply)
916-
self._recv_dbg_print(reply, nms)
917-
for nl_msg in nms:
918-
if nl_msg.error:
919-
print("Netlink error in ntf!?", os.strerror(-nl_msg.error))
920-
print(nl_msg)
921-
continue
922-
if nl_msg.done:
923-
print("Netlink done while checking for ntf!?")
924-
continue
925+
decoded = self.nlproto.decode(self, nl_msg, None)
926+
if decoded.cmd() not in self.async_msg_ids:
927+
print("Unexpected msg id while checking for ntf", decoded)
928+
continue
925929

926-
decoded = self.nlproto.decode(self, nl_msg, None)
927-
if decoded.cmd() not in self.async_msg_ids:
928-
print("Unexpected msg id done while checking for ntf", decoded)
929-
continue
930+
self.handle_ntf(decoded)
931+
except BlockingIOError:
932+
pass
930933

931-
self.handle_ntf(decoded)
934+
try:
935+
yield self.async_msg_queue.get_nowait()
936+
except queue.Empty:
937+
try:
938+
time.sleep(interval)
939+
except KeyboardInterrupt:
940+
return
932941

933942
def operation_do_attributes(self, name):
934943
"""

0 commit comments

Comments
 (0)