-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreflection.py
More file actions
150 lines (125 loc) · 5.78 KB
/
reflection.py
File metadata and controls
150 lines (125 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import re
from typing import List
from pydantic import BaseModel, Field
class ReflectionEntry(BaseModel):
    """One reflection item: a strategy, a pitfall, or a decision."""

    # Identifier portion of the bracketed tag, e.g. "001" for "[str-001]".
    id: str
    type: str  # 'str', 'mis', or 'dec'
    # Counters read from the "helpful=N" / "harmful=N" fields of the tag line.
    # They keep their default of 0 when not supplied.
    helpful: int = 0
    harmful: int = 0
    # Free text that follows the "::" separator.
    content: str
class ReflectionResult(BaseModel):
    """Container for the reflection entries produced by one parse."""

    # Each instance gets its own fresh list (default_factory), never a shared one.
    entries: List[ReflectionEntry] = Field(default_factory=list)
class ReflectionEngine:
    """Pull structured reflection lines out of free-form agent output.

    Recognised line formats:

        [str-XXX] helpful=X harmful=Y :: <strategy>
        [mis-XXX] helpful=X harmful=Y :: <pitfall>
        [dec-XXX] :: <decision>
    """

    # One pattern per reflection kind. Group 1 is always the id; the last
    # group is the free-text content, which runs to the end of the line.
    STR_PATTERN = r"\[str-(\w+)\]\s+helpful=(\d+)\s+harmful=(\d+)\s+::\s+(.*)"
    MIS_PATTERN = r"\[mis-(\w+)\]\s+helpful=(\d+)\s+harmful=(\d+)\s+::\s+(.*)"
    DEC_PATTERN = r"\[dec-(\w+)\]\s+::\s+(.*)"

    def parse_output(self, text: str) -> ReflectionResult:
        """Return every reflection found in *text* as a ReflectionResult.

        Entries are grouped by kind rather than by position in *text*:
        all strategies first, then all pitfalls, then all decisions.
        Content is stripped of surrounding whitespace.

        NOTE(review): the content group captures to end of line, so a line
        that is already wrapped in an HTML comment would keep its trailing
        "-->" inside ``content`` -- confirm inputs never arrive in that form.
        """
        parsed = ReflectionResult()

        # Strategies and pitfalls share the same four-group layout.
        scored_kinds = (
            ("str", self.STR_PATTERN),
            ("mis", self.MIS_PATTERN),
        )
        for kind, pattern in scored_kinds:
            parsed.entries.extend(
                ReflectionEntry(
                    id=entry_id,
                    type=kind,
                    helpful=int(helpful),
                    harmful=int(harmful),
                    content=body.strip(),
                )
                for entry_id, helpful, harmful, body in re.findall(pattern, text)
            )

        # Decisions carry no counters, so helpful/harmful stay at their defaults.
        parsed.entries.extend(
            ReflectionEntry(id=entry_id, type="dec", content=body.strip())
            for entry_id, body in re.findall(self.DEC_PATTERN, text)
        )
        return parsed
class PlaybookUpdater:
    """Merge parsed reflections into a playbook (.mdc) file.

    Each entry is stored as a single-line HTML comment under the section
    header that corresponds to its type. An entry whose ``[type-id]`` tag is
    already present in its section is rewritten in place; otherwise it is
    appended to the end of that section. Missing sections are created at the
    end of the file.
    """

    # Section header for each entry type. Entries of any other type are ignored.
    _SECTION_HEADERS = {
        "str": "## Strategier & patterns",
        "mis": "## Kända fallgropar",
        "dec": "## Arkitekturella beslut",
    }

    def __init__(self, playbook_path: str):
        """Remember the path of the playbook file to update."""
        self.playbook_path = playbook_path

    def update(self, reflections: "ReflectionResult") -> None:
        """Apply *reflections* to the playbook file.

        The file is only rewritten when at least one entry changed its
        content. Does nothing when ``reflections.entries`` is empty.

        Raises:
            OSError: if the playbook cannot be read or written (for example
                ``FileNotFoundError`` when it does not exist).
        """
        if not reflections.entries:
            return

        # Explicit UTF-8: the section headers contain non-ASCII characters, so
        # relying on the platform default encoding can make the header lookup
        # fail and append duplicate sections.
        with open(self.playbook_path, "r", encoding="utf-8") as f:
            content = f.read()

        new_content = content
        for entry in reflections.entries:
            header = self._SECTION_HEADERS.get(entry.type)
            if header is not None:
                new_content = self._update_section(new_content, header, entry)

        if new_content != content:
            with open(self.playbook_path, "w", encoding="utf-8") as f:
                f.write(new_content)

    @staticmethod
    def _format_entry(entry: "ReflectionEntry") -> str:
        """Render *entry* as the one-line HTML comment stored in the playbook."""
        if entry.type == "dec":
            return f"<!-- [dec-{entry.id}] :: {entry.content} -->"
        return (
            f"<!-- [{entry.type}-{entry.id}] helpful={entry.helpful} "
            f"harmful={entry.harmful} :: {entry.content} -->"
        )

    def _update_section(
        self,
        content: str,
        section_header: str,
        entry: "ReflectionEntry",
    ) -> str:
        """Return *content* with *entry* updated in, or added to, a section.

        Args:
            content: Full text of the playbook.
            section_header: Header line of the target section, e.g. ``"## Foo"``.
            entry: The reflection to write.

        Returns:
            The new playbook text. It equals *content* when the entry's tag is
            present in the section but not in the expected comment form.
        """
        section_start = content.find(section_header)
        if section_start == -1:
            # Section doesn't exist yet: append it, separated by one blank
            # line unless the file has no other content.
            existing = content.rstrip()
            separator = "\n\n" if existing else ""
            content = f"{existing}{separator}{section_header}\n"
            section_start = len(content) - len(section_header) - 1

        # The section runs up to the next level-2 header, or to end of file.
        next_section = content.find("\n## ", section_start + len(section_header))
        section_end = len(content) if next_section == -1 else next_section
        section_content = content[section_start:section_end]
        post_content = content[section_end:]

        new_entry_line = self._format_entry(entry)
        entry_id_marker = f"[{entry.type}-{entry.id}]"

        if entry_id_marker in section_content:
            # Escape the interpolated parts so regex metacharacters in an id
            # cannot make the pattern match a different entry.
            safe_id = re.escape(entry.id)
            if entry.type == "dec":
                old_entry_pattern = rf"<!-- \[dec-{safe_id}\] :: .*? -->"
            else:
                safe_type = re.escape(entry.type)
                old_entry_pattern = (
                    rf"<!-- \[{safe_type}-{safe_id}\] "
                    rf"helpful=\d+ harmful=\d+ :: .*? -->"
                )
            match = re.search(old_entry_pattern, section_content)
            if match:
                # Splice by position rather than re.sub so backslashes in the
                # new content are never interpreted as replacement escapes.
                section_content = (
                    section_content[:match.start()]
                    + new_entry_line
                    + section_content[match.end():]
                )
            # If the tag exists but not in comment form, the section is left
            # untouched rather than risking a duplicate entry.
        else:
            section_content = section_content.rstrip() + f"\n{new_entry_line}\n"

        return content[:section_start] + section_content + post_content