2121import io .github .openfacade .http .HttpClientConfig ;
2222import io .github .openfacade .http .HttpClientFactory ;
2323import io .github .openfacade .http .HttpResponse ;
24- import io .opengemini .client .api .AuthConfig ;
25- import io .opengemini .client .api .AuthType ;
26- import io .opengemini .client .api .Configuration ;
27- import io .opengemini .client .api .OpenGeminiAsyncClient ;
28- import io .opengemini .client .api .OpenGeminiException ;
29- import io .opengemini .client .api .Point ;
30- import io .opengemini .client .api .Pong ;
31- import io .opengemini .client .api .Query ;
32- import io .opengemini .client .api .QueryResult ;
33- import io .opengemini .client .api .RetentionPolicy ;
34- import io .opengemini .client .api .RpConfig ;
24+ import io .opengemini .client .api .*;
3525import io .opengemini .client .common .BaseClient ;
3626import io .opengemini .client .common .CommandFactory ;
3727import io .opengemini .client .common .HeaderConst ;
3828import io .opengemini .client .common .JacksonService ;
3929import io .opengemini .client .common .ResultMapper ;
30+ import io .opengemini .client .interceptor .Interceptor ;
4031import org .apache .commons .lang3 .StringUtils ;
4132import org .jetbrains .annotations .NotNull ;
4233
4334import java .io .IOException ;
4435import java .nio .charset .StandardCharsets ;
45- import java .util .List ;
46- import java .util .Optional ;
47- import java .util .StringJoiner ;
36+ import java .util .*;
4837import java .util .concurrent .CompletableFuture ;
4938
5039public class OpenGeminiClient extends BaseClient implements OpenGeminiAsyncClient {
40+ private final List <Interceptor > interceptors = new ArrayList <>();
41+
42+ /**
43+ * Add interceptors to the client.
44+ */
45+ public void addInterceptors (Interceptor ... interceptors ) {
46+ Collections .addAll (this .interceptors , interceptors );
47+ }
48+
49+
50+
5151 protected final Configuration conf ;
5252
5353 private final HttpClient client ;
@@ -57,6 +57,10 @@ public OpenGeminiClient(@NotNull Configuration conf) {
5757 this .conf = conf ;
5858 AuthConfig authConfig = conf .getAuthConfig ();
5959 HttpClientConfig httpConfig = conf .getHttpConfig ();
60+ if (httpConfig == null ) {
61+ httpConfig = new HttpClientConfig .Builder ().build ();
62+ conf .setHttpConfig (httpConfig );
63+ }
6064 if (authConfig != null && authConfig .getAuthType ().equals (AuthType .PASSWORD )) {
6165 httpConfig .addRequestFilter (
6266 new BasicAuthRequestFilter (authConfig .getUsername (), String .valueOf (authConfig .getPassword ())));
@@ -159,7 +163,7 @@ public CompletableFuture<Void> write(String database, String retentionPolicy, Po
159163 if (StringUtils .isEmpty (body )) {
160164 return CompletableFuture .completedFuture (null );
161165 }
162- return executeWrite (database , retentionPolicy , body );
166+ return executeWrite (database , retentionPolicy , body );
163167 }
164168
165169 @ Override
@@ -195,9 +199,27 @@ public CompletableFuture<Pong> ping() {
195199 *
196200 * @param query the query to execute.
197201 */
198- protected CompletableFuture <QueryResult > executeQuery (Query query ) {
202+ public CompletableFuture <QueryResult > executeQuery (Query query ) {
199203 String queryUrl = getQueryUrl (query );
200- return get (queryUrl ).thenCompose (response -> convertResponse (response , QueryResult .class ));
204+
205+ // 执行所有queryBefore拦截器
206+ CompletableFuture <Void > beforeFutures = CompletableFuture .allOf (
207+ interceptors .stream ()
208+ .map (interceptor -> interceptor .queryBefore (query ))
209+ .toArray (CompletableFuture []::new )
210+ );
211+
212+ return beforeFutures .thenCompose (voidResult -> {
213+ return executeHttpQuery (query ).thenCompose (response -> {
214+ // 执行所有queryAfter拦截器
215+ CompletableFuture <Void > afterFutures = CompletableFuture .allOf (
216+ interceptors .stream ()
217+ .map (interceptor -> interceptor .queryAfter (query , response ))
218+ .toArray (CompletableFuture []::new )
219+ );
220+ return afterFutures .thenCompose (voidResult2 -> convertResponse (response , QueryResult .class ));
221+ });
222+ });
201223 }
202224
203225 /**
@@ -217,9 +239,33 @@ protected CompletableFuture<QueryResult> executePostQuery(Query query) {
217239 * @param retentionPolicy the name of the retention policy.
218240 * @param lineProtocol the line protocol string to write.
219241 */
220- protected CompletableFuture <Void > executeWrite (String database , String retentionPolicy , String lineProtocol ) {
242+ public CompletableFuture <Void > executeWrite (String database , String retentionPolicy , String lineProtocol ) {
221243 String writeUrl = getWriteUrl (database , retentionPolicy );
222- return post (writeUrl , lineProtocol ).thenCompose (response -> convertResponse (response , Void .class ));
244+ Write write = new Write (
245+ database ,
246+ retentionPolicy ,
247+ "default_measurement" , // Default measurement name
248+ lineProtocol ,
249+ "ns" // Default precision
250+ );
251+
252+ // Execute all writeBefore interceptors
253+ CompletableFuture <Void > beforeFutures = CompletableFuture .allOf (
254+ interceptors .stream ()
255+ .map (interceptor -> interceptor .writeBefore (write ))
256+ .toArray (CompletableFuture []::new )
257+ );
258+
259+ return beforeFutures .thenCompose (voidResult -> {
260+ return executeHttpWrite (write ).thenCompose (response -> { // response 是 io.github.openfacade.http.HttpResponse
261+ CompletableFuture <Void > afterFutures = CompletableFuture .allOf (
262+ interceptors .stream ()
263+ .map (interceptor -> interceptor .writeAfter (write , response )) // 传递正确的类型
264+ .toArray (CompletableFuture []::new )
265+ );
266+ return afterFutures .thenCompose (voidResult2 -> convertResponse (response , Void .class ));
267+ });
268+ });
223269 }
224270
225271 /**
@@ -258,7 +304,7 @@ private CompletableFuture<HttpResponse> get(String url) {
258304
259305 private CompletableFuture <HttpResponse > post (String url , String body ) {
260306 return client .post (buildUriWithPrefix (url ), body == null ? new byte [0 ] : body .getBytes (StandardCharsets .UTF_8 ),
261- headers );
307+ headers );
262308 }
263309
264310 @ Override
@@ -270,4 +316,20 @@ public void close() throws IOException {
270316 public String toString () {
271317 return "OpenGeminiClient{" + "httpEngine=" + conf .getHttpConfig ().engine () + '}' ;
272318 }
273- }
319+
320+ /**
321+ * 执行 HTTP 查询请求
322+ */
323+ private CompletableFuture <HttpResponse > executeHttpQuery (Query query ) {
324+ String queryUrl = getQueryUrl (query );
325+ return get (queryUrl );
326+ }
327+
328+ /**
329+ * 执行 HTTP 写入请求
330+ */
331+ private CompletableFuture <HttpResponse > executeHttpWrite (Write write ) {
332+ String writeUrl = getWriteUrl (write .getDatabase (), write .getRetentionPolicy ());
333+ return post (writeUrl , write .getLineProtocol ());
334+ }
335+ }
0 commit comments