19
19
//! - This might give us more flexabiity in the long run, but initially seems
20
20
//! more complex with more unknowns.
21
21
22
- use crate :: { ClaudeMessage , ClaudeMessageContent , UserInput , claude_transcript:: Transcript , db} ;
22
+ use crate :: {
23
+ ClaudeMessage , ClaudeMessageContent , UserInput , db,
24
+ rules:: { create_claude_assignment_rule, list_claude_assignment_rules} ,
25
+ } ;
23
26
use anyhow:: { Result , bail} ;
24
27
use but_broadcaster:: { Broadcaster , FrontendEvent } ;
25
28
use but_workspace:: StackId ;
26
29
use gitbutler_command_context:: CommandContext ;
27
30
use serde_json:: json;
28
31
use std:: {
29
32
collections:: HashSet ,
30
- io:: { BufRead , BufReader , Read as _} ,
33
+ io:: { BufRead , BufReader , PipeReader , Read as _} ,
31
34
sync:: Arc ,
32
35
} ;
33
- use tokio:: { process:: Command , sync:: Mutex } ;
36
+ use tokio:: {
37
+ process:: { Child , Command } ,
38
+ sync:: Mutex ,
39
+ } ;
34
40
35
41
/// Holds the CC instances. Currently keyed by stackId, since our current model
36
42
/// assumes one CC per stack at any given time.
@@ -50,7 +56,7 @@ impl Claudes {
50
56
51
57
pub async fn send_message (
52
58
& self ,
53
- ctx : Mutex < CommandContext > ,
59
+ ctx : Arc < Mutex < CommandContext > > ,
54
60
broadcaster : Arc < tokio:: sync:: Mutex < Broadcaster > > ,
55
61
stack_id : StackId ,
56
62
message : & str ,
@@ -70,120 +76,71 @@ impl Claudes {
70
76
ctx : & mut CommandContext ,
71
77
stack_id : StackId ,
72
78
) -> Result < Vec < ClaudeMessage > > {
73
- let messages = db:: list_messages_by_session ( ctx, stack_id. into ( ) ) ?;
74
- Ok ( messages)
79
+ let rule = list_claude_assignment_rules ( ctx) ?
80
+ . into_iter ( )
81
+ . find ( |rule| rule. stack_id == stack_id) ;
82
+ if let Some ( rule) = rule {
83
+ let messages = db:: list_messages_by_session ( ctx, rule. session_id ) ?;
84
+ Ok ( messages)
85
+ } else {
86
+ Ok ( vec ! [ ] )
87
+ }
75
88
}
76
89
77
90
async fn spawn_claude (
78
91
& self ,
79
- ctx : Mutex < CommandContext > ,
92
+ ctx : Arc < Mutex < CommandContext > > ,
80
93
broadcaster : Arc < tokio:: sync:: Mutex < Broadcaster > > ,
81
94
stack_id : StackId ,
82
95
message : String ,
83
96
) -> Result < ( ) > {
84
97
self . requests . lock ( ) . await . insert ( stack_id) ;
85
98
86
- // Clone so the reference to ctx can be immediatly dropped
87
- let project = ctx. lock ( ) . await . project ( ) . clone ( ) ;
88
-
89
99
// We're also making the bold assumption that if we can find the
90
100
// transcript, that a session was created. This is _not_ the best
91
101
// way to do this.
92
102
//
93
103
// https://github.com/anthropics/claude-code/issues/5161 could
94
104
// simplify this
95
- let transcript_path = Transcript :: get_transcript_path ( & project. path , stack_id. into ( ) ) ?;
96
-
97
- let create_new = !transcript_path. try_exists ( ) ?;
98
-
99
- let ( read_stdout, writer) = std:: io:: pipe ( ) ?;
100
- let ( mut read_stderr, write_stderr) = std:: io:: pipe ( ) ?;
101
- let broadcaster = broadcaster. clone ( ) ;
102
-
103
- // Currently the stack_id is used as the initial "stable" identifier.
104
- let session_id: uuid:: Uuid = stack_id. into ( ) ;
105
- let project_id = project. id ;
106
-
107
- let session = {
105
+ let rule = {
108
106
let mut ctx = ctx. lock ( ) . await ;
109
- let session = if let Some ( session) = db:: get_session_by_id ( & mut ctx, session_id) ? {
110
- session
111
- } else {
112
- db:: save_new_session ( & mut ctx, session_id) ?
113
- } ;
114
-
115
- // Before we save the first line, we want to append the user's side
116
- let message = db:: save_new_message (
117
- & mut ctx,
118
- stack_id. into ( ) ,
119
- ClaudeMessageContent :: UserInput ( UserInput {
120
- message : message. clone ( ) ,
121
- } ) ,
122
- ) ?;
123
-
124
- broadcaster. lock ( ) . await . send ( FrontendEvent {
125
- name : format ! ( "project://{project_id}/claude/{stack_id}/message_recieved" ) ,
126
- payload : json ! ( message) ,
127
- } ) ;
128
-
129
- session
107
+ list_claude_assignment_rules ( & mut ctx) ?
108
+ . into_iter ( )
109
+ . find ( |rule| rule. stack_id == stack_id)
130
110
} ;
131
111
132
- let response_streamer = tokio:: spawn ( async move {
133
- let reader = BufReader :: new ( read_stdout) ;
134
- let mut first = true ;
135
- for line in reader. lines ( ) {
136
- let mut ctx = ctx. lock ( ) . await ;
137
- let line = line. unwrap ( ) ;
138
- let parsed_event: serde_json:: Value = serde_json:: from_str ( & line) . unwrap ( ) ;
139
-
140
- if first {
141
- let current_session_id = parsed_event[ "session_id" ]
142
- . as_str ( )
143
- . unwrap ( )
144
- . parse ( )
145
- . unwrap ( ) ;
146
- let session = db:: get_session_by_id ( & mut ctx, session_id) . unwrap ( ) ;
147
- if session. is_some ( ) {
148
- db:: set_session_current_id ( & mut ctx, session_id, current_session_id)
149
- . unwrap ( ) ;
150
- }
151
- first = false ;
152
- }
112
+ let create_new = rule. is_none ( ) ;
113
+ let session_id = rule. map ( |r| r. session_id ) . unwrap_or ( uuid:: Uuid :: new_v4 ( ) ) ;
153
114
154
- let message_content = ClaudeMessageContent :: ClaudeOutput ( parsed_event. clone ( ) ) ;
155
- let message =
156
- db:: save_new_message ( & mut ctx, stack_id. into ( ) , message_content. clone ( ) )
157
- . unwrap ( ) ;
115
+ let broadcaster = broadcaster. clone ( ) ;
158
116
159
- broadcaster. lock ( ) . await . send ( FrontendEvent {
160
- name : format ! ( "project://{project_id}/claude/{stack_id}/message_recieved" ) ,
161
- payload : json ! ( message) ,
162
- } )
163
- }
164
- } ) ;
165
-
166
- let project_path = project. path . clone ( ) ;
167
-
168
- let mut command = Command :: new ( "claude" ) ;
169
- command. stdout ( writer) ;
170
- command. stderr ( write_stderr) ;
171
- command. current_dir ( & project_path) ;
172
- command. args ( [
173
- "-p" ,
174
- "--output-format=stream-json" ,
175
- "--verbose" ,
176
- "--dangerously-skip-permissions" ,
177
- ] ) ;
178
- if create_new {
179
- command. arg ( format ! ( "--session-id={stack_id}" ) ) ;
180
- } else {
181
- command. arg ( format ! ( "--resume={}" , session. current_id) ) ;
182
- }
183
- command. arg ( message) ;
117
+ let session = upsert_session ( ctx. clone ( ) , session_id, stack_id) . await ?;
118
+ create_user_message (
119
+ ctx. clone ( ) ,
120
+ broadcaster. clone ( ) ,
121
+ session_id,
122
+ stack_id,
123
+ & message,
124
+ )
125
+ . await ?;
126
+ let ( read_stdout, writer) = std:: io:: pipe ( ) ?;
127
+ let response_streamer =
128
+ spawn_response_streaming ( ctx. clone ( ) , broadcaster, read_stdout, session_id, stack_id) ;
184
129
185
- let mut handle = command. spawn ( ) . unwrap ( ) ;
130
+ let ( mut read_stderr, write_stderr) = std:: io:: pipe ( ) ?;
131
+ // Clone so the reference to ctx can be immediatly dropped
132
+ let project = ctx. lock ( ) . await . project ( ) . clone ( ) ;
133
+ let mut handle = spawn_command (
134
+ message,
135
+ create_new,
136
+ writer,
137
+ write_stderr,
138
+ session,
139
+ project. path . clone ( ) ,
140
+ ) ?;
186
141
let exit_status = handle. wait ( ) . await ?;
142
+ // My understanding is that it is not great to abort things like this,
143
+ // but it's "good enough" for now.
187
144
response_streamer. abort ( ) ;
188
145
189
146
self . requests . lock ( ) . await . remove ( & stack_id) ;
@@ -202,6 +159,121 @@ impl Claudes {
202
159
}
203
160
}
204
161
162
+ /// Spawns the actual claude code command
163
+ fn spawn_command (
164
+ message : String ,
165
+ create_new : bool ,
166
+ writer : std:: io:: PipeWriter ,
167
+ write_stderr : std:: io:: PipeWriter ,
168
+ session : crate :: ClaudeSession ,
169
+ project_path : std:: path:: PathBuf ,
170
+ ) -> Result < Child > {
171
+ let mut command = Command :: new ( "claude" ) ;
172
+ command. stdout ( writer) ;
173
+ command. stderr ( write_stderr) ;
174
+ command. current_dir ( & project_path) ;
175
+ command. args ( [
176
+ "-p" ,
177
+ "--output-format=stream-json" ,
178
+ "--verbose" ,
179
+ "--dangerously-skip-permissions" ,
180
+ ] ) ;
181
+ if create_new {
182
+ command. arg ( format ! ( "--session-id={}" , session. id) ) ;
183
+ } else {
184
+ command. arg ( format ! ( "--resume={}" , session. current_id) ) ;
185
+ }
186
+ command. arg ( message) ;
187
+ Ok ( command. spawn ( ) ?)
188
+ }
189
+
190
+ /// Creates the user's message, and adds it to the database & streams it back to
191
+ /// the client.
192
+ async fn create_user_message (
193
+ ctx : Arc < Mutex < CommandContext > > ,
194
+ broadcaster : Arc < Mutex < Broadcaster > > ,
195
+ session_id : uuid:: Uuid ,
196
+ stack_id : StackId ,
197
+ message : & str ,
198
+ ) -> Result < ( ) > {
199
+ let mut ctx = ctx. lock ( ) . await ;
200
+ let message = db:: save_new_message (
201
+ & mut ctx,
202
+ session_id,
203
+ ClaudeMessageContent :: UserInput ( UserInput {
204
+ message : message. to_owned ( ) ,
205
+ } ) ,
206
+ ) ?;
207
+ let project_id = ctx. project ( ) . id ;
208
+ broadcaster. lock ( ) . await . send ( FrontendEvent {
209
+ name : format ! ( "project://{project_id}/claude/{stack_id}/message_recieved" ) ,
210
+ payload : json ! ( message) ,
211
+ } ) ;
212
+ Ok ( ( ) )
213
+ }
214
+
215
+ /// If a session exists, it just returns it, otherwise it creates a new session
216
+ /// and makes a cooresponding rule
217
+ async fn upsert_session (
218
+ ctx : Arc < Mutex < CommandContext > > ,
219
+ session_id : uuid:: Uuid ,
220
+ stack_id : StackId ,
221
+ ) -> Result < crate :: ClaudeSession > {
222
+ let mut ctx = ctx. lock ( ) . await ;
223
+ let session = if let Some ( session) = db:: get_session_by_id ( & mut ctx, session_id) ? {
224
+ session
225
+ } else {
226
+ let session = db:: save_new_session ( & mut ctx, session_id) ?;
227
+ create_claude_assignment_rule ( & mut ctx, session_id, stack_id) ?;
228
+ session
229
+ } ;
230
+ Ok ( session)
231
+ }
232
+
233
+ /// Spawns the thread that manages reading the CC stdout and saves the events to
234
+ /// the db and streams them to the client.
235
+ fn spawn_response_streaming (
236
+ ctx : Arc < Mutex < CommandContext > > ,
237
+ broadcaster : Arc < Mutex < Broadcaster > > ,
238
+ read_stdout : PipeReader ,
239
+ session_id : uuid:: Uuid ,
240
+ stack_id : StackId ,
241
+ ) -> tokio:: task:: JoinHandle < ( ) > {
242
+ tokio:: spawn ( async move {
243
+ let reader = BufReader :: new ( read_stdout) ;
244
+ let mut first = true ;
245
+ for line in reader. lines ( ) {
246
+ let mut ctx = ctx. lock ( ) . await ;
247
+ let line = line. unwrap ( ) ;
248
+ let parsed_event: serde_json:: Value = serde_json:: from_str ( & line) . unwrap ( ) ;
249
+
250
+ if first {
251
+ let current_session_id = parsed_event[ "session_id" ]
252
+ . as_str ( )
253
+ . unwrap ( )
254
+ . parse ( )
255
+ . unwrap ( ) ;
256
+ let session = db:: get_session_by_id ( & mut ctx, session_id) . unwrap ( ) ;
257
+ if session. is_some ( ) {
258
+ db:: set_session_current_id ( & mut ctx, session_id, current_session_id) . unwrap ( ) ;
259
+ }
260
+ first = false ;
261
+ }
262
+
263
+ let message_content = ClaudeMessageContent :: ClaudeOutput ( parsed_event. clone ( ) ) ;
264
+ let message =
265
+ db:: save_new_message ( & mut ctx, session_id, message_content. clone ( ) ) . unwrap ( ) ;
266
+
267
+ let project_id = ctx. project ( ) . id ;
268
+
269
+ broadcaster. lock ( ) . await . send ( FrontendEvent {
270
+ name : format ! ( "project://{project_id}/claude/{stack_id}/message_recieved" ) ,
271
+ payload : json ! ( message) ,
272
+ } )
273
+ }
274
+ } )
275
+ }
276
+
205
277
impl Default for Claudes {
206
278
fn default ( ) -> Self {
207
279
Self :: new ( )
0 commit comments