Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.

Commit 8a0f478

Browse files
committed
Move instantiating the pipelines into a pipeline factory
This would allow us to reuse the pipelines in the copilot proxy server without having to redefine them over and over.
1 parent 1c26ff5 commit 8a0f478

File tree

3 files changed

+86
-80
lines changed

3 files changed

+86
-80
lines changed

src/codegate/cli.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from codegate.codegate_logging import LogFormat, LogLevel, setup_logging
1616
from codegate.config import Config, ConfigurationError
1717
from codegate.db.connection import init_db_sync
18+
from codegate.pipeline.factory import PipelineFactory
19+
from codegate.pipeline.secrets.manager import SecretsManager
1820
from codegate.providers.copilot.provider import CopilotProvider
1921
from codegate.server import init_app
2022
from codegate.storage.utils import restore_storage_backup
@@ -307,7 +309,11 @@ def serve(
307309
loop = asyncio.new_event_loop()
308310
asyncio.set_event_loop(loop)
309311

310-
app = init_app()
312+
# Initialize secrets manager and pipeline factory
313+
secrets_manager = SecretsManager()
314+
pipeline_factory = PipelineFactory(secrets_manager)
315+
316+
app = init_app(pipeline_factory)
311317

312318
# Run the server
313319
try:

src/codegate/pipeline/factory.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from typing import List
2+
3+
from codegate.config import Config
4+
from codegate.pipeline.base import PipelineStep, SequentialPipelineProcessor
5+
from codegate.pipeline.codegate_context_retriever.codegate import CodegateContextRetriever
6+
from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor
7+
from codegate.pipeline.extract_snippets.output import CodeCommentStep
8+
from codegate.pipeline.output import OutputPipelineProcessor, OutputPipelineStep
9+
from codegate.pipeline.secrets.manager import SecretsManager
10+
from codegate.pipeline.secrets.secrets import (
11+
CodegateSecrets,
12+
SecretRedactionNotifier,
13+
SecretUnredactionStep,
14+
)
15+
from codegate.pipeline.system_prompt.codegate import SystemPrompt
16+
from codegate.pipeline.version.version import CodegateVersion
17+
18+
19+
class PipelineFactory:
20+
def __init__(self, secrets_manager: SecretsManager):
21+
self.secrets_manager = secrets_manager
22+
23+
def create_input_pipeline(self) -> SequentialPipelineProcessor:
24+
input_steps: List[PipelineStep] = [
25+
# make sure that this step is always first in the pipeline
26+
# the other steps might send the request to a LLM for it to be analyzed
27+
# and without obfuscating the secrets, we'd leak the secrets during those
28+
# later steps
29+
CodegateSecrets(),
30+
CodegateVersion(),
31+
CodeSnippetExtractor(),
32+
SystemPrompt(Config.get_config().prompts.default_chat),
33+
CodegateContextRetriever(),
34+
]
35+
return SequentialPipelineProcessor(input_steps, self.secrets_manager)
36+
37+
def create_fim_pipeline(self) -> SequentialPipelineProcessor:
38+
fim_steps: List[PipelineStep] = [
39+
CodegateSecrets(),
40+
]
41+
return SequentialPipelineProcessor(fim_steps, self.secrets_manager)
42+
43+
def create_output_pipeline(self) -> OutputPipelineProcessor:
44+
output_steps: List[OutputPipelineStep] = [
45+
SecretRedactionNotifier(),
46+
SecretUnredactionStep(),
47+
CodeCommentStep(),
48+
]
49+
return OutputPipelineProcessor(output_steps)
50+
51+
def create_fim_output_pipeline(self) -> OutputPipelineProcessor:
52+
fim_output_steps: List[OutputPipelineStep] = [
53+
# temporarily disabled
54+
# SecretUnredactionStep(),
55+
]
56+
return OutputPipelineProcessor(fim_output_steps)

src/codegate/server.py

Lines changed: 23 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,10 @@
1-
from typing import List
2-
31
from fastapi import APIRouter, FastAPI
42
from fastapi.middleware.cors import CORSMiddleware
53

64
from codegate import __description__, __version__
7-
from codegate.config import Config
85
from codegate.dashboard.dashboard import dashboard_router
9-
from codegate.pipeline.base import PipelineStep, SequentialPipelineProcessor
10-
from codegate.pipeline.codegate_context_retriever.codegate import CodegateContextRetriever
11-
from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor
12-
from codegate.pipeline.extract_snippets.output import CodeCommentStep
13-
from codegate.pipeline.output import OutputPipelineProcessor, OutputPipelineStep
14-
from codegate.pipeline.secrets.manager import SecretsManager
15-
from codegate.pipeline.secrets.secrets import (
16-
CodegateSecrets,
17-
SecretRedactionNotifier,
18-
SecretUnredactionStep,
19-
)
6+
from codegate.pipeline.factory import PipelineFactory
207
from codegate.pipeline.secrets.signatures import CodegateSignatures
21-
from codegate.pipeline.system_prompt.codegate import SystemPrompt
22-
from codegate.pipeline.version.version import CodegateVersion
238
from codegate.providers.anthropic.provider import AnthropicProvider
249
from codegate.providers.llamacpp.provider import LlamaCppProvider
2510
from codegate.providers.ollama.provider import OllamaProvider
@@ -28,7 +13,7 @@
2813
from codegate.providers.vllm.provider import VLLMProvider
2914

3015

31-
def init_app() -> FastAPI:
16+
def init_app(pipeline_factory: PipelineFactory) -> FastAPI:
3217
"""Create the FastAPI application."""
3318
app = FastAPI(
3419
title="CodeGate",
@@ -43,44 +28,6 @@ def init_app() -> FastAPI:
4328
allow_headers=["*"],
4429
)
4530

46-
# Initialize secrets manager
47-
# TODO: we need to clean up the secrets manager
48-
# after the conversation is concluded
49-
# this was done in the pipeline step but I just removed it for now
50-
secrets_manager = SecretsManager()
51-
52-
# Define input pipeline steps
53-
input_steps: List[PipelineStep] = [
54-
CodegateSecrets(),
55-
CodegateVersion(),
56-
CodeSnippetExtractor(),
57-
SystemPrompt(Config.get_config().prompts.default_chat),
58-
CodegateContextRetriever(),
59-
]
60-
61-
# Define FIM pipeline steps
62-
fim_steps: List[PipelineStep] = [
63-
CodegateSecrets(),
64-
]
65-
66-
# Initialize input pipeline processors
67-
input_pipeline_processor = SequentialPipelineProcessor(input_steps, secrets_manager)
68-
fim_pipeline_processor = SequentialPipelineProcessor(fim_steps, secrets_manager)
69-
70-
# Define output pipeline steps
71-
output_steps: List[OutputPipelineStep] = [
72-
SecretRedactionNotifier(),
73-
SecretUnredactionStep(),
74-
CodeCommentStep(),
75-
]
76-
fim_output_steps: List[OutputPipelineStep] = [
77-
# temporarily disabled
78-
# SecretUnredactionStep(),
79-
]
80-
81-
output_pipeline_processor = OutputPipelineProcessor(output_steps)
82-
fim_output_pipeline_processor = OutputPipelineProcessor(fim_output_steps)
83-
8431
# Create provider registry
8532
registry = ProviderRegistry(app)
8633

@@ -91,60 +38,57 @@ def init_app() -> FastAPI:
9138
registry.add_provider(
9239
"openai",
9340
OpenAIProvider(
94-
pipeline_processor=input_pipeline_processor,
95-
fim_pipeline_processor=fim_pipeline_processor,
96-
output_pipeline_processor=output_pipeline_processor,
97-
fim_output_pipeline_processor=fim_output_pipeline_processor,
41+
pipeline_processor=pipeline_factory.create_input_pipeline(),
42+
fim_pipeline_processor=pipeline_factory.create_fim_pipeline(),
43+
output_pipeline_processor=pipeline_factory.create_output_pipeline(),
44+
fim_output_pipeline_processor=pipeline_factory.create_fim_output_pipeline(),
9845
),
9946
)
10047
registry.add_provider(
10148
"anthropic",
10249
AnthropicProvider(
103-
pipeline_processor=input_pipeline_processor,
104-
fim_pipeline_processor=fim_pipeline_processor,
105-
output_pipeline_processor=output_pipeline_processor,
106-
fim_output_pipeline_processor=fim_output_pipeline_processor,
50+
pipeline_processor=pipeline_factory.create_input_pipeline(),
51+
fim_pipeline_processor=pipeline_factory.create_fim_pipeline(),
52+
output_pipeline_processor=pipeline_factory.create_output_pipeline(),
53+
fim_output_pipeline_processor=pipeline_factory.create_fim_output_pipeline(),
10754
),
10855
)
10956
registry.add_provider(
11057
"llamacpp",
11158
LlamaCppProvider(
112-
pipeline_processor=input_pipeline_processor,
113-
fim_pipeline_processor=fim_pipeline_processor,
114-
output_pipeline_processor=output_pipeline_processor,
115-
fim_output_pipeline_processor=fim_output_pipeline_processor,
59+
pipeline_processor=pipeline_factory.create_input_pipeline(),
60+
fim_pipeline_processor=pipeline_factory.create_fim_pipeline(),
61+
output_pipeline_processor=pipeline_factory.create_output_pipeline(),
62+
fim_output_pipeline_processor=pipeline_factory.create_fim_output_pipeline(),
11663
),
11764
)
11865
registry.add_provider(
11966
"vllm",
12067
VLLMProvider(
121-
pipeline_processor=input_pipeline_processor,
122-
fim_pipeline_processor=fim_pipeline_processor,
123-
output_pipeline_processor=output_pipeline_processor,
124-
fim_output_pipeline_processor=fim_output_pipeline_processor,
68+
pipeline_processor=pipeline_factory.create_input_pipeline(),
69+
fim_pipeline_processor=pipeline_factory.create_fim_pipeline(),
70+
output_pipeline_processor=pipeline_factory.create_output_pipeline(),
71+
fim_output_pipeline_processor=pipeline_factory.create_fim_output_pipeline(),
12572
),
12673
)
12774
registry.add_provider(
12875
"ollama",
12976
OllamaProvider(
130-
pipeline_processor=input_pipeline_processor,
131-
fim_pipeline_processor=fim_pipeline_processor,
132-
output_pipeline_processor=output_pipeline_processor,
133-
fim_output_pipeline_processor=fim_output_pipeline_processor,
77+
pipeline_processor=pipeline_factory.create_input_pipeline(),
78+
fim_pipeline_processor=pipeline_factory.create_fim_pipeline(),
79+
output_pipeline_processor=pipeline_factory.create_output_pipeline(),
80+
fim_output_pipeline_processor=pipeline_factory.create_fim_output_pipeline(),
13481
),
13582
)
13683

13784
# Create and add system routes
138-
system_router = APIRouter(tags=["System"]) # Tags group endpoints in the docs
85+
system_router = APIRouter(tags=["System"])
13986

14087
@system_router.get("/health")
14188
async def health_check():
14289
return {"status": "healthy"}
14390

144-
# Include the router in the app - this exposes the health check endpoint
14591
app.include_router(system_router)
146-
147-
# Include the routes for the dashboard
14892
app.include_router(dashboard_router)
14993

15094
return app

0 commit comments

Comments
 (0)