-
-
Notifications
You must be signed in to change notification settings - Fork 1
gRPC basis (and logs) #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
mirkobrombin
wants to merge
3
commits into
bottlesdevs:main
Choose a base branch
from
mirkobrombin:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,36 +1,188 @@ | ||
| use bottles_core::proto::{self, bottles_server::Bottles, wine_bridge_client::WineBridgeClient}; | ||
| use bottles_core::proto::bottles::{self, management_server::{self, Management}}; | ||
| use bottles_core::bottle::{Bottle, BottleType}; | ||
| use crate::state::SharedState; | ||
| use tonic::{Request, Response, Status}; | ||
|
|
||
| #[derive(Debug, Default)] | ||
| pub struct BottlesService; | ||
| pub mod state; | ||
| pub mod orchestrator; | ||
|
|
||
| pub struct BottlesService { | ||
| state: SharedState, | ||
| } | ||
|
|
||
| impl BottlesService { | ||
| pub fn new(state: SharedState) -> Self { | ||
| Self { state } | ||
| } | ||
| } | ||
|
|
||
| #[tonic::async_trait] | ||
| impl Bottles for BottlesService { | ||
| async fn health( | ||
| impl Management for BottlesService { | ||
| async fn create_bottle( | ||
| &self, | ||
| request: Request<bottles::CreateBottleRequest>, | ||
| ) -> Result<Response<bottles::Bottle>, Status> { | ||
| let req = request.into_inner(); | ||
| tracing::info!("Received CreateBottle request for: {}", req.name); | ||
| let name = req.name; | ||
|
|
||
| let mut state = self.state.write().map_err(|_| Status::internal("Lock error"))?; | ||
|
|
||
| // Validation: Check if exists | ||
| if state.bottles.iter().any(|b| b.name == name) { | ||
| tracing::warn!("Bottle {} already exists", name); | ||
| return Err(Status::already_exists("Bottle already exists")); | ||
| } | ||
|
|
||
| // Logic (stub path for now, usually should be derived from config) | ||
| let path = std::path::PathBuf::from(format!("/home/mirko/.local/share/bottles/bottles/{}", name)); // TODO: use proper config | ||
| let kind = match req.r#type.as_str() { | ||
| "Gaming" => BottleType::Gaming, | ||
| "Software" => BottleType::Software, | ||
| _ => BottleType::Custom, | ||
| }; | ||
|
|
||
| let bottle = Bottle::new(name.clone(), path, kind); | ||
|
|
||
| // Save | ||
| state.bottles.push(bottle.clone()); | ||
| state.save(); | ||
| tracing::info!("Bottle {} created successfully", name); | ||
|
|
||
| // Map to Proto | ||
| Ok(Response::new(bottles::Bottle { | ||
| name: bottle.name, | ||
| path: bottle.path.to_string_lossy().to_string(), | ||
| r#type: req.r#type, // Simplified mapping | ||
| active: false, | ||
| config: None, // Default config | ||
| })) | ||
| } | ||
|
|
||
| async fn delete_bottle( | ||
| &self, | ||
| request: tonic::Request<proto::HealthRequest>, | ||
| ) -> Result<tonic::Response<proto::HealthResponse>, tonic::Status> { | ||
| let request = request.get_ref(); | ||
| tracing::info!("Received request: {:?}", request); | ||
| Ok(tonic::Response::new(proto::HealthResponse { ok: true })) | ||
| request: Request<bottles::DeleteBottleRequest>, | ||
| ) -> Result<Response<bottles::ResultResponse>, Status> { | ||
| let name = request.into_inner().name; | ||
| tracing::info!("Received DeleteBottle request for: {}", name); | ||
| let mut state = self.state.write().map_err(|_| Status::internal("Lock error"))?; | ||
|
|
||
| if let Some(pos) = state.bottles.iter().position(|b| b.name == name) { | ||
| state.bottles.remove(pos); | ||
| state.save(); | ||
| tracing::info!("Bottle {} deleted successfully", name); | ||
| Ok(Response::new(bottles::ResultResponse { | ||
| success: true, | ||
| error_message: String::new(), | ||
| })) | ||
| } else { | ||
| tracing::warn!("Bottle {} not found for deletion", name); | ||
| Err(Status::not_found("Bottle not found")) | ||
| } | ||
| } | ||
|
|
||
| async fn notify( | ||
| async fn list_bottles( | ||
| &self, | ||
| request: tonic::Request<proto::NotifyRequest>, | ||
| ) -> Result<tonic::Response<proto::NotifyResponse>, tonic::Status> { | ||
| let request = request.get_ref(); | ||
| tracing::info!("Received request: {:?}", request); | ||
| let mut client = WineBridgeClient::connect("http://[::1]:50051") | ||
| .await | ||
| .map_err(|e| tonic::Status::from_error(Box::new(e)))?; | ||
|
|
||
| let request = proto::MessageRequest { | ||
| message: request.message.clone(), | ||
| }; | ||
| let response = client.message(request).await?; | ||
| let response = response.get_ref(); | ||
| Ok(tonic::Response::new(proto::NotifyResponse { | ||
| success: response.success, | ||
| _request: Request<bottles::ListBottlesRequest>, | ||
| ) -> Result<Response<bottles::ListBottlesResponse>, Status> { | ||
| tracing::info!("Received ListBottles request"); | ||
| let state = self.state.read().map_err(|_| Status::internal("Lock error"))?; | ||
|
|
||
| let bottles: Vec<bottles::Bottle> = state.bottles.iter().map(|b| { | ||
| let active = state.orchestrator.is_running(&b.name); | ||
| bottles::Bottle { | ||
| name: b.name.clone(), | ||
| path: b.path.to_string_lossy().to_string(), | ||
| r#type: format!("{:?}", b.kind), | ||
| active, | ||
| config: None, | ||
| } | ||
| }).collect(); | ||
|
|
||
| tracing::info!("Returning {} bottles", bottles.len()); | ||
|
|
||
| Ok(Response::new(bottles::ListBottlesResponse { bottles })) | ||
| } | ||
|
|
||
| async fn get_bottle( | ||
| &self, | ||
| request: Request<bottles::GetBottleRequest>, | ||
| ) -> Result<Response<bottles::Bottle>, Status> { | ||
| let name = request.into_inner().name; | ||
| let state = self.state.read().map_err(|_| Status::internal("Lock error"))?; | ||
|
|
||
| let bottle = state.bottles.iter().find(|b| b.name == name) | ||
| .ok_or_else(|| Status::not_found("Bottle not found"))?; | ||
|
|
||
| let active = state.orchestrator.is_running(&name); | ||
|
|
||
| Ok(Response::new(bottles::Bottle { | ||
| name: bottle.name.clone(), | ||
| path: bottle.path.to_string_lossy().to_string(), | ||
| r#type: format!("{:?}", bottle.kind), | ||
| active, | ||
| config: None, | ||
| })) | ||
| } | ||
|
|
||
| async fn start_bottle( | ||
| &self, | ||
| request: Request<bottles::BottleRequest>, | ||
| ) -> Result<Response<bottles::ResultResponse>, Status> { | ||
| let name = request.into_inner().name; | ||
| tracing::info!("Starting bottle: {}", name); | ||
|
|
||
| // TODO: In the future, this method will interact with the Orchestrator to: | ||
| // 1. Resolve the correct Runner via Component Manager. | ||
| // 2. Launch the Agent process. | ||
| // 3. Establish gRPC connection with the Agent. | ||
| // | ||
| // Currently, it updates the internal state to simulate a running bottle | ||
| // for protocol verification. | ||
|
|
||
| let state = self.state.read().map_err(|_| Status::internal("Lock error"))?; | ||
| // Check existence | ||
| if !state.bottles.iter().any(|b| b.name == name) { | ||
| return Err(Status::not_found("Bottle not found")); | ||
| } | ||
|
|
||
| state.orchestrator.start_bottle(name.clone()) | ||
| .map_err(|e| Status::internal(e))?; | ||
|
|
||
| tracing::info!("Bottle {} started (simulated)", name); | ||
|
|
||
| Ok(Response::new(bottles::ResultResponse { | ||
| success: true, | ||
| error_message: String::new(), | ||
| })) | ||
| } | ||
|
|
||
| async fn stop_bottle( | ||
| &self, | ||
| request: Request<bottles::BottleRequest>, | ||
| ) -> Result<Response<bottles::ResultResponse>, Status> { | ||
| let name = request.into_inner().name; | ||
| tracing::info!("Stopping bottle: {}", name); | ||
|
|
||
| let state = self.state.read().map_err(|_| Status::internal("Lock error"))?; | ||
| state.orchestrator.stop_bottle(&name) | ||
| .map_err(|e| Status::internal(e))?; | ||
|
|
||
| Ok(Response::new(bottles::ResultResponse { | ||
| success: true, | ||
| error_message: String::new(), | ||
| })) | ||
| } | ||
|
|
||
| async fn restart_bottle( | ||
| &self, | ||
| request: Request<bottles::BottleRequest>, | ||
| ) -> Result<Response<bottles::ResultResponse>, Status> { | ||
| let req = request.into_inner(); | ||
| let stop_req = Request::new(bottles::BottleRequest { name: req.name.clone() }); | ||
| let _ = self.stop_bottle(stop_req).await; | ||
|
|
||
| let start_req = Request::new(bottles::BottleRequest { name: req.name }); | ||
| self.start_bottle(start_req).await | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,31 @@ | ||
| use bottles_core::proto::bottles_server::BottlesServer; | ||
| use bottles_server::BottlesService; | ||
| use bottles_core::proto::bottles::management_server::ManagementServer; | ||
| use bottles_server::{BottlesService, state::AppState}; | ||
| use std::sync::{Arc, RwLock}; | ||
| use tracing_subscriber::EnvFilter; | ||
|
|
||
| #[tokio::main] | ||
| async fn main() -> Result<(), Box<dyn std::error::Error>> { | ||
| tracing_subscriber::fmt() | ||
| .with_env_filter(EnvFilter::from_default_env()) | ||
| .with_env_filter( | ||
| EnvFilter::try_from_default_env() | ||
| .unwrap_or_else(|_| EnvFilter::new("info")) | ||
| ) | ||
| .init(); | ||
|
|
||
| // Initialize State | ||
| let data_path = std::path::PathBuf::from("/home/mirko/.local/share/bottles/next"); | ||
| let state = Arc::new(RwLock::new(AppState::new(data_path))); | ||
|
|
||
| let addr = "[::1]:50052".parse().unwrap(); | ||
| let service = BottlesService::default(); | ||
| tracing::info!("Listening on {}", addr); | ||
| let service = BottlesService::new(state); | ||
|
|
||
| tracing::info!("Bottles Next Server listening on {}", addr); | ||
|
|
||
| tonic::transport::Server::builder() | ||
| .add_service(BottlesServer::new(service)) | ||
| .add_service(ManagementServer::new(service)) | ||
| // .add_service(ConfigurationServer::new(...)) // To be implemented | ||
| .serve(addr) | ||
| .await?; | ||
|
|
||
| Ok(()) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
| use std::collections::HashSet; | ||
| use std::sync::{Arc, Mutex}; | ||
|
|
||
| /// The Orchestrator manages the lifecycle of bottles. | ||
| /// | ||
| /// TODO: This implementation currently serves as a functional mock to validate the gRPC | ||
| /// protocol and orchestration flow. It does not yet integrate with the actual Component Manager | ||
| /// or spawn real processes (Agent:WineBridge). | ||
| /// | ||
| /// Future implementation requirements: | ||
| /// - Integration with Component Manager for runner/dependency resolution. | ||
| /// - Spawning of actual Agent processes. | ||
| #[derive(Clone)] | ||
| pub struct Orchestrator { | ||
| // In a real implementation, this would hold handles to Child processes or gRPC clients. | ||
| running_bottles: Arc<Mutex<HashSet<String>>>, | ||
| } | ||
|
|
||
| impl Orchestrator { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| running_bottles: Arc::new(Mutex::new(HashSet::new())), | ||
| } | ||
| } | ||
|
|
||
| pub fn start_bottle(&self, name: String) -> Result<(), String> { | ||
| let mut bottles = self.running_bottles.lock().map_err(|_| "Lock poisoning")?; | ||
|
|
||
| if bottles.contains(&name) { | ||
| return Err("Bottle is already running".to_string()); | ||
| } | ||
|
|
||
| // TODO: Real process spawning logic goes here. | ||
| // - Resolve Runner via Component Manager. | ||
| // - Launch Agent process. | ||
| // - Wait for Agent readiness. | ||
|
|
||
| bottles.insert(name); | ||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn stop_bottle(&self, name: &str) -> Result<(), String> { | ||
| let mut bottles = self.running_bottles.lock().map_err(|_| "Lock poisoning")?; | ||
|
|
||
| if !bottles.remove(name) { | ||
| return Err("Bottle is not running".to_string()); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn is_running(&self, name: &str) -> bool { | ||
| let bottles = self.running_bottles.lock().unwrap(); | ||
| bottles.contains(name) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| use bottles_core::bottle::Bottle; | ||
| use bottles_core::persistence::Persistence; | ||
| use std::sync::{Arc, RwLock}; | ||
| use std::path::PathBuf; | ||
| use crate::orchestrator::Orchestrator; | ||
|
|
||
| pub struct AppState { | ||
| pub bottles: Vec<Bottle>, | ||
| pub persistence: Persistence, | ||
| pub orchestrator: Orchestrator, | ||
| } | ||
|
|
||
| impl AppState { | ||
| pub fn new(data_path: PathBuf) -> Self { | ||
| let persistence = Persistence::new(data_path); | ||
| let bottles = persistence.load_bottles().unwrap_or_else(|e| { | ||
| tracing::error!("Failed to load bottles: {}", e); | ||
| Vec::new() | ||
| }); | ||
| let orchestrator = Orchestrator::new(); | ||
|
|
||
| Self { | ||
| bottles, | ||
| persistence, | ||
| orchestrator, | ||
| } | ||
| } | ||
|
|
||
| pub fn save(&self) { | ||
| if let Err(e) = self.persistence.save_bottles(&self.bottles) { | ||
| tracing::error!("Failed to save bottles: {}", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pub type SharedState = Arc<RwLock<AppState>>; |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead this, implement
FromStrtrait onBottlesType