Skip to content

Commit 28e0420

Browse files
committed
Add state reporting back to metadata agents
The call to initialize state reporting was removed in [0], which resulted in the metadata-agent for ML2/OVS being reported as Not Alive (XXX) when 'openstack network agent list' is run, even though it is running properly. The second part of this fix is that we need force the metadata agents to run using the oslo.service threading backend, not the eventlet one. Otherwise the background thread sending RPC updates to neutron-server will never start. This also needs to be backported to stable/2025.1 as the original change was merged there, but since oslo.service 4.2.0 does not support the threading backend, we will need to implement private versions of LoopingCallBase and FixedIntervalLoopingCall there. NOTE: this backport does not include the change in the agents code, selecting the "threading" backend because this is not present in 2025.1. [0] https://review.opendev.org/c/openstack/neutron/+/942916 Conflicts: neutron/common/loopingcall.py neutron/cmd/agents/metadata.py neutron/cmd/agents/ovn_metadata.py neutron/cmd/agents/ovn_neutron_agent.py Closes-bug: #2112492 Change-Id: I4399b6aca1984003e0b564552cc1907425241b9d (cherry picked from commit 3ff9cfd)
1 parent dc74c31 commit 28e0420

File tree

3 files changed

+230
-2
lines changed

3 files changed

+230
-2
lines changed

neutron/agent/metadata/agent.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from neutron_lib import context
2222
from oslo_config import cfg
2323
from oslo_log import log as logging
24-
from oslo_service import loopingcall
2524
import requests
2625
import webob
2726

@@ -31,6 +30,7 @@
3130
from neutron.agent.metadata import proxy_base
3231
from neutron.agent import rpc as agent_rpc
3332
from neutron.common import ipv6_utils
33+
from neutron.common import loopingcall
3434
from neutron.common import metadata as common_metadata
3535
from neutron.common import utils as common_utils
3636

@@ -325,4 +325,5 @@ def run(self):
325325
self._server = socketserver.ThreadingUnixStreamServer(
326326
file_socket, MetadataProxyHandler)
327327
MetadataProxyHandler._conf = self.conf
328+
self._init_state_reporting()
328329
self._server.serve_forever()

neutron/common/loopingcall.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
# Copyright (C) 2025 Red Hat, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
# implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
# NOTE(ralonsoh): this is the implementation of ``FixedIntervalLoopingCall``
16+
# and all needed resources done in ``oslo.service`` 4.2.0, in the ``threading``
17+
# backend. Because this code is not going to be backported or available in
18+
# 2025.1.
19+
20+
import sys
21+
import threading
22+
23+
import futurist
24+
from oslo_log import log as logging
25+
from oslo_utils import reflection
26+
from oslo_utils import timeutils
27+
28+
from neutron._i18n import _
29+
30+
31+
LOG = logging.getLogger(__name__)
32+
33+
34+
class FutureEvent:
35+
"""A simple event object that can carry a result or an exception."""
36+
37+
def __init__(self):
38+
self._event = threading.Event()
39+
self._result = None
40+
self._exc_info = None
41+
42+
def send(self, result):
43+
self._result = result
44+
self._event.set()
45+
46+
def send_exception(self, exc_type, exc_value, tb):
47+
self._exc_info = (exc_type, exc_value, tb)
48+
self._event.set()
49+
50+
def wait(self, timeout=None):
51+
flag = self._event.wait(timeout)
52+
53+
if not flag:
54+
raise RuntimeError(_('Timed out waiting for event'))
55+
56+
if self._exc_info:
57+
exc_type, exc_value, tb = self._exc_info
58+
raise exc_value.with_traceback(tb)
59+
return self._result
60+
61+
62+
class LoopingCallDone(Exception):
63+
"""Exception to break out and stop a LoopingCallBase.
64+
65+
The function passed to a looping call may raise this exception to
66+
break out of the loop normally. An optional return value may be
67+
provided; this value will be returned by LoopingCallBase.wait().
68+
"""
69+
70+
def __init__(self, retvalue=True):
71+
""":param retvalue: Value that LoopingCallBase.wait() should return."""
72+
self.retvalue = retvalue
73+
74+
75+
def _safe_wrapper(f, kind, func_name):
76+
"""Wrapper that calls the wrapped function and logs errors as needed."""
77+
78+
def func(*args, **kwargs):
79+
try:
80+
return f(*args, **kwargs)
81+
except LoopingCallDone:
82+
raise # Let the outer handler process this
83+
except Exception:
84+
LOG.error('%(kind)s %(func_name)r failed',
85+
{'kind': kind, 'func_name': func_name},
86+
exc_info=True)
87+
return 0
88+
89+
return func
90+
91+
92+
class LoopingCallBase:
93+
KIND = _("Unknown looping call")
94+
RUN_ONLY_ONE_MESSAGE = _(
95+
"A looping call can only run one function at a time")
96+
97+
def __init__(self, *args, f=None, **kwargs):
98+
self.args = args
99+
self.kwargs = kwargs
100+
self.f = f
101+
self._future = None
102+
self.done = None
103+
self._abort = threading.Event() # When set, the loop stops
104+
105+
@property
106+
def _running(self):
107+
return not self._abort.is_set()
108+
109+
def stop(self):
110+
if self._running:
111+
self._abort.set()
112+
113+
def wait(self):
114+
"""Wait for the looping call to complete and return its result."""
115+
return self.done.wait()
116+
117+
def _on_done(self, future):
118+
self._future = None
119+
120+
def _sleep(self, timeout):
121+
# Instead of eventlet.sleep, we wait on the abort event for timeout
122+
# seconds.
123+
self._abort.wait(timeout)
124+
125+
def _start(self, idle_for, initial_delay=None, stop_on_exception=True):
126+
"""Start the looping call.
127+
128+
:param idle_for: Callable taking two arguments (last result,
129+
elapsed time) and returning how long to idle.
130+
:param initial_delay: Delay (in seconds) before starting the
131+
loop.
132+
:param stop_on_exception: Whether to stop on exception.
133+
:returns: A FutureEvent instance.
134+
"""
135+
136+
if self._future is not None:
137+
raise RuntimeError(self.RUN_ONLY_ONE_MESSAGE)
138+
139+
self.done = FutureEvent()
140+
self._abort.clear()
141+
142+
def _run_loop():
143+
kind = self.KIND
144+
func_name = reflection.get_callable_name(self.f)
145+
func = self.f if stop_on_exception else _safe_wrapper(self.f, kind,
146+
func_name)
147+
if initial_delay:
148+
self._sleep(initial_delay)
149+
try:
150+
watch = timeutils.StopWatch()
151+
152+
while self._running:
153+
watch.restart()
154+
result = func(*self.args, **self.kwargs)
155+
watch.stop()
156+
157+
if not self._running:
158+
break
159+
160+
idle = idle_for(result, watch.elapsed())
161+
LOG.debug(
162+
'%(kind)s %(func_name)r sleeping for %(idle).02f'
163+
' seconds',
164+
{'func_name': func_name, 'idle': idle, 'kind': kind})
165+
self._sleep(idle)
166+
except LoopingCallDone as e:
167+
self.done.send(e.retvalue)
168+
except Exception:
169+
exc_info = sys.exc_info()
170+
try:
171+
LOG.error('%(kind)s %(func_name)r failed',
172+
{'kind': kind, 'func_name': func_name},
173+
exc_info=exc_info)
174+
self.done.send_exception(*exc_info)
175+
finally:
176+
del exc_info
177+
return
178+
else:
179+
self.done.send(True)
180+
181+
# Use futurist's ThreadPoolExecutor to run the loop in a background
182+
# thread.
183+
executor = futurist.ThreadPoolExecutor(max_workers=1)
184+
self._future = executor.submit(_run_loop)
185+
self._future.add_done_callback(self._on_done)
186+
return self.done
187+
188+
# NOTE: _elapsed() is a thin wrapper for StopWatch.elapsed()
189+
def _elapsed(self, watch):
190+
return watch.elapsed()
191+
192+
193+
class FixedIntervalLoopingCall(LoopingCallBase):
194+
"""A fixed interval looping call."""
195+
RUN_ONLY_ONE_MESSAGE = _(
196+
"A fixed interval looping call can only run one function at a time")
197+
KIND = _('Fixed interval looping call')
198+
199+
def start(self, interval, initial_delay=None, stop_on_exception=True):
200+
def _idle_for(result, elapsed):
201+
delay = round(elapsed - interval, 2)
202+
if delay > 0:
203+
func_name = reflection.get_callable_name(self.f)
204+
LOG.warning(
205+
'Function %(func_name)r run outlasted interval by'
206+
' %(delay).2f sec',
207+
{'func_name': func_name, 'delay': delay})
208+
return -delay if delay < 0 else 0
209+
210+
return self._start(_idle_for, initial_delay=initial_delay,
211+
stop_on_exception=stop_on_exception)

neutron/tests/unit/agent/metadata/test_agent.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# License for the specific language governing permissions and limitations
1313
# under the License.
1414

15+
import socketserver
1516
from unittest import mock
1617

1718
import ddt
@@ -393,7 +394,7 @@ def setUp(self):
393394
self.cfg_p = mock.patch.object(agent, 'cfg')
394395
self.cfg = self.cfg_p.start()
395396
looping_call_p = mock.patch(
396-
'oslo_service.loopingcall.FixedIntervalLoopingCall')
397+
'neutron.common.loopingcall.FixedIntervalLoopingCall')
397398
self.looping_mock = looping_call_p.start()
398399
self.cfg.CONF.metadata_proxy_socket = '/the/path'
399400
self.cfg.CONF.metadata_workers = 0
@@ -435,6 +436,21 @@ def test_init_exists_unlink_fails_file_still_exists(self):
435436
agent.UnixDomainMetadataProxy(mock.Mock())
436437
unlink.assert_called_once_with('/the/path')
437438

439+
@mock.patch.object(agent, 'MetadataProxyHandler')
440+
@mock.patch.object(socketserver, 'ThreadingUnixStreamServer')
441+
@mock.patch.object(fileutils, 'ensure_tree')
442+
def test_run(self, ensure_dir, server, handler):
443+
p = agent.UnixDomainMetadataProxy(self.cfg.CONF)
444+
p.run()
445+
446+
ensure_dir.assert_called_once_with('/the', mode=0o755)
447+
server.assert_has_calls([
448+
mock.call('/the/path', mock.ANY),
449+
mock.call().serve_forever()])
450+
self.looping_mock.assert_called_once_with(p._report_state)
451+
self.looping_mock.return_value.start.assert_called_once_with(
452+
interval=mock.ANY)
453+
438454
def test_main(self):
439455
with mock.patch.object(agent, 'UnixDomainMetadataProxy') as proxy:
440456
with mock.patch.object(metadata_agent, 'config') as config:

0 commit comments

Comments
 (0)