Skip to content

Commit c943b1d

Browse files
committed
feat: added JeevesListener for unified DCT on both Jeeves api and Jeeves agents
1 parent 6df582e commit c943b1d

File tree

15 files changed

+385
-154
lines changed

15 files changed

+385
-154
lines changed

constants.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,19 @@ class JeevesCt:
157157
"KEYSOFT_JEEVES",
158158
]
159159

160+
JEEVES_AGENT_SIGNATURES = [
161+
"DOC_EMBEDDING_AGENT",
162+
"LLM_AGENT",
163+
]
164+
165+
JEEVES_PLUGIN_SIGNATURES = JEEVES_API_SIGNATURES + JEEVES_AGENT_SIGNATURES
166+
167+
UNIFIED_PATH_FILTER = [
168+
None,
169+
None,
170+
JEEVES_PLUGIN_SIGNATURES,
171+
None
172+
]
160173
AGENT_PATH_FILTER = [
161174
None,
162175
None,
@@ -166,10 +179,7 @@ class JeevesCt:
166179
API_PATH_FILTER = [
167180
None,
168181
None,
169-
[
170-
"DOC_EMBEDDING_AGENT",
171-
"LLM_AGENT"
172-
],
182+
JEEVES_AGENT_SIGNATURES,
173183
None
174184
]
175185

docker-compose/start_n_containers.bat

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,21 @@ set DEPLOY_DEBUG=false
66
set USE_LOCAL_IMAGE=true
77
set USE_GPU=true
88
set USE_IPC_HOST=true
9-
set EXPOSED_PORT=15033
9+
set EXPOSED_PORT=15034
10+
set USE_CACHED_DOCKER_COMPOSE=true
1011

1112

1213
REM Hardcoded number of containers
13-
set NUM_CONTAINERS=1
14+
set NUM_CONTAINERS=2
1415
set NUM_SUPERVISORS=0
16+
set START_CONTAINER_INDEX=1
17+
set /a END_CONTAINER_INDEX=%NUM_CONTAINERS% + %START_CONTAINER_INDEX% - 1
1518

1619

1720
if !USE_LOCAL_IMAGE! == true (
1821
set CONTAINER_IMAGE=local_edge_node
1922
) else (
20-
set CONTAINER_IMAGE=ratio1/edge_node:develop
23+
set CONTAINER_IMAGE=ratio1/edge_node:devnet
2124
)
2225

2326
if !DEPLOY_DEBUG! == true (
@@ -63,9 +66,11 @@ set WATCHTOWER_LABEL=com.centurylinklabs.watchtower.enable=true
6366
REM Generate the list of container IDs dynamically
6467
for /l %%i in (1,1,%NUM_CONTAINERS%) do (
6568
REM Generic container ID and edge node ID
66-
set CONTAINER_IDS[%%i]=!GENERIC_CONTAINER_ID!%%i
67-
set EDGE_NODE_IDS[%%i]=!GENERIC_EDGE_NODE_ID!%%i
68-
set CONTAINER_VOLUMES[%%i]=!GENERIC_CONTAINER_VOLUME!%%i
69+
set /a CURRENT_CONTAINER_IDX = %%i + !START_CONTAINER_INDEX! - 1
70+
echo Current container index: !CURRENT_CONTAINER_IDX!
71+
set CONTAINER_IDS[%%i]=!GENERIC_CONTAINER_ID!!CURRENT_CONTAINER_IDX!
72+
set EDGE_NODE_IDS[%%i]=!GENERIC_EDGE_NODE_ID!!CURRENT_CONTAINER_IDX!
73+
set CONTAINER_VOLUMES[%%i]=!GENERIC_CONTAINER_VOLUME!!CURRENT_CONTAINER_IDX!
6974

7075
REM Check if the container is a supervisor
7176
if %%i leq %NUM_SUPERVISORS% (
@@ -76,6 +81,14 @@ for /l %%i in (1,1,%NUM_CONTAINERS%) do (
7681
)
7782

7883

84+
REM Check if we should use cached docker-compose.yaml
85+
if !USE_CACHED_DOCKER_COMPOSE! == true (
86+
if exist docker-compose.yaml (
87+
echo Using cached docker-compose.yaml
88+
goto :START_CONTAINERS
89+
)
90+
)
91+
7992
REM Generate docker-compose.yaml dynamically
8093
echo services: > docker-compose.yaml
8194
for /l %%i in (1,1,%NUM_CONTAINERS%) do (
@@ -153,6 +166,7 @@ REM Maybe unnecessary
153166
@REM echo !CONTAINER_VOLUMES[%%i]!: >> docker-compose.yaml
154167
@REM )
155168

169+
:START_CONTAINERS
156170
REM Pull the containers
157171
docker-compose pull
158172

extensions/business/mixins/nlp_agent_mixin.py

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,60 @@
11
NLP_AGENT_MIXIN_CONFIG = {
22
'OBJECT_TYPE': [],
33
"ALLOW_EMPTY_INPUTS": False, # if this is set to true the on-idle will be triggered continuously the process
4+
"DEBUG_MODE": True,
45

56
"VALIDATION_RULES": {
67
},
78
}
89

910

1011
class _NlpAgentMixin(object):
11-
def compute_and_send_responses(self, inferences):
12+
def Pd(self, msg, **kwargs):
13+
if self.cfg_debug_mode:
14+
self.P(msg, **kwargs)
15+
return
16+
17+
def filter_valid_inferences(self, inferences, return_idxs=False):
18+
res = []
19+
idxs = []
20+
for idx, inf in enumerate(inferences):
21+
if isinstance(inf, dict) and inf.get("IS_VALID", True):
22+
res.append(inf)
23+
idxs.append(idx)
24+
# endfor inferences
25+
return res if not return_idxs else (res, idxs)
26+
27+
def inference_to_response(self, inference, model_name):
28+
return {
29+
'REQUEST_ID': inference.get('REQUEST_ID'),
30+
'MODEL_NAME': model_name,
31+
'TEXT_RESPONSE': inference.get('text'),
32+
}
33+
34+
def handle_inferences(self, inferences, data=None):
35+
if not isinstance(inferences, list):
36+
return
1237
model_name = inferences[0].get('MODEL_NAME', None) if len(inferences) > 0 else None
38+
cnt_initial_inferences = len(inferences)
39+
inferences, valid_idxs = self.filter_valid_inferences(inferences, return_idxs=True)
40+
self.Pd(f"Filtered {cnt_initial_inferences} inferences to {len(inferences)} valid inferences.")
41+
if data is not None:
42+
filtered_data = [
43+
data[idx] for idx in valid_idxs
44+
]
45+
if len(filtered_data) > 0:
46+
self.Pd(f"Received requests: {self.json_dumps(self.shorten_str(filtered_data), indent=2)}")
47+
# endif data is not None
48+
1349
for inf in inferences:
14-
request_result = {
15-
'REQUEST_ID': inf.get('REQUEST_ID'),
16-
'MODEL_NAME': model_name,
17-
'TEXT_RESPONSE': inf.get('text'),
18-
}
50+
request_id = inf.get('REQUEST_ID', None)
51+
self.Pd(f"Processing inference for request ID: {request_id}, model: {model_name}")
52+
request_result = self.inference_to_response(inf, model_name)
1953
current_payload_kwargs = {
2054
'result': request_result,
21-
'request_id': inf.get('REQUEST_ID'),
55+
'request_id': request_id,
2256
}
2357
self.add_payload_by_fields(**current_payload_kwargs)
2458
# endfor inferences
2559
return
2660

27-

extensions/business/nlp/doc_embedding_agent.py

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from naeural_core.business.base import BasePluginExecutor as BasePlugin
2+
from extensions.business.mixins.nlp_agent_mixin import _NlpAgentMixin, NLP_AGENT_MIXIN_CONFIG
23

34
__VER__ = '0.1.0.0'
45

56
_CONFIG = {
67
# mandatory area
78
**BasePlugin.CONFIG,
9+
**NLP_AGENT_MIXIN_CONFIG,
810

911
'MAX_INPUTS_QUEUE_SIZE': 64,
1012

@@ -21,14 +23,9 @@
2123
}
2224

2325

24-
class DocEmbeddingAgentPlugin(BasePlugin):
26+
class DocEmbeddingAgentPlugin(BasePlugin, _NlpAgentMixin):
2527
CONFIG = _CONFIG
2628

27-
def D(self, msg, **kwargs):
28-
if self.cfg_debug_mode:
29-
self.P(msg, **kwargs)
30-
return
31-
3229
def on_init(self):
3330
self.__last_status_time = None
3431
self.__last_inference_meta = None
@@ -80,22 +77,13 @@ def maybe_send_status(self, inf_meta):
8077
# endif time to send status
8178
return
8279

80+
def inference_to_response(self, inference, model_name):
81+
return inference
82+
8383
def _process(self):
8484
inf_meta = self.dataapi_inferences_meta().get(self.cfg_ai_engine)
8585
self.maybe_send_status(inf_meta)
86+
data = self.dataapi_struct_datas()
8687
inferences = self.dataapi_struct_data_inferences()
87-
if inferences is None or len(inferences) == 0 or isinstance(inferences[0], list):
88-
return
89-
self.D(f'[Agent]Processing inferences: {str(inferences)[:50]}')
90-
for inf in inferences:
91-
self.D(f'[Agent]Processing inference: {inf["REQUEST_ID"]}')
92-
# For each inference a response payload will be created
93-
request_id = inf.get('REQUEST_ID')
94-
request_result = inf
95-
self.D(f'[Agent]Processing inference: {request_result}')
96-
self.add_payload_by_fields(
97-
result=request_result,
98-
request_id=request_id,
99-
)
100-
# endfor inferences
88+
self.handle_inferences(inferences, data=data)
10189
return

extensions/business/nlp/llm_agent.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,7 @@ class LlmAgentPlugin(BasePlugin, _NlpAgentMixin):
2424
def _process(self):
2525
# we always receive input from the upstream due to the fact that _process
2626
# is called only when we have input based on ALLOW_EMPTY_INPUTS=False (from NLP_AGENT_MIXIN_CONFIG)
27-
data = self.dataapi_struct_data()
27+
data = self.dataapi_struct_datas()
2828
inferences = self.dataapi_struct_data_inferences()
29-
if len(inferences) == 0:
30-
return
31-
if isinstance(inferences[0], list):
32-
return
33-
self.P(f"Received request:\n{self.json_dumps(self.shorten_str(data), indent=2)}")
34-
self.P(f"Received inferences:\n{inferences}")
35-
self.compute_and_send_responses(inferences)
29+
self.handle_inferences(inferences=inferences, data=data)
3630
return
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from extensions.data.default.jeeves.jeeves_listener import JeevesListenerDataCapture as BaseClass
2+
from constants import JeevesCt
3+
4+
5+
_CONFIG = {
  # Start from the full base-listener configuration and override below.
  **BaseClass.CONFIG,

  # Restrict this listener to the agent path filter defined in constants.
  "PATH_FILTER": JeevesCt.AGENT_PATH_FILTER,

  # Minimum number of seconds between two synthetic ping inputs.
  "PING_PERIOD": 0.5,
  # Synthetic pings are opt-in and disabled by default.
  "PING_ENABLED": False,

  "VALIDATION_RULES": {
    **BaseClass.CONFIG["VALIDATION_RULES"],
  },
}
17+
18+
19+
class JeevesAgentListenerDataCapture(BaseClass):
  """Jeeves listener specialized for Jeeves agents.

  Extends the base Jeeves listener with an optional keep-alive mechanism:
  when enabled via ``PING_ENABLED``, a synthetic ``{'ping': True}`` input is
  injected whenever the acquisition deque is empty and at least
  ``PING_PERIOD`` seconds have passed since the previous ping.
  """
  CONFIG = _CONFIG

  def __init__(self, **kwargs):
    super(JeevesAgentListenerDataCapture, self).__init__(**kwargs)
    # 0 means "no ping emitted yet", so the first eligible ping fires
    # as soon as the deque is idle.
    self.last_ping_time = 0
    return

  def add_ping_input(self):
    """Inject a single synthetic ping input into the capture queue."""
    ping_input = self._new_input(struct_data={'ping': True})
    self._add_inputs([ping_input])

  def needs_ping(self):
    """Return True when a synthetic ping input should be emitted now.

    A ping is due only if pinging is enabled, the input deque exists and
    is currently empty, and the configured ping period has elapsed since
    the last ping.
    """
    if not self.cfg_ping_enabled:
      return False
    deque_is_idle = self._deque is not None and len(self._deque) == 0
    if not deque_is_idle:
      return False
    elapsed = self.time() - self.last_ping_time
    return elapsed >= self.cfg_ping_period

  def _run_data_aquisition_step(self):
    """Run the base acquisition step, then emit a ping input if one is due.

    NOTE(review): the method name (including the 'aquisition' spelling)
    must match the base-class hook being overridden — do not rename here.
    """
    super()._run_data_aquisition_step()
    if self.needs_ping():
      self.add_ping_input()
      self.last_ping_time = self.time()
    # endif needs_ping
    return

extensions/data/default/iot/jeeves_api_listener.py renamed to extensions/data/default/jeeves/jeeves_api_listener.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from naeural_core.data.default.iot.network_listener import NetworkListenerDataCapture as BaseClass
1+
from extensions.data.default.jeeves.jeeves_listener import JeevesListenerDataCapture as BaseClass
22
from constants import JeevesCt
33

44

extensions/data/default/iot/jeeves_embed_agent_listener.py renamed to extensions/data/default/jeeves/jeeves_embed_agent_listener.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from extensions.data.default.iot.jeeves_agent_listener import JeevesAgentListenerDataCapture as BaseClass
1+
from extensions.data.default.jeeves.jeeves_agent_listener import JeevesAgentListenerDataCapture as BaseClass
22
from constants import JeevesCt
33

44

0 commit comments

Comments
 (0)