diff --git a/docs/en/connector-v2/sink/GraphQL.md b/docs/en/connector-v2/sink/GraphQL.md index 2ac634083bc7..df9f1bbee36e 100644 --- a/docs/en/connector-v2/sink/GraphQL.md +++ b/docs/en/connector-v2/sink/GraphQL.md @@ -26,12 +26,12 @@ Used to launch web hooks using data. ## Supported DataSource Info -In order to use the Http connector, the following dependencies are required. +In order to use the GraphQL connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository. | Datasource | Supported Versions | Dependency | |------------|--------------------|------------------------------------------------------------------------------------------------------------------| -| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) | +| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) | ## Sink Options @@ -47,11 +47,6 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor | retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | | connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | | socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | -| key_timestamp | Int | NO | - | prometheus timestamp key . | -| key_label | String | yes | - | prometheus label key | -| key_value | Double | yes | - | prometheus value | -| batch_size | Int | false | 1024 | prometheus batch size write | -| flush_interval | Long | false | 300000L | prometheus flush commit interval | | common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details | ## Example diff --git a/docs/en/connector-v2/source/GraphQL.md b/docs/en/connector-v2/source/GraphQL.md index 30738ed1f0f0..2f062c8bf040 100644 --- a/docs/en/connector-v2/source/GraphQL.md +++ b/docs/en/connector-v2/source/GraphQL.md @@ -25,7 +25,6 @@ Used to read data from GraphQL. | timeout | Long | No | - | | content_field | String | Yes | $.data.{query_object}.* | | schema.fields | Config | Yes | - | -| format | String | No | json | | params | Map | Yes | - | | poll_interval_millis | int | No | - | | retry | int | No | - | @@ -56,8 +55,8 @@ variables = { ### enable_subscription [boolean] -1. true : Build a socket reader to subscribe to the GraphQL service -2. false : Build an http reader subscription to the GraphQL service +1. true : Enable streaming subscription mode (WebSocket) +2. false : Enable batch query mode (HTTP) ### timeout [Long] @@ -87,10 +86,6 @@ The retry-backoff times(millis) multiplier if request http failed The maximum retry-backoff times(millis) if request http failed -### format [String] - -the format of upstream data, default `json`. - ### schema [Config] Fill in a fixed value @@ -122,7 +117,6 @@ Source plugin common parameters, please refer to [Source Common Options](../sour source { GraphQL { url = "http://192.168.1.103:9081/v1/graphql" - format = "json" content_field = "$.data.source" query = """ query MyQuery($limit: Int) { @@ -155,7 +149,6 @@ source { source { GraphQL { url = "http://192.168.1.103:9081/v1/graphql" - format = "json" content_field = "$.data.source" query = """ query MyQuery($limit: Int) { diff --git a/docs/zh/connector-v2/sink/GraphQL.md b/docs/zh/connector-v2/sink/GraphQL.md index b73d9a6117a5..45210e7157d4 100644 --- a/docs/zh/connector-v2/sink/GraphQL.md +++ b/docs/zh/connector-v2/sink/GraphQL.md @@ -26,11 +26,11 @@ import ChangeLog from '../changelog/connector-graphql.md'; ## 支持的数据源信息 -想使用 Prometheus 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖 +想使用 GraphQL 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖 | 数据源 | 支持版本 | 依赖 | | ------ | --------- | ------------------------------------------------------------ | -| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) | +| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) | ## 接收器选项 @@ -46,11 +46,6 @@ import ChangeLog from '../changelog/connector-graphql.md'; | retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed | | connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. | | socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. | -| key_timestamp | Int | NO | - | prometheus timestamp key . | -| key_label | String | yes | - | prometheus label key | -| key_value | Double | yes | - | prometheus value | -| batch_size | Int | false | 1024 | prometheus batch size write | -| flush_interval | Long | false | 300000L | prometheus flush commit interval | | common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details | ## 示例 diff --git a/docs/zh/connector-v2/source/GraphQL.md b/docs/zh/connector-v2/source/GraphQL.md index ac578af6428f..c50a3ef06b6a 100644 --- a/docs/zh/connector-v2/source/GraphQL.md +++ b/docs/zh/connector-v2/source/GraphQL.md @@ -11,7 +11,7 @@ import ChangeLog from '../changelog/connector-graphql.md'; ## 主要特性 - [x] [批处理](../../concept/connector-v2-features.md) -- [ ] [流处理](../../concept/connector-v2-features.md) +- [x] [流处理](../../concept/connector-v2-features.md) - [ ] [并行](../../concept/connector-v2-features.md) ## 源选项 @@ -25,7 +25,6 @@ import ChangeLog from '../changelog/connector-graphql.md'; | timeout | Long | No | - | | content_field | String | Yes | $.data.{query_object}.* | | schema.fields | Config | Yes | - | -| format | String | No | json | | params | Map | Yes | - | | poll_interval_millis | int | No | - | | retry | int | No | - | @@ -56,8 +55,8 @@ variables = { ### enable_subscription [boolean] -1. true : 构建一个套接字读取器来订阅GraphQL服务 -2. false : 构建GraphQL服务的http阅读器订阅 +1. true : 开启流式订阅模式(WebSocket) +2. false : 开启批处理查询模式(HTTP) ### timeout [Long] @@ -87,10 +86,6 @@ HTTP请求参数 如果http请求失败,最大重试回退时间(毫秒) -### format [String] - -上游数据的格式,默认为json。 - ### schema [Config] 填写一个固定值 @@ -122,7 +117,6 @@ HTTP请求参数 source { GraphQL { url = "http://192.168.1.103:9081/v1/graphql" - format = "json" content_field = "$.data.source" query = """ query MyQuery($limit: Int) { @@ -155,7 +149,6 @@ source { source { GraphQL { url = "http://192.168.1.103:9081/v1/graphql" - format = "json" content_field = "$.data.source" query = """ query MyQuery($limit: Int) { diff --git a/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java b/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java index 9e36744bc251..0cceea3d2070 100644 --- a/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java +++ b/seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java @@ -17,8 +17,12 @@ package org.apache.seatunnel.connectors.seatunnel.graphql.source; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.options.ConnectorCommonOptions; import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; @@ -26,7 +30,9 @@ import org.apache.seatunnel.connectors.seatunnel.graphql.config.GraphQLSourceParameter; import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceHttpReader; import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceSocketReader; +import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions; import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; import lombok.extern.slf4j.Slf4j; @@ -49,7 +55,17 @@ public String getPluginName() { @Override protected void buildSchemaWithConfig(ReadonlyConfig pluginConfig) { - super.buildSchemaWithConfig(pluginConfig); + if (pluginConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) { + this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); + this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false); + Config config = pluginConfig.toConfig(); + if (config.hasPath(HttpSourceOptions.JSON_FIELD.key())) { + jsonField = getJsonField(config.getConfig(HttpSourceOptions.JSON_FIELD.key())); + } + if (config.hasPath(HttpSourceOptions.CONTENT_FIELD.key())) { + contentField = config.getString(HttpSourceOptions.CONTENT_FIELD.key()); + } + } } @Override diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java index c630bbe64496..44629f4f753f 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java @@ -197,7 +197,7 @@ public AbstractSingleSplitReader createReader( pageInfo); } - private JsonField getJsonField(Config jsonFieldConf) { + protected JsonField getJsonField(Config jsonFieldConf) { ConfigRenderOptions options = ConfigRenderOptions.concise(); return JsonField.builder() .fields(JsonUtils.toMap(jsonFieldConf.root().render(options)))