Skip to content

Commit 137acfc

Browse files
committed
Refactor using janus queue
Decouple conversation tree from the slack bot Reorganize data structures Add comments
1 parent ecfa975 commit 137acfc

File tree

9 files changed

+364
-1704
lines changed

9 files changed

+364
-1704
lines changed

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ authors = [
1010
{name = "Redis Team", email = "[email protected]"},
1111
]
1212
readme = "README.md"
13-
requires-python = ">=3.8"
13+
requires-python = ">=3.9"
1414
classifiers = [
1515
"Development Status :: 3 - Alpha",
1616
"Intended Audience :: Developers",
@@ -35,6 +35,7 @@ dependencies = [
3535
"openai>=1.0.0",
3636
"PyJWT>=2.8.0",
3737
"cryptography>=41.0.0",
38+
"janus>=2.0,<2.2",
3839
]
3940

4041
[project.optional-dependencies]

src/redis_release/bht/conversation_behaviours.py

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from slack_sdk import WebClient
1010

1111
from ..config import Config
12-
from ..conversation_models import Command
12+
from ..conversation_models import Command, ConversationCockpit
1313
from ..models import ReleaseArgs, ReleaseType
1414
from .behaviours import ReleaseAction
1515
from .conversation_state import ConversationState
@@ -54,7 +54,11 @@ class CommandDetectionResult(BaseModel):
5454

5555
class SimpleCommandClassifier(ReleaseAction):
5656
def __init__(
57-
self, name: str, state: ConversationState, log_prefix: str = ""
57+
self,
58+
name: str,
59+
state: ConversationState,
60+
cockpit: ConversationCockpit,
61+
log_prefix: str = "",
5862
) -> None:
5963
self.state = state
6064
super().__init__(name=name, log_prefix=log_prefix)
@@ -85,12 +89,12 @@ class LLMCommandClassifier(ReleaseAction):
8589
def __init__(
8690
self,
8791
name: str,
88-
llm: OpenAI,
8992
state: ConversationState,
93+
cockpit: ConversationCockpit,
9094
log_prefix: str = "",
9195
confidence_threshold: float = 0.7,
9296
) -> None:
93-
self.llm = llm
97+
self.llm = cockpit.llm
9498
self.state = state
9599
self.confidence_threshold = confidence_threshold
96100
super().__init__(name=name, log_prefix=log_prefix)
@@ -136,6 +140,7 @@ def update(self) -> Status:
136140
)
137141

138142
try:
143+
assert self.llm is not None
139144
# Call LLM with structured outputs
140145
response = self.llm.responses.parse(
141146
model="gpt-4o-2024-08-06",
@@ -253,6 +258,7 @@ def __init__(
253258
self,
254259
name: str,
255260
state: ConversationState,
261+
cockpit: ConversationCockpit,
256262
config: Config,
257263
log_prefix: str = "",
258264
) -> None:
@@ -274,6 +280,18 @@ def update(self) -> Status:
274280
# Get release args
275281
release_args = self.state.release_args
276282

283+
# Check authorization
284+
if (
285+
self.state.authorized_users
286+
and self.state.message
287+
and self.state.message.user not in self.state.authorized_users
288+
):
289+
logger.warning(
290+
f"Unauthorized attempt by user {self.state.message.user}. Authorized users: {self.state.authorized_users}"
291+
)
292+
self.state.reply = "Sorry, you are not authorized to run releases. Please contact an administrator."
293+
return Status.FAILURE
294+
277295
self.logger.info(
278296
f"Starting release for tag {release_args.release_tag} in background thread"
279297
)
@@ -340,7 +358,11 @@ def run_release_in_thread() -> None:
340358

341359
class IsLLMAvailable(ReleaseAction):
342360
def __init__(
343-
self, name: str, state: ConversationState, log_prefix: str = ""
361+
self,
362+
name: str,
363+
state: ConversationState,
364+
cockpit: ConversationCockpit,
365+
log_prefix: str = "",
344366
) -> None:
345367
self.state = state
346368
super().__init__(name=name, log_prefix=log_prefix)
@@ -353,7 +375,11 @@ def update(self) -> Status:
353375

354376
class HasReleaseArgs(ReleaseAction):
355377
def __init__(
356-
self, name: str, state: ConversationState, log_prefix: str = ""
378+
self,
379+
name: str,
380+
state: ConversationState,
381+
cockpit: ConversationCockpit,
382+
log_prefix: str = "",
357383
) -> None:
358384
self.state = state
359385
super().__init__(name=name, log_prefix=log_prefix)
@@ -366,7 +392,11 @@ def update(self) -> Status:
366392

367393
class IsCommandStarted(ReleaseAction):
368394
def __init__(
369-
self, name: str, state: ConversationState, log_prefix: str = ""
395+
self,
396+
name: str,
397+
state: ConversationState,
398+
cockpit: ConversationCockpit,
399+
log_prefix: str = "",
370400
) -> None:
371401
self.state = state
372402
super().__init__(name=name, log_prefix=log_prefix)

src/redis_release/bht/conversation_state.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,10 @@
22

33
from pydantic import BaseModel, Field
44

5-
from ..conversation_models import Command
5+
from ..conversation_models import Command, InboxMessage
66
from ..models import ReleaseArgs, SlackArgs, SlackFormat
77

88

9-
class InboxMessage(BaseModel):
10-
message: str
11-
context: List[str]
12-
13-
149
class ConversationState(BaseModel):
1510
llm_available: bool = False
1611
message: Optional[InboxMessage] = None
@@ -20,3 +15,4 @@ class ConversationState(BaseModel):
2015
reply: Optional[str] = None
2116

2217
slack_args: Optional[SlackArgs] = None
18+
authorized_users: Optional[List[str]] = None
Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
1-
from typing import Optional, Tuple
1+
import logging
2+
from typing import List, Optional, Tuple
23

4+
from janus import SyncQueue
35
from openai import OpenAI
46
from py_trees.behaviour import Behaviour
57
from py_trees.trees import BehaviourTree
68
from py_trees.visitors import SnapshotVisitor
79

810
from ..config import Config, load_config
9-
from ..conversation_models import ConversationArgs
11+
from ..conversation_models import ConversationArgs, ConversationCockpit, InboxMessage
1012
from ..models import SlackArgs
1113
from .backchain import create_PPA, latch_chain_to_chain
1214
from .conversation_behaviours import (
@@ -16,41 +18,45 @@
1618
RunCommand,
1719
SimpleCommandClassifier,
1820
)
19-
from .conversation_state import ConversationState, InboxMessage
21+
from .conversation_state import ConversationState
2022
from .tree import log_tree_state_with_markup
2123

24+
logger = logging.getLogger(__name__)
25+
2226

2327
def create_conversation_root_node(
2428
input: InboxMessage,
2529
config: Config,
26-
llm: Optional[OpenAI] = None,
30+
cockpit: ConversationCockpit,
2731
slack_args: Optional[SlackArgs] = None,
32+
authorized_users: Optional[List[str]] = None,
2833
) -> Tuple[Behaviour, ConversationState]:
2934
state = ConversationState(
30-
llm_available=llm is not None,
35+
llm_available=cockpit.llm is not None,
3136
message=input,
3237
slack_args=slack_args,
38+
authorized_users=authorized_users,
3339
)
3440
state.message = input
3541

3642
# Use LLM classifier if available, otherwise use simple classifier
37-
if llm is not None:
43+
if cockpit.llm is not None:
3844
command_detector = create_PPA(
3945
"LLM Command Detector",
40-
LLMCommandClassifier("LLM Command Detector", llm, state),
41-
HasReleaseArgs("Has Release Args", state),
46+
LLMCommandClassifier("LLM Command Detector", state, cockpit),
47+
HasReleaseArgs("Has Release Args", state, cockpit),
4248
)
4349
else:
4450
command_detector = create_PPA(
4551
"Simple Command Detector",
46-
SimpleCommandClassifier("Simple Command Classifier", state),
47-
HasReleaseArgs("Has Release Args", state),
52+
SimpleCommandClassifier("Simple Command Classifier", state, cockpit),
53+
HasReleaseArgs("Has Release Args", state, cockpit),
4854
)
4955

5056
run_command = create_PPA(
5157
"Run",
52-
RunCommand("Run Command", state, config),
53-
IsCommandStarted("Is Command Started", state),
58+
RunCommand("Run Command", state, cockpit, config),
59+
IsCommandStarted("Is Command Started", state, cockpit),
5460
)
5561

5662
latch_chain_to_chain(run_command, command_detector)
@@ -61,6 +67,7 @@ def create_conversation_root_node(
6167

6268
def initialize_conversation_tree(
6369
args: ConversationArgs,
70+
reply_queue: Optional[SyncQueue] = None,
6471
) -> Tuple[BehaviourTree, ConversationState]:
6572

6673
# Load config
@@ -70,11 +77,19 @@ def initialize_conversation_tree(
7077
if args.openai_api_key:
7178
llm = OpenAI(api_key=args.openai_api_key)
7279

80+
cockpit = ConversationCockpit()
81+
cockpit.llm = llm
82+
cockpit.reply_queue = reply_queue
83+
84+
if not args.inbox:
85+
raise ValueError("Inbox message is required")
86+
7387
root, state = create_conversation_root_node(
74-
InboxMessage(message=args.message, context=args.context or []),
88+
args.inbox,
7589
config=config,
76-
llm=llm,
90+
cockpit=cockpit,
7791
slack_args=args.slack_args,
92+
authorized_users=args.authorized_users,
7893
)
7994
tree = BehaviourTree(root)
8095
snapshot_visitor = SnapshotVisitor()
@@ -83,8 +98,24 @@ def initialize_conversation_tree(
8398
return tree, state
8499

85100

86-
def run_conversation_tree(tree: BehaviourTree) -> None:
101+
def run_conversation_tree(
102+
tree: BehaviourTree, state: ConversationState, reply_queue: SyncQueue
103+
) -> None:
87104
"""Abstacting away tree run
88105
Currently it's just a single tick, but it may change in future
89106
"""
90-
tree.tick()
107+
try:
108+
tree.tick()
109+
try:
110+
if state.reply:
111+
reply_queue.put(state.reply)
112+
except Exception as e:
113+
logger.error(f"Error putting reply to queue: {e}", exc_info=True)
114+
except Exception as e:
115+
try:
116+
reply_queue.put(f"Error running conversation tree: {str(e)}")
117+
except Exception as e:
118+
logger.error(f"Error putting error reply to queue: {e}", exc_info=True)
119+
finally:
120+
logger.debug("Shutting down reply queue")
121+
reply_queue.shutdown(immediate=False)

src/redis_release/cli.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
)
1919
from .bht.tree import TreeInspector, async_tick_tock, initialize_tree_and_state
2020
from .config import load_config
21-
from .conversation_models import ConversationArgs
21+
from .conversation_models import ConversationArgs, InboxMessage
2222
from .github_app_auth import GitHubAppAuth, load_private_key_from_file
2323
from .github_client_async import GitHubClientAsync
2424
from .logging_config import setup_logging
@@ -140,7 +140,9 @@ def release_print(
140140
def conversation_print() -> None:
141141
setup_logging()
142142
tree, state = initialize_conversation_tree(
143-
ConversationArgs(message="test", openai_api_key="dummy")
143+
ConversationArgs(
144+
inbox=InboxMessage(message="test", context=[]), openai_api_key="dummy"
145+
)
144146
)
145147
render_dot_tree(tree.root)
146148
print(unicode_tree(tree.root))
@@ -168,9 +170,11 @@ def conversation(
168170
openai_api_key = os.getenv("OPENAI_API_KEY")
169171

170172
args = ConversationArgs(
171-
message=message, openai_api_key=openai_api_key, config_path=config
173+
inbox=InboxMessage(message=message, context=[]),
174+
openai_api_key=openai_api_key,
175+
config_path=config,
172176
)
173-
tree = initialize_conversation_tree(args)
177+
tree, _ = initialize_conversation_tree(args)
174178
tree.tick()
175179
print(unicode_tree(tree.root))
176180

src/redis_release/conversation_models.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from enum import Enum
22
from typing import List, Optional
33

4+
from janus import SyncQueue
5+
from openai import OpenAI
46
from pydantic import BaseModel
57

68
from .models import SlackArgs
@@ -15,10 +17,20 @@ class Command(str, Enum):
1517
HELP = "help"
1618

1719

18-
class ConversationArgs(BaseModel):
19-
openai_api_key: Optional[str] = None
20+
class InboxMessage(BaseModel):
2021
message: str
21-
context: Optional[List[str]] = None
22-
config_path: Optional[str] = None
22+
context: List[str]
23+
user: Optional[str] = None
2324

25+
26+
class ConversationArgs(BaseModel):
27+
inbox: Optional[InboxMessage]
28+
config_path: Optional[str] = None
2429
slack_args: Optional[SlackArgs] = None
30+
openai_api_key: Optional[str] = None
31+
authorized_users: Optional[List[str]] = None
32+
33+
34+
class ConversationCockpit:
35+
llm: Optional[OpenAI] = None
36+
reply_queue: Optional[SyncQueue] = None

src/redis_release/github_client_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ async def trigger_workflow(
242242
logger.debug(f"[blue]Triggering workflow[/blue] {workflow_file} in {repo}")
243243
logger.debug(f"Inputs: {inputs}")
244244
logger.debug(f"Ref: {ref}")
245-
logger.debug(f"Workflow UUID: [cyan]{inputs['workflow_uuid']}[/cyan]")
245+
if "workflow_uuid" in inputs:
246+
logger.debug(f"Workflow UUID: [cyan]{inputs['workflow_uuid']}[/cyan]")
246247

247248
url = f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_file}/dispatches"
248249
headers = {

0 commit comments

Comments
 (0)