@@ -43,13 +43,17 @@ use databend_common_config::InnerConfig;
4343use databend_common_exception:: ErrorCode ;
4444use databend_common_exception:: Result ;
4545use databend_common_grpc:: ConnectionFactory ;
46+ use databend_common_license:: license:: Feature ;
47+ use databend_common_license:: license_manager:: LicenseManagerSwitch ;
4648use databend_common_management:: WarehouseApi ;
4749use databend_common_management:: WarehouseMgr ;
50+ use databend_common_meta_app:: tenant:: Tenant ;
4851use databend_common_meta_store:: MetaStore ;
4952use databend_common_meta_store:: MetaStoreProvider ;
5053use databend_common_meta_types:: NodeInfo ;
5154use databend_common_meta_types:: SeqV ;
5255use databend_common_metrics:: cluster:: * ;
56+ use databend_common_settings:: Settings ;
5357use databend_common_version:: DATABEND_COMMIT_VERSION ;
5458use databend_enterprise_resources_management:: ResourcesManagement ;
5559use futures:: future:: select;
@@ -322,7 +326,6 @@ impl ClusterDiscovery {
322326 let mut node_info = NodeInfo :: create (
323327 config. query . node_id . clone ( ) ,
324328 config. query . node_secret . clone ( ) ,
325- config. query . num_cpus ,
326329 format ! (
327330 "{}:{}" ,
328331 config. query. http_handler_host, config. query. http_handler_port
@@ -387,7 +390,6 @@ impl ClusterDiscovery {
387390 let mut node_info = NodeInfo :: create (
388391 config. query . node_id . clone ( ) ,
389392 config. query . node_secret . clone ( ) ,
390- config. query . num_cpus ,
391393 format ! (
392394 "{}:{}" ,
393395 config. query. http_handler_host, config. query. http_handler_port
@@ -535,7 +537,6 @@ impl ClusterDiscovery {
535537
536538 #[ async_backtrace:: framed]
537539 pub async fn register_to_metastore ( self : & Arc < Self > , cfg : & InnerConfig ) -> Result < ( ) > {
538- let cpus = cfg. query . num_cpus ;
539540 let mut address = cfg. query . flight_api_address . clone ( ) ;
540541 let mut http_address = format ! (
541542 "{}:{}" ,
@@ -601,7 +602,6 @@ impl ClusterDiscovery {
601602 let mut node_info = NodeInfo :: create (
602603 self . local_id . clone ( ) ,
603604 self . local_secret . clone ( ) ,
604- cpus,
605605 http_address,
606606 address,
607607 discovery_address,
@@ -614,6 +614,9 @@ impl ClusterDiscovery {
614614
615615 self . drop_invalid_nodes ( & node_info) . await ?;
616616
617+ let online_nodes = self . warehouse_manager . list_online_nodes ( ) . await ?;
618+ self . check_license_key ( online_nodes) . await ?;
619+
617620 match self . warehouse_manager . start_node ( node_info) . await {
618621 Ok ( seq_node) => self . start_heartbeat ( seq_node) . await ,
619622 Err ( cause) => Err ( cause. add_message_back ( "(while cluster api add_node)." ) ) ,
@@ -628,6 +631,24 @@ impl ClusterDiscovery {
628631 heartbeat. start ( node_info, seq) ;
629632 Ok ( ( ) )
630633 }
634+
635+ async fn check_license_key ( & self , nodes : Vec < NodeInfo > ) -> Result < ( ) > {
636+ let license_key = Self :: get_license_key ( & self . tenant_id ) . await ?;
637+
638+ let total_cpu_nums = nodes. iter ( ) . map ( |x| x. cpu_nums ) . sum :: < u64 > ( ) ;
639+ LicenseManagerSwitch :: instance ( )
640+ . check_enterprise_enabled ( license_key. clone ( ) , Feature :: MaxNodeQuota ( nodes. len ( ) ) ) ?;
641+
642+ LicenseManagerSwitch :: instance ( )
643+ . check_enterprise_enabled ( license_key, Feature :: MaxCpuQuota ( total_cpu_nums as usize ) )
644+ }
645+
646+ async fn get_license_key ( tenant : & str ) -> Result < String > {
647+ // We must get the license key from settings. It may be in the configuration file.
648+ let settings = Settings :: create ( Tenant :: new_literal ( tenant) ) ;
649+ settings. load_changes ( ) . await ?;
650+ Ok ( settings. get_enterprise_license ( ) )
651+ }
631652}
632653
633654struct ClusterHeartbeat {
0 commit comments