Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions src/common/cloud_control/proto/resource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,68 @@ option go_package = "databend.com/cloudcontrol/proto";

package resourceproto;

message ApplyResourceRequest {
string type = 3;
string script = 4;
message Worker {
string name = 1;
map<string, string> tags = 2;
string created_at = 3;
string updated_at = 4;
map<string, string> options = 5;
}

message ApplyResourceResponse {
string endpoint = 1;
map<string, string> headers = 2;
message CreateWorkerRequest {
string tenant_id = 1;
string name = 2;
bool if_not_exists = 3;
map<string, string> tags = 4;
string type = 5;
string script = 6;
map<string, string> options = 7;
}

service ResourceService {
rpc ApplyResource(ApplyResourceRequest) returns (ApplyResourceResponse);
message CreateWorkerResponse {
Worker worker = 1;
string endpoint = 2;
map<string, string> headers = 3;
}

message AlterWorkerRequest {
enum WorkerStateAction {
Unspecified = 0;
Suspend = 1;
Resume = 2;
}
string tenant_id = 1;
string name = 2;
map<string, string> set_tags = 3;
repeated string unset_tags = 4;
map<string, string> set_options = 5;
repeated string unset_options = 6;
WorkerStateAction state_action = 7;
}

message AlterWorkerResponse {
Worker worker = 1;
}

message DropWorkerRequest {
string tenant_id = 1;
string name = 2;
bool if_exists = 3;
}

message DropWorkerResponse {}

message ListWorkersRequest {
string tenant_id = 1;
}

message ListWorkersResponse {
repeated Worker workers = 1;
}

service WorkerService {
rpc CreateWorker(CreateWorkerRequest) returns (CreateWorkerResponse);
rpc AlterWorker(AlterWorkerRequest) returns (AlterWorkerResponse);
rpc DropWorker(DropWorkerRequest) returns (DropWorkerResponse);
rpc ListWorkers(ListWorkersRequest) returns (ListWorkersResponse);
}
6 changes: 3 additions & 3 deletions src/common/cloud_control/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ impl ClientConfig {
);
}

pub fn add_resource_version_info(&mut self) {
pub fn add_worker_version_info(&mut self) {
self.add_metadata(
crate::resource_client::RESOURCE_CLIENT_VERSION_NAME,
crate::resource_client::RESOURCE_CLIENT_VERSION,
crate::worker_client::WORKER_CLIENT_VERSION_NAME,
crate::worker_client::WORKER_CLIENT_VERSION,
);
}
pub fn get_metadata(&self) -> &Vec<(String, String)> {
Expand Down
12 changes: 6 additions & 6 deletions src/common/cloud_control/src/cloud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;

use crate::notification_client::NotificationClient;
use crate::resource_client::ResourceClient;
use crate::task_client::TaskClient;
use crate::worker_client::WorkerClient;

pub const CLOUD_REQUEST_TIMEOUT_SEC: u64 = 5; // 5 seconds

pub struct CloudControlApiProvider {
pub task_client: Arc<TaskClient>,
pub notification_client: Arc<NotificationClient>,
pub resource_client: Arc<ResourceClient>,
pub worker_client: Arc<WorkerClient>,
pub timeout: Duration,
}

Expand All @@ -44,11 +44,11 @@ impl CloudControlApiProvider {
let channel = endpoint.connect_lazy();
let task_client = TaskClient::new(channel.clone()).await?;
let notification_client = NotificationClient::new(channel.clone()).await?;
let resource_client = ResourceClient::new(channel).await?;
let worker_client = WorkerClient::new(channel).await?;
Ok(Arc::new(CloudControlApiProvider {
task_client,
notification_client,
resource_client,
worker_client,
timeout,
}))
}
Expand Down Expand Up @@ -89,8 +89,8 @@ impl CloudControlApiProvider {
self.notification_client.clone()
}

pub fn get_resource_client(&self) -> Arc<ResourceClient> {
self.resource_client.clone()
pub fn get_worker_client(&self) -> Arc<WorkerClient> {
self.worker_client.clone()
}
pub fn get_timeout(&self) -> Duration {
self.timeout
Expand Down
2 changes: 1 addition & 1 deletion src/common/cloud_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub mod client_config;
pub mod cloud_api;
pub mod notification_client;
pub mod notification_utils;
pub mod resource_client;
pub mod task_client;
pub mod task_utils;
pub mod worker_client;

#[allow(clippy::derive_partial_eq_without_eq)]
#[allow(clippy::large_enum_variant)]
Expand Down
46 changes: 0 additions & 46 deletions src/common/cloud_control/src/resource_client.rs

This file was deleted.

78 changes: 78 additions & 0 deletions src/common/cloud_control/src/worker_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use tonic::Request;
use tonic::transport::Channel;

use crate::pb::AlterWorkerRequest;
use crate::pb::AlterWorkerResponse;
use crate::pb::CreateWorkerRequest;
use crate::pb::CreateWorkerResponse;
use crate::pb::DropWorkerRequest;
use crate::pb::DropWorkerResponse;
use crate::pb::ListWorkersRequest;
use crate::pb::ListWorkersResponse;
use crate::pb::worker_service_client::WorkerServiceClient;

pub(crate) const WORKER_CLIENT_VERSION: &str = "v1";
pub(crate) const WORKER_CLIENT_VERSION_NAME: &str = "WORKER_CLIENT_VERSION";

pub struct WorkerClient {
pub client: WorkerServiceClient<Channel>,
}

impl WorkerClient {
pub async fn new(channel: Channel) -> databend_common_exception::Result<Arc<WorkerClient>> {
let client = WorkerServiceClient::new(channel);
Ok(Arc::new(WorkerClient { client }))
}

pub async fn create_worker(
&self,
req: Request<CreateWorkerRequest>,
) -> databend_common_exception::Result<CreateWorkerResponse> {
let mut client = self.client.clone();
let resp = client.create_worker(req).await?;
Ok(resp.into_inner())
}

pub async fn alter_worker(
&self,
req: Request<AlterWorkerRequest>,
) -> databend_common_exception::Result<AlterWorkerResponse> {
let mut client = self.client.clone();
let resp = client.alter_worker(req).await?;
Ok(resp.into_inner())
}

pub async fn drop_worker(
&self,
req: Request<DropWorkerRequest>,
) -> databend_common_exception::Result<DropWorkerResponse> {
let mut client = self.client.clone();
let resp = client.drop_worker(req).await?;
Ok(resp.into_inner())
}

pub async fn list_workers(
&self,
req: Request<ListWorkersRequest>,
) -> databend_common_exception::Result<ListWorkersResponse> {
let mut client = self.client.clone();
let resp = client.list_workers(req).await?;
Ok(resp.into_inner())
}
}
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ mod user;
mod view;
mod virtual_column;
mod warehouse;
mod worker;
mod workload;

pub use call::*;
Expand Down Expand Up @@ -110,4 +111,5 @@ pub use user::*;
pub use view::*;
pub use virtual_column::*;
pub use warehouse::*;
pub use worker::*;
pub use workload::*;
15 changes: 15 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::ast::statements::role::AlterRoleStmt;
use crate::ast::statements::settings::Settings;
use crate::ast::statements::task::CreateTaskStmt;
use crate::ast::statements::warehouse::ShowWarehousesStmt;
use crate::ast::statements::worker::ShowWorkersStmt;
use crate::ast::statements::workload::CreateWorkloadGroupStmt;
use crate::ast::statements::workload::DropWorkloadGroupStmt;
use crate::ast::statements::workload::RenameWorkloadGroupStmt;
Expand Down Expand Up @@ -153,6 +154,12 @@ pub enum Statement {
AssignWarehouseNodes(AssignWarehouseNodesStmt),
UnassignWarehouseNodes(UnassignWarehouseNodesStmt),

// Workers
ShowWorkers(ShowWorkersStmt),
CreateWorker(CreateWorkerStmt),
AlterWorker(AlterWorkerStmt),
DropWorker(DropWorkerStmt),

// Workloads
ShowWorkloadGroups(ShowWorkloadGroupsStmt),
CreateWorkloadGroup(CreateWorkloadGroupStmt),
Expand Down Expand Up @@ -545,6 +552,7 @@ impl Statement {
| Statement::DescProcedure(..)
| Statement::CallProcedure(..)
| Statement::ShowWarehouses(..)
| Statement::ShowWorkers(..)
| Statement::ShowOnlineNodes(..)
| Statement::InspectWarehouse(..) => true,

Expand Down Expand Up @@ -628,6 +636,9 @@ impl Statement {
| Statement::UnassignWarehouseNodes(..)
| Statement::ResumeWarehouse(..)
| Statement::SuspendWarehouse(..)
| Statement::CreateWorker(..)
| Statement::AlterWorker(..)
| Statement::DropWorker(..)
| Statement::ShowWorkloadGroups(..)
| Statement::CreateWorkloadGroup(..)
| Statement::DropWorkloadGroup(..)
Expand Down Expand Up @@ -1109,6 +1120,10 @@ impl Display for Statement {
Statement::RenameWarehouseCluster(stmt) => write!(f, "{stmt}")?,
Statement::AssignWarehouseNodes(stmt) => write!(f, "{stmt}")?,
Statement::UnassignWarehouseNodes(stmt) => write!(f, "{stmt}")?,
Statement::ShowWorkers(stmt) => write!(f, "{stmt}")?,
Statement::CreateWorker(stmt) => write!(f, "{stmt}")?,
Statement::AlterWorker(stmt) => write!(f, "{stmt}")?,
Statement::DropWorker(stmt) => write!(f, "{stmt}")?,
Statement::ShowWorkloadGroups(stmt) => write!(f, "{stmt}")?,
Statement::CreateWorkloadGroup(stmt) => write!(f, "{stmt}")?,
Statement::DropWorkloadGroup(stmt) => write!(f, "{stmt}")?,
Expand Down
Loading
Loading