1717import gzip
1818import io
1919import itertools
20+ import os .path
2021from abc import ABC
2122from http import HTTPStatus
2223from typing import List
2324
25+ import grpc
2426import requests
2527from requests import HTTPError
2628
2729from opengemini_client .client import Client
2830from opengemini_client .measurement import Measurement , MeasurementCondition
2931from opengemini_client .models import Config , BatchPoints , Query , QueryResult , Series , SeriesResult , RpConfig , \
30- ValuesResult , KeyValue
32+ ValuesResult , KeyValue , AuthConfig
33+ from opengemini_client .proto import write_pb2 , write_pb2_grpc
34+ from opengemini_client .record_transform import RecordTransform
3135from opengemini_client .url_const import UrlConst
3236from opengemini_client .models import AuthType , TlsConfig
3337
3438
39+ def check_auth_config (auth_config : AuthConfig ):
40+ if auth_config is not None :
41+ if auth_config .auth_type == AuthType .PASSWORD :
42+ if len (auth_config .username ) == 0 :
43+ raise ValueError ("invalid auth config due to empty username" )
44+ if len (auth_config .password ) == 0 :
45+ raise ValueError ("invalid auth config due to empty password" )
46+ if auth_config .auth_type == AuthType .TOKEN and len (auth_config .token ) == 0 :
47+ raise ValueError ("invalid auth config due to empty token" )
48+
49+
3550def check_config (config : Config ):
3651 if len (config .address ) == 0 :
3752 raise ValueError ("must have at least one address" )
3853
39- if config .auth_config is not None :
40- if config .auth_config .auth_type == AuthType .PASSWORD :
41- if len (config .auth_config .username ) == 0 :
42- raise ValueError ("invalid auth config due to empty username" )
43- if len (config .auth_config .password ) == 0 :
44- raise ValueError ("invalid auth config due to empty password" )
45- if config .auth_config .auth_type == AuthType .TOKEN and len (config .auth_config .token ) == 0 :
46- raise ValueError ("invalid auth config due to empty token" )
54+ check_auth_config (config .auth_config )
4755
4856 if config .tls_enabled and config .tls_config is None :
4957 config .tls_config = TlsConfig ()
@@ -60,6 +68,17 @@ def check_config(config: Config):
6068 if config .connection_timeout is None or config .connection_timeout <= datetime .timedelta (seconds = 0 ):
6169 config .connection_timeout = datetime .timedelta (seconds = 10 )
6270
71+ if config .grpc_config is None :
72+ return config
73+
74+ if len (config .grpc_config .address ) == 0 :
75+ raise ValueError ("grpc config must have at least one address" )
76+
77+ check_auth_config (config .grpc_config .auth_config )
78+
79+ if config .grpc_config .tls_enable and config .grpc_config .tls_config is None :
80+ config .grpc_config .tls_config = TlsConfig ()
81+
6382 return config
6483
6584
@@ -95,6 +114,9 @@ def __init__(self, config: Config):
95114 self .session .verify = config .tls_config .ca_file
96115 self .endpoints = [f"{ protocol } { addr .host } :{ addr .port } " for addr in config .address ]
97116 self .endpoints_iter = itertools .cycle (self .endpoints )
117+ if self .config .grpc_config is not None :
118+ self .grpc_endpoints = [f"{ addr .host } :{ addr .port } " for addr in config .grpc_config .address ]
119+ self .grpc_endpoints_iter = itertools .cycle (self .grpc_endpoints )
98120
99121 def close (self ):
100122 self .session .close ()
@@ -108,6 +130,31 @@ def __exit__(self, _exc_type, _exc_val, _exc_tb):
108130 def _get_server_url (self ):
109131 return next (self .endpoints_iter )
110132
133+ def _get_grpc_server_url (self ):
134+ return next (self .grpc_endpoints_iter )
135+
136+ def _get_grpc_channel (self ):
137+ server_url = self ._get_grpc_server_url ()
138+ if self .config .grpc_config .tls_enable is False :
139+ return grpc .insecure_channel (server_url )
140+
141+ root_certificates = None
142+ private_key = None
143+ certificate_chain = None
144+ if os .path .exists (self .config .grpc_config .tls_config .ca_file ):
145+ with open (self .config .grpc_config .tls_config .ca_file , 'rb' ) as fd :
146+ root_certificates = fd .read ()
147+ if os .path .exists (self .config .grpc_config .tls_config .cert_file ):
148+ with open (self .config .grpc_config .tls_config .cert_file , 'rb' ) as fd :
149+ certificate_chain = fd .read ()
150+ if os .path .exists (self .config .grpc_config .tls_config .key_file ):
151+ with open (self .config .grpc_config .tls_config .key_file , 'rb' ) as fd :
152+ private_key = fd .read ()
153+ return grpc .secure_channel (
154+ target = server_url ,
155+ credentials = grpc .ssl_channel_credentials (root_certificates , private_key , certificate_chain )
156+ )
157+
111158 def _update_headers (self , method , url_path , headers = None ) -> dict :
112159 if headers is None :
113160 headers = {}
@@ -191,6 +238,47 @@ def write_batch_points(self, database: str, batch_points: BatchPoints):
191238 return
192239 raise HTTPError (f"write_batch_points error resp, code: { resp .status_code } , body: { resp .text } " )
193240
241+ def write_by_grpc (self , database : str , batch_points : BatchPoints , rp : str = '' ):
242+ # generate grpc request records
243+ record_transforms = {}
244+ for point in batch_points .points :
245+ rt = record_transforms .get (point .measurement )
246+ if rt is None :
247+ rt = RecordTransform ()
248+ rt .add_point (point .tags , point .fields , point .generate_timestamp ())
249+ record_transforms [point .measurement ] = rt
250+ records = []
251+ for measurement , rt in record_transforms .items ():
252+ record = write_pb2 .Record (
253+ measurement = measurement ,
254+ min_time = rt .min_time ,
255+ max_time = rt .max_time ,
256+ block = rt .convert_to_record ().marshal (b'' ),
257+ )
258+ records .append (record )
259+
260+ username = ''
261+ password = ''
262+ if self .config .grpc_config .auth_config is not None :
263+ username = self .config .grpc_config .auth_config .username
264+ password = self .config .grpc_config .auth_config .password
265+
266+ # send grpc request
267+ channel = self ._get_grpc_channel ()
268+ response = write_pb2_grpc .WriteServiceStub (channel ).Write (
269+ write_pb2 .WriteRequest (
270+ database = database ,
271+ retention_policy = rp ,
272+ username = username ,
273+ password = password ,
274+ records = records ,
275+ ),
276+ timeout = self .config .timeout .seconds
277+ )
278+ if response .code == 0 :
279+ return
280+ raise HTTPError (f"write_by_grpc error resp, code: { response .code } " )
281+
194282 def create_database (self , database : str , rp : RpConfig = None ):
195283 if not database :
196284 raise ValueError ("empty database name" )
0 commit comments