forked from Doriandarko/deepseek-engineer
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeepseek-eng.py
More file actions
889 lines (771 loc) · 37.1 KB
/
deepseek-eng.py
File metadata and controls
889 lines (771 loc) · 37.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
#!/usr/bin/env python3
import os
import sys
import json
from pathlib import Path
from textwrap import dedent
from typing import List, Dict, Any, Optional, Union
from openai import OpenAI
from pydantic import BaseModel
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.style import Style
from prompt_toolkit import PromptSession
from prompt_toolkit.styles import Style as PromptStyle
import time
# Initialize Rich console and prompt session
# `console` is the single shared output handle used throughout the script;
# `prompt_session` provides line editing/history for the REPL in main().
console = Console()
prompt_session = PromptSession(
    style=PromptStyle.from_dict({
        'prompt': '#0066ff bold',  # Bright blue prompt
        'completion-menu.completion': 'bg:#1e3a8a fg:#ffffff',
        'completion-menu.completion.current': 'bg:#3b82f6 fg:#ffffff bold',
    })
)
# --------------------------------------------------------------------------------
# 1. LLM Provider Configuration
# --------------------------------------------------------------------------------
# Loads API keys (DEEPSEEK_API_KEY, OPENAI_API_KEY, ...) consumed by the
# provider registry below.
load_dotenv()  # Load environment variables from .env file
class LLMProvider:
    """Descriptor for one configured LLM backend.

    Bundles a display name, an API client object, the model identifier to
    request, and two capability flags used when building prompts/requests.
    """

    def __init__(self, name: str, client: Any, model: str, supports_tools: bool = True, supports_reasoning: bool = False):
        """Store the provider's identity, client handle, and capabilities."""
        # Identity and transport.
        self.name = name
        self.model = model
        self.client = client
        # Capability flags: function calling and chain-of-thought streaming.
        self.supports_tools = supports_tools
        self.supports_reasoning = supports_reasoning
# Available LLM providers, keyed by short id; only providers whose API key
# (or local install) is detected get registered.
PROVIDERS = {}
# DeepSeek (default)
if os.getenv("DEEPSEEK_API_KEY"):
    PROVIDERS["deepseek"] = LLMProvider(
        name="DeepSeek",
        client=OpenAI(api_key=os.getenv("DEEPSEEK_API_KEY"), base_url="https://api.deepseek.com"),
        model="deepseek-reasoner",
        supports_tools=True,
        supports_reasoning=True
    )
# OpenAI
if os.getenv("OPENAI_API_KEY"):
    PROVIDERS["openai"] = LLMProvider(
        name="OpenAI",
        client=OpenAI(api_key=os.getenv("OPENAI_API_KEY")),
        model="gpt-4o",
        supports_tools=True,
        supports_reasoning=False
    )
# Anthropic Claude (via OpenAI-compatible API)
# Optional dependency: imported lazily so a missing package only disables Claude.
if os.getenv("ANTHROPIC_API_KEY"):
    try:
        import anthropic
        PROVIDERS["claude"] = LLMProvider(
            name="Claude",
            client=anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY")),
            model="claude-3-5-sonnet-20241022",
            supports_tools=True,
            supports_reasoning=False
        )
    except ImportError:
        console.print("[yellow]Warning: anthropic package not installed. Claude support disabled.[/yellow]")
# Google Gemini (also an optional dependency)
if os.getenv("GOOGLE_API_KEY"):
    try:
        import google.generativeai as genai
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        PROVIDERS["gemini"] = LLMProvider(
            name="Gemini",
            client=genai.GenerativeModel('gemini-1.5-pro'),
            model="gemini-1.5-pro",
            supports_tools=True,
            supports_reasoning=False
        )
    except ImportError:
        console.print("[yellow]Warning: google-generativeai package not installed. Gemini support disabled.[/yellow]")
# Ollama (local)
# NOTE(review): the binary probe only checks /usr/local/bin/ollama — other
# install locations won't auto-enable Ollama; set OLLAMA_BASE_URL instead.
if os.getenv("OLLAMA_BASE_URL") or os.path.exists("/usr/local/bin/ollama"):
    PROVIDERS["ollama"] = LLMProvider(
        name="Ollama",
        client=OpenAI(
            api_key="ollama",  # Ollama doesn't require API key
            base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434/v1")
        ),
        model=os.getenv("OLLAMA_MODEL", "llama3.2"),
        supports_tools=True,
        supports_reasoning=False
    )
# Select current provider via LLM_PROVIDER, defaulting to deepseek.
current_provider_key = os.getenv("LLM_PROVIDER", "deepseek")
if current_provider_key not in PROVIDERS:
    # Fallback to first available provider
    if PROVIDERS:
        current_provider_key = list(PROVIDERS.keys())[0]
    else:
        # No usable provider at all: abort rather than limp along.
        console.print("[bold red]❌ No LLM providers configured! Please set up API keys in .env file.[/bold red]")
        sys.exit(1)
current_provider = PROVIDERS[current_provider_key]
# Module-global client used by stream_openai_response().
client = current_provider.client
# --------------------------------------------------------------------------------
# 2. Define our schema using Pydantic for type safety
# --------------------------------------------------------------------------------
class FileToCreate(BaseModel):
    """Schema for a single file to be written to disk."""
    # Target path (relative or absolute).
    path: str
    # Full text content to write.
    content: str

class FileToEdit(BaseModel):
    """Schema for one snippet-replacement edit to an existing file."""
    # Path of the file to edit.
    path: str
    # Exact text expected to appear in the file.
    original_snippet: str
    # Replacement text for the first occurrence of original_snippet.
    new_snippet: str
# Remove AssistantResponse as we're using function calling now
# --------------------------------------------------------------------------------
# 2.1. Define Function Calling Tools
# --------------------------------------------------------------------------------
# OpenAI-style function-calling tool schema sent with every chat request.
# The "description" strings are transmitted to the model verbatim.
tools = [
    # Read one file and return its content.
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Read the content of a single file from the filesystem",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The path to the file to read (relative or absolute)",
                    }
                },
                "required": ["file_path"]
            },
        }
    },
    # Batch variant of read_file.
    {
        "type": "function",
        "function": {
            "name": "read_multiple_files",
            "description": "Read the content of multiple files from the filesystem",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_paths": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Array of file paths to read (relative or absolute)",
                    }
                },
                "required": ["file_paths"]
            },
        }
    },
    # Create or overwrite one file.
    {
        "type": "function",
        "function": {
            "name": "create_file",
            "description": "Create a new file or overwrite an existing file with the provided content",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The path where the file should be created",
                    },
                    "content": {
                        "type": "string",
                        "description": "The content to write to the file",
                    }
                },
                "required": ["file_path", "content"]
            },
        }
    },
    # Batch variant of create_file; items mirror the FileToCreate model.
    {
        "type": "function",
        "function": {
            "name": "create_multiple_files",
            "description": "Create multiple files at once",
            "parameters": {
                "type": "object",
                "properties": {
                    "files": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "path": {"type": "string"},
                                "content": {"type": "string"}
                            },
                            "required": ["path", "content"]
                        },
                        "description": "Array of files to create with their paths and content",
                    }
                },
                "required": ["files"]
            },
        }
    },
    # Snippet-replacement edit; mirrors the FileToEdit model.
    {
        "type": "function",
        "function": {
            "name": "edit_file",
            "description": "Edit an existing file by replacing a specific snippet with new content",
            "parameters": {
                "type": "object",
                "properties": {
                    "file_path": {
                        "type": "string",
                        "description": "The path to the file to edit",
                    },
                    "original_snippet": {
                        "type": "string",
                        "description": "The exact text snippet to find and replace",
                    },
                    "new_snippet": {
                        "type": "string",
                        "description": "The new text to replace the original snippet with",
                    }
                },
                "required": ["file_path", "original_snippet", "new_snippet"]
            },
        }
    }
]
# --------------------------------------------------------------------------------
# 3. system prompt
# --------------------------------------------------------------------------------
# System prompt seeded into conversation_history. Interpolates the active
# provider's name/model and capability flags at import time.
# NOTE(review): name breaks the UPPER_SNAKE_CASE constant convention
# (SYSTEM_PROMPT), but it is referenced elsewhere in the file, so kept as-is.
system_PROMPT = dedent(f"""\
You are an elite software engineer called Multi-LLM Engineer with decades of experience across all programming domains.
Your expertise spans system design, algorithms, testing, and best practices.
You provide thoughtful, well-structured solutions while explaining your reasoning.
Current LLM Provider: {current_provider.name} ({current_provider.model})
Reasoning Support: {'Yes' if current_provider.supports_reasoning else 'No'}
Function Calling: {'Yes' if current_provider.supports_tools else 'No'}
Core capabilities:
1. Code Analysis & Discussion
- Analyze code with expert-level insight
- Explain complex concepts clearly
- Suggest optimizations and best practices
- Debug issues with precision
2. File Operations (via function calls):
- read_file: Read a single file's content
- read_multiple_files: Read multiple files at once
- create_file: Create or overwrite a single file
- create_multiple_files: Create multiple files at once
- edit_file: Make precise edits to existing files using snippet replacement
Guidelines:
1. Provide natural, conversational responses explaining your reasoning
2. Use function calls when you need to read or modify files
3. For file operations:
- Always read files first before editing them to understand the context
- Use precise snippet matching for edits
- Explain what changes you're making and why
- Consider the impact of changes on the overall codebase
4. Follow language-specific best practices
5. Suggest tests or validation steps when appropriate
6. Be thorough in your analysis and recommendations
IMPORTANT: In your thinking process, if you realize that something requires a tool call, cut your thinking short and proceed directly to the tool call. Don't overthink - act efficiently when file operations are needed.
Remember: You're a senior engineer - be thoughtful, precise, and explain your reasoning clearly.
""")
# --------------------------------------------------------------------------------
# 4. Helper functions
# --------------------------------------------------------------------------------
def read_local_file(file_path: str) -> str:
    """Return the text content of a local file, decoded as UTF-8."""
    return Path(file_path).read_text(encoding="utf-8")
def create_file(path: str, content: str):
    """Create (or overwrite) a file at 'path' with the given 'content'.

    Raises:
        ValueError: if the path contains a home-directory reference ('~'),
            fails normalize_path()'s checks, or the content exceeds 5MB.
        OSError: on underlying filesystem failures.
    """
    file_path = Path(path)
    # Security checks
    if any(part.startswith('~') for part in file_path.parts):
        raise ValueError("Home directory references not allowed")
    normalized_path = normalize_path(str(file_path))
    # Validate reasonable file size for operations
    if len(content) > 5_000_000:  # 5MB limit
        raise ValueError("File content exceeds 5MB size limit")
    # Bug fix: write to the *normalized* path. The original computed
    # normalized_path for its security checks but then wrote to the raw
    # file_path, so the checked path and the written path could differ.
    target = Path(normalized_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as f:
        f.write(content)
    console.print(f"[bold blue]✓[/bold blue] Created/updated file at '[bright_cyan]{target}[/bright_cyan]'")
def show_diff_table(files_to_edit: List[FileToEdit]) -> None:
    """Render a Rich table summarizing the proposed snippet edits.

    Does nothing when there are no edits to show.
    """
    if not files_to_edit:
        return
    diff_table = Table(
        title="📝 Proposed Edits",
        show_header=True,
        header_style="bold bright_blue",
        show_lines=True,
        border_style="blue",
    )
    # Columns: file path (never wrapped), then before/after snippets.
    for heading, column_style, no_wrap in (
        ("File Path", "bright_cyan", True),
        ("Original", "red dim", False),
        ("New", "bright_green", False),
    ):
        diff_table.add_column(heading, style=column_style, no_wrap=no_wrap)
    for edit in files_to_edit:
        diff_table.add_row(edit.path, edit.original_snippet, edit.new_snippet)
    console.print(diff_table)
def apply_diff_edit(path: str, original_snippet: str, new_snippet: str):
    """Reads the file at 'path', replaces the first occurrence of 'original_snippet' with 'new_snippet', then overwrites.

    Errors are reported to the console rather than raised: a missing file,
    a snippet that is absent, or a snippet that matches more than once all
    leave the file untouched.
    """
    try:
        content = read_local_file(path)
        # Verify we're replacing the exact intended occurrence
        occurrences = content.count(original_snippet)
        if occurrences == 0:
            raise ValueError("Original snippet not found")
        if occurrences > 1:
            # Refuse ambiguous edits rather than guessing which match is meant.
            console.print(f"[bold yellow]⚠ Multiple matches ({occurrences}) found - requiring line numbers for safety[/bold yellow]")
            console.print("[dim]Use format:\n--- original.py (lines X-Y)\n+++ modified.py[/dim]")
            raise ValueError(f"Ambiguous edit: {occurrences} matches")
        updated_content = content.replace(original_snippet, new_snippet, 1)
        create_file(path, updated_content)
        console.print(f"[bold blue]✓[/bold blue] Applied diff edit to '[bright_cyan]{path}[/bright_cyan]'")
    except FileNotFoundError:
        console.print(f"[bold red]✗[/bold red] File not found for diff editing: '[bright_cyan]{path}[/bright_cyan]'")
    except ValueError as e:
        # 'content' is defined here: every ValueError (including create_file's
        # size-limit check) can only be raised after the read succeeded.
        console.print(f"[bold yellow]⚠[/bold yellow] {str(e)} in '[bright_cyan]{path}[/bright_cyan]'. No changes made.")
        console.print("\n[bold blue]Expected snippet:[/bold blue]")
        console.print(Panel(original_snippet, title="Expected", border_style="blue", title_align="left"))
        console.print("\n[bold blue]Actual file content:[/bold blue]")
        console.print(Panel(content, title="Actual", border_style="yellow", title_align="left"))
def try_handle_add_command(user_input: str) -> bool:
    """Handle the '/add <path>' REPL command.

    Returns True when the input was an /add command (whether or not it
    succeeded), so the caller skips sending it to the LLM; False otherwise.
    """
    prefix = "/add "
    stripped = user_input.strip()
    if not stripped.lower().startswith(prefix):
        return False
    # Bug fix: slice the *stripped* input. The original checked
    # stripped-and-lowered text but sliced the raw input, so any leading
    # whitespace corrupted the extracted path.
    path_to_add = stripped[len(prefix):].strip()
    try:
        normalized_path = normalize_path(path_to_add)
        if os.path.isdir(normalized_path):
            # Handle entire directory
            add_directory_to_conversation(normalized_path)
        else:
            # Handle a single file as before
            content = read_local_file(normalized_path)
            conversation_history.append({
                "role": "system",
                "content": f"Content of file '{normalized_path}':\n\n{content}"
            })
            console.print(f"[bold blue]✓[/bold blue] Added file '[bright_cyan]{normalized_path}[/bright_cyan]' to conversation.\n")
    except (OSError, ValueError) as e:
        # ValueError covers normalize_path()'s security rejections, which the
        # original let escape and crash the interactive loop.
        console.print(f"[bold red]✗[/bold red] Could not add path '[bright_cyan]{path_to_add}[/bright_cyan]': {e}\n")
    return True
def add_directory_to_conversation(directory_path: str):
    """Walk `directory_path` and append each readable text file's content to
    conversation_history as a system message.

    Skips hidden/excluded names, excluded extensions, binaries, and files
    over 5MB; stops after 1000 files. Prints a summary of added and skipped
    files when done.
    """
    with console.status("[bold bright_blue]🔍 Scanning directory...[/bold bright_blue]") as status:
        # Names (files or directories) that are never ingested.
        excluded_files = {
            # Python specific
            ".DS_Store", "Thumbs.db", ".gitignore", ".python-version",
            "uv.lock", ".uv", "uvenv", ".uvenv", ".venv", "venv",
            "__pycache__", ".pytest_cache", ".coverage", ".mypy_cache",
            # Node.js / Web specific
            "node_modules", "package-lock.json", "yarn.lock", "pnpm-lock.yaml",
            ".next", ".nuxt", "dist", "build", ".cache", ".parcel-cache",
            ".turbo", ".vercel", ".output", ".contentlayer",
            # Build outputs
            "out", "coverage", ".nyc_output", "storybook-static",
            # Environment and config
            ".env", ".env.local", ".env.development", ".env.production",
            # Misc
            ".git", ".svn", ".hg", "CVS"
        }
        # File extensions that are never ingested (binary/media/artifacts).
        excluded_extensions = {
            # Binary and media files
            ".png", ".jpg", ".jpeg", ".gif", ".ico", ".svg", ".webp", ".avif",
            ".mp4", ".webm", ".mov", ".mp3", ".wav", ".ogg",
            ".zip", ".tar", ".gz", ".7z", ".rar",
            ".exe", ".dll", ".so", ".dylib", ".bin",
            # Documents
            ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx",
            # Python specific
            ".pyc", ".pyo", ".pyd", ".egg", ".whl",
            # UV specific
            ".uv", ".uvenv",
            # Database and logs
            ".db", ".sqlite", ".sqlite3", ".log",
            # IDE specific
            ".idea", ".vscode",
            # Web specific
            ".map", ".chunk.js", ".chunk.css",
            ".min.js", ".min.css", ".bundle.js", ".bundle.css",
            # Cache and temp files
            ".cache", ".tmp", ".temp",
            # Font files
            ".ttf", ".otf", ".woff", ".woff2", ".eot"
        }
        skipped_files = []
        added_files = []
        total_files_processed = 0
        max_files = 1000  # Reasonable limit for files to process
        max_file_size = 5_000_000  # 5MB limit
        for root, dirs, files in os.walk(directory_path):
            if total_files_processed >= max_files:
                console.print(f"[bold yellow]⚠[/bold yellow] Reached maximum file limit ({max_files})")
                break
            status.update(f"[bold bright_blue]🔍 Scanning {root}...[/bold bright_blue]")
            # Skip hidden directories and excluded directories
            # (mutating dirs in place prunes os.walk's descent).
            dirs[:] = [d for d in dirs if not d.startswith('.') and d not in excluded_files]
            for file in files:
                if total_files_processed >= max_files:
                    break
                if file.startswith('.') or file in excluded_files:
                    skipped_files.append(os.path.join(root, file))
                    continue
                _, ext = os.path.splitext(file)
                if ext.lower() in excluded_extensions:
                    skipped_files.append(os.path.join(root, file))
                    continue
                full_path = os.path.join(root, file)
                try:
                    # Check file size before processing
                    if os.path.getsize(full_path) > max_file_size:
                        skipped_files.append(f"{full_path} (exceeds size limit)")
                        continue
                    # Check if it's binary
                    if is_binary_file(full_path):
                        skipped_files.append(full_path)
                        continue
                    normalized_path = normalize_path(full_path)
                    content = read_local_file(normalized_path)
                    conversation_history.append({
                        "role": "system",
                        "content": f"Content of file '{normalized_path}':\n\n{content}"
                    })
                    added_files.append(normalized_path)
                    total_files_processed += 1
                except OSError:
                    skipped_files.append(full_path)
        console.print(f"[bold blue]✓[/bold blue] Added folder '[bright_cyan]{directory_path}[/bright_cyan]' to conversation.")
        if added_files:
            # NOTE(review): total_files_processed always equals len(added_files)
            # here (both are updated together), so "X of Y" shows equal numbers
            # — confirm the intended meaning of this count.
            console.print(f"\n[bold bright_blue]📁 Added files:[/bold bright_blue] [dim]({len(added_files)} of {total_files_processed})[/dim]")
            for f in added_files:
                console.print(f" [bright_cyan]📄 {f}[/bright_cyan]")
        if skipped_files:
            console.print(f"\n[bold yellow]⏭ Skipped files:[/bold yellow] [dim]({len(skipped_files)})[/dim]")
            for f in skipped_files[:10]:  # Show only first 10 to avoid clutter
                console.print(f" [yellow dim]⚠ {f}[/yellow dim]")
            if len(skipped_files) > 10:
                console.print(f" [dim]... and {len(skipped_files) - 10} more[/dim]")
        console.print()
def is_binary_file(file_path: str, peek_size: int = 1024) -> bool:
    """Heuristically decide whether a file is binary.

    Samples up to `peek_size` bytes and reports binary when a NUL byte is
    present. Files that cannot be read at all are reported as binary so
    callers err on the side of skipping them.
    """
    try:
        with open(file_path, 'rb') as handle:
            sample = handle.read(peek_size)
    except Exception:
        # Unreadable — treat as binary to be safe.
        return True
    return b'\0' in sample
def ensure_file_in_context(file_path: str) -> bool:
    """Make sure the file's content is present in conversation_history.

    Reads the file (normalizing the path first) and appends it as a system
    message unless an entry for the same path already exists. Returns True
    on success, False when the file cannot be read.
    """
    try:
        normalized_path = normalize_path(file_path)
        content = read_local_file(normalized_path)
        file_marker = f"Content of file '{normalized_path}'"
        already_present = any(file_marker in msg["content"] for msg in conversation_history)
        if not already_present:
            conversation_history.append({
                "role": "system",
                "content": f"{file_marker}:\n\n{content}"
            })
        return True
    except OSError:
        console.print(f"[bold red]✗[/bold red] Could not read file '[bright_cyan]{file_path}[/bright_cyan]' for editing context")
        return False
def normalize_path(path_str: str) -> str:
    """Return a canonical, absolute version of the path with security checks.

    Raises:
        ValueError: if the input path contains a parent-directory ('..')
            component.
    """
    # Bug fix: check for ".." on the path *as given*. The original checked
    # the resolved path, but Path.resolve() eliminates ".." components, so
    # that traversal guard could never fire.
    if ".." in Path(path_str).parts:
        raise ValueError(f"Invalid path: {path_str} contains parent directory references")
    return str(Path(path_str).resolve())
# --------------------------------------------------------------------------------
# 5. Conversation state
# --------------------------------------------------------------------------------
# Mutable module-global chat transcript; seeded with the system prompt and
# appended to by the /add command, tool execution, and the streaming loop.
conversation_history = [
    {"role": "system", "content": system_PROMPT}
]
# --------------------------------------------------------------------------------
# 6. OpenAI API interaction with streaming
# --------------------------------------------------------------------------------
def execute_function_call_dict(tool_call_dict) -> str:
    """Execute a tool call given in plain-dict form and return a result string.

    `tool_call_dict` has the shape
    {"function": {"name": ..., "arguments": <JSON string>}}. Failures are
    returned as "Error executing ..." strings rather than raised, so the
    result can always be fed back to the model as a tool message.
    """
    # Bug fix: pre-bind so the except handler can't hit a NameError when the
    # dict access or JSON parsing itself fails.
    function_name = "<unknown>"
    try:
        function_name = tool_call_dict["function"]["name"]
        arguments = json.loads(tool_call_dict["function"]["arguments"])
        if function_name == "read_file":
            file_path = arguments["file_path"]
            normalized_path = normalize_path(file_path)
            content = read_local_file(normalized_path)
            return f"Content of file '{normalized_path}':\n\n{content}"
        elif function_name == "read_multiple_files":
            file_paths = arguments["file_paths"]
            results = []
            for file_path in file_paths:
                try:
                    normalized_path = normalize_path(file_path)
                    content = read_local_file(normalized_path)
                    results.append(f"Content of file '{normalized_path}':\n\n{content}")
                except OSError as e:
                    results.append(f"Error reading '{file_path}': {e}")
            # Bug fix: the original expression '"\n\n" + "="*50 + "\n\n".join(results)'
            # bound the join to only the last term, so the separator bar
            # appeared once instead of between every file.
            separator = "\n\n" + "=" * 50 + "\n\n"
            return separator + separator.join(results)
        elif function_name == "create_file":
            file_path = arguments["file_path"]
            content = arguments["content"]
            create_file(file_path, content)
            return f"Successfully created file '{file_path}'"
        elif function_name == "create_multiple_files":
            files = arguments["files"]
            created_files = []
            for file_info in files:
                create_file(file_info["path"], file_info["content"])
                created_files.append(file_info["path"])
            return f"Successfully created {len(created_files)} files: {', '.join(created_files)}"
        elif function_name == "edit_file":
            file_path = arguments["file_path"]
            original_snippet = arguments["original_snippet"]
            new_snippet = arguments["new_snippet"]
            # Ensure file is in context first
            if not ensure_file_in_context(file_path):
                return f"Error: Could not read file '{file_path}' for editing"
            apply_diff_edit(file_path, original_snippet, new_snippet)
            return f"Successfully edited file '{file_path}'"
        else:
            return f"Unknown function: {function_name}"
    except Exception as e:
        return f"Error executing {function_name}: {str(e)}"
def execute_function_call(tool_call) -> str:
    """Execute a function call (SDK object form) and return the result as a string.

    `tool_call` is an SDK tool-call object exposing `.function.name` and
    `.function.arguments` (a JSON string). Failures are returned as
    "Error executing ..." strings rather than raised.
    """
    # Bug fix: pre-bind so the except handler can't hit a NameError when the
    # attribute access or JSON parsing itself fails.
    function_name = "<unknown>"
    try:
        function_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)
        if function_name == "read_file":
            file_path = arguments["file_path"]
            normalized_path = normalize_path(file_path)
            content = read_local_file(normalized_path)
            return f"Content of file '{normalized_path}':\n\n{content}"
        elif function_name == "read_multiple_files":
            file_paths = arguments["file_paths"]
            results = []
            for file_path in file_paths:
                try:
                    normalized_path = normalize_path(file_path)
                    content = read_local_file(normalized_path)
                    results.append(f"Content of file '{normalized_path}':\n\n{content}")
                except OSError as e:
                    results.append(f"Error reading '{file_path}': {e}")
            # Bug fix: same join-precedence defect as execute_function_call_dict;
            # place the separator bar between every file's content.
            separator = "\n\n" + "=" * 50 + "\n\n"
            return separator + separator.join(results)
        elif function_name == "create_file":
            file_path = arguments["file_path"]
            content = arguments["content"]
            create_file(file_path, content)
            return f"Successfully created file '{file_path}'"
        elif function_name == "create_multiple_files":
            files = arguments["files"]
            created_files = []
            for file_info in files:
                create_file(file_info["path"], file_info["content"])
                created_files.append(file_info["path"])
            return f"Successfully created {len(created_files)} files: {', '.join(created_files)}"
        elif function_name == "edit_file":
            file_path = arguments["file_path"]
            original_snippet = arguments["original_snippet"]
            new_snippet = arguments["new_snippet"]
            # Ensure file is in context first
            if not ensure_file_in_context(file_path):
                return f"Error: Could not read file '{file_path}' for editing"
            apply_diff_edit(file_path, original_snippet, new_snippet)
            return f"Successfully edited file '{file_path}'"
        else:
            return f"Unknown function: {function_name}"
    except Exception as e:
        return f"Error executing {function_name}: {str(e)}"
def trim_conversation_history():
    """Trim conversation history to prevent token limit issues while preserving tool call sequences.

    Keeps every system message plus the last 15 non-system messages, and
    drops any orphaned 'tool' messages left at the head of the kept window.
    """
    if len(conversation_history) <= 20:  # Don't trim if conversation is still small
        return
    # Always keep the system prompt (and any /add'ed file contents)
    system_msgs = [msg for msg in conversation_history if msg["role"] == "system"]
    other_msgs = [msg for msg in conversation_history if msg["role"] != "system"]
    # Keep only the last 15 messages to prevent token overflow
    if len(other_msgs) > 15:
        other_msgs = other_msgs[-15:]
        # Bug fix: a 'tool' message must follow the assistant message that
        # issued the tool call. If the cut landed inside such a pair, the
        # original left orphaned tool results at the head of the history,
        # which chat APIs reject. Drop them.
        while other_msgs and other_msgs[0]["role"] == "tool":
            other_msgs.pop(0)
    # Rebuild conversation history in place so other references stay valid.
    conversation_history.clear()
    conversation_history.extend(system_msgs + other_msgs)
def stream_openai_response(user_message: str):
    """Send `user_message` to the current provider and stream the reply.

    Appends the user message, the streamed assistant reply, any tool calls
    plus their results, and the follow-up reply to conversation_history.

    Returns:
        {"success": True} on success, or {"error": <message>} on API failure.
    """
    # Add the user message to conversation history
    conversation_history.append({"role": "user", "content": user_message})
    # Trim conversation history if it's getting too long
    trim_conversation_history()
    try:
        stream = client.chat.completions.create(
            # Bug fix: use the selected provider's model. The original
            # hard-coded "deepseek-reasoner", breaking every other provider
            # this script is designed to support.
            model=current_provider.model,
            messages=conversation_history,
            tools=tools,
            max_completion_tokens=64000,
            stream=True
        )
        console.print("\n[bold bright_blue]🐋 Seeking...[/bold bright_blue]")
        reasoning_started = False
        reasoning_content = ""
        final_content = ""
        tool_calls = []
        for chunk in stream:
            # Reasoning deltas (R1-style models) are displayed but excluded
            # from the stored assistant message.
            if hasattr(chunk.choices[0].delta, 'reasoning_content') and chunk.choices[0].delta.reasoning_content:
                if not reasoning_started:
                    console.print("\n[bold blue]💭 Reasoning:[/bold blue]")
                    reasoning_started = True
                console.print(chunk.choices[0].delta.reasoning_content, end="")
                reasoning_content += chunk.choices[0].delta.reasoning_content
            elif chunk.choices[0].delta.content:
                if reasoning_started:
                    console.print("\n")  # Add spacing after reasoning
                    console.print("\n[bold bright_blue]🤖 Assistant>[/bold bright_blue] ", end="")
                    reasoning_started = False
                final_content += chunk.choices[0].delta.content
                console.print(chunk.choices[0].delta.content, end="")
            elif chunk.choices[0].delta.tool_calls:
                # Tool-call deltas arrive fragmented across chunks;
                # accumulate name/arguments per index.
                for tool_call_delta in chunk.choices[0].delta.tool_calls:
                    if tool_call_delta.index is not None:
                        # Ensure we have enough tool_calls slots
                        while len(tool_calls) <= tool_call_delta.index:
                            tool_calls.append({
                                "id": "",
                                "type": "function",
                                "function": {"name": "", "arguments": ""}
                            })
                        if tool_call_delta.id:
                            tool_calls[tool_call_delta.index]["id"] = tool_call_delta.id
                        if tool_call_delta.function:
                            if tool_call_delta.function.name:
                                tool_calls[tool_call_delta.index]["function"]["name"] += tool_call_delta.function.name
                            if tool_call_delta.function.arguments:
                                tool_calls[tool_call_delta.index]["function"]["arguments"] += tool_call_delta.function.arguments
        console.print()  # New line after streaming
        # Store the assistant's response in conversation history
        assistant_message = {
            "role": "assistant",
            "content": final_content if final_content else None
        }
        if tool_calls:
            # Convert our accumulated tool_calls to the API's expected format
            formatted_tool_calls = []
            for i, tc in enumerate(tool_calls):
                if tc["function"]["name"]:  # Only add if we have a function name
                    # Ensure we have a valid tool call ID
                    tool_id = tc["id"] if tc["id"] else f"call_{i}_{int(time.time() * 1000)}"
                    formatted_tool_calls.append({
                        "id": tool_id,
                        "type": "function",
                        "function": {
                            "name": tc["function"]["name"],
                            "arguments": tc["function"]["arguments"]
                        }
                    })
            if formatted_tool_calls:
                # Important: When there are tool calls, content should be None or empty
                if not final_content:
                    assistant_message["content"] = None
                assistant_message["tool_calls"] = formatted_tool_calls
                conversation_history.append(assistant_message)
                # Execute tool calls and add results immediately
                console.print(f"\n[bold bright_cyan]⚡ Executing {len(formatted_tool_calls)} function call(s)...[/bold bright_cyan]")
                for tool_call in formatted_tool_calls:
                    console.print(f"[bright_blue]→ {tool_call['function']['name']}[/bright_blue]")
                    try:
                        result = execute_function_call_dict(tool_call)
                        # Add tool result to conversation immediately
                        tool_response = {
                            "role": "tool",
                            "tool_call_id": tool_call["id"],
                            "content": result
                        }
                        conversation_history.append(tool_response)
                    except Exception as e:
                        console.print(f"[red]Error executing {tool_call['function']['name']}: {e}[/red]")
                        # Still need to add a tool response even on error
                        conversation_history.append({
                            "role": "tool",
                            "tool_call_id": tool_call["id"],
                            "content": f"Error: {str(e)}"
                        })
                # Get follow-up response after tool execution
                console.print("\n[bold bright_blue]🔄 Processing results...[/bold bright_blue]")
                follow_up_stream = client.chat.completions.create(
                    model=current_provider.model,  # same fix as the first call
                    messages=conversation_history,
                    tools=tools,
                    max_completion_tokens=64000,
                    stream=True
                )
                follow_up_content = ""
                reasoning_started = False
                for chunk in follow_up_stream:
                    # Handle reasoning content if available
                    if hasattr(chunk.choices[0].delta, 'reasoning_content') and chunk.choices[0].delta.reasoning_content:
                        if not reasoning_started:
                            console.print("\n[bold blue]💭 Reasoning:[/bold blue]")
                            reasoning_started = True
                        console.print(chunk.choices[0].delta.reasoning_content, end="")
                    elif chunk.choices[0].delta.content:
                        if reasoning_started:
                            console.print("\n")
                            console.print("\n[bold bright_blue]🤖 Assistant>[/bold bright_blue] ", end="")
                            reasoning_started = False
                        follow_up_content += chunk.choices[0].delta.content
                        console.print(chunk.choices[0].delta.content, end="")
                console.print()
                # Store follow-up response
                conversation_history.append({
                    "role": "assistant",
                    "content": follow_up_content
                })
        else:
            # No tool calls, just store the regular response
            conversation_history.append(assistant_message)
        return {"success": True}
    except Exception as e:
        # Bug fix: report the active provider's name instead of the
        # hard-coded "DeepSeek".
        error_msg = f"{current_provider.name} API error: {str(e)}"
        console.print(f"\n[bold red]❌ {error_msg}[/bold red]")
        return {"error": error_msg}
# --------------------------------------------------------------------------------
# 7. Main interactive loop
# --------------------------------------------------------------------------------
def main():
    """Run the interactive REPL: print welcome/instruction panels, then loop
    reading user input, handling '/add' and exit commands, and streaming
    LLM responses for everything else."""
    # Create a beautiful gradient-style welcome panel
    welcome_text = """[bold bright_blue]🐋 DeepSeek Engineer[/bold bright_blue] [bright_cyan]with Function Calling[/bright_cyan]
[dim blue]Powered by DeepSeek-R1 with Chain-of-Thought Reasoning[/dim blue]"""
    console.print(Panel.fit(
        welcome_text,
        border_style="bright_blue",
        padding=(1, 2),
        title="[bold bright_cyan]🤖 AI Code Assistant[/bold bright_cyan]",
        title_align="center"
    ))
    # Create an elegant instruction panel
    instructions = """[bold bright_blue]📁 File Operations:[/bold bright_blue]
• [bright_cyan]/add path/to/file[/bright_cyan] - Include a single file in conversation
• [bright_cyan]/add path/to/folder[/bright_cyan] - Include all files in a folder
• [dim]The AI can automatically read and create files using function calls[/dim]
[bold bright_blue]🎯 Commands:[/bold bright_blue]
• [bright_cyan]exit[/bright_cyan] or [bright_cyan]quit[/bright_cyan] - End the session
• Just ask naturally - the AI will handle file operations automatically!"""
    console.print(Panel(
        instructions,
        border_style="blue",
        padding=(1, 2),
        title="[bold blue]💡 How to Use[/bold blue]",
        title_align="left"
    ))
    console.print()
    while True:
        try:
            user_input = prompt_session.prompt("🔵 You> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C ends the session cleanly instead of crashing.
            console.print("\n[bold yellow]👋 Exiting gracefully...[/bold yellow]")
            break
        if not user_input:
            continue
        if user_input.lower() in ["exit", "quit"]:
            console.print("[bold bright_blue]👋 Goodbye! Happy coding![/bold bright_blue]")
            break
        # '/add ...' is handled locally and not sent to the LLM.
        if try_handle_add_command(user_input):
            continue
        response_data = stream_openai_response(user_input)
        if response_data.get("error"):
            console.print(f"[bold red]❌ Error: {response_data['error']}[/bold red]")
    console.print("[bold blue]✨ Session finished. Thank you for using DeepSeek Engineer![/bold blue]")
# Script entry point: launch the interactive REPL.
if __name__ == "__main__":
    main()