Skip to content

Commit cad0e40

Browse files
authored
[Fix][Connector-V2][GraphQL] Clarify subscription modes and JSON-only response (#10303)
1 parent 2cc680f commit cad0e40

File tree

6 files changed

+27
-35
lines changed

6 files changed

+27
-35
lines changed

docs/en/connector-v2/sink/GraphQL.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ Used to launch web hooks using data.
2626

2727
## Supported DataSource Info
2828

29-
In order to use the Http connector, the following dependencies are required.
29+
In order to use the GraphQL connector, the following dependencies are required.
3030
They can be downloaded via install-plugin.sh or from the Maven central repository.
3131

3232
| Datasource | Supported Versions | Dependency |
3333
|------------|--------------------|------------------------------------------------------------------------------------------------------------------|
34-
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |
34+
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) |
3535

3636
## Sink Options
3737

@@ -47,11 +47,6 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
4747
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
4848
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
4949
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
50-
| key_timestamp | Int | NO | - | prometheus timestamp key . |
51-
| key_label | String | yes | - | prometheus label key |
52-
| key_value | Double | yes | - | prometheus value |
53-
| batch_size | Int | false | 1024 | prometheus batch size write |
54-
| flush_interval | Long | false | 300000L | prometheus flush commit interval |
5550
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |
5651

5752
## Example

docs/en/connector-v2/source/GraphQL.md

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ Used to read data from GraphQL.
2525
| timeout | Long | No | - |
2626
| content_field | String | Yes | $.data.{query_object}.* |
2727
| schema.fields | Config | Yes | - |
28-
| format | String | No | json |
2928
| params | Map | Yes | - |
3029
| poll_interval_millis | int | No | - |
3130
| retry | int | No | - |
@@ -56,8 +55,8 @@ variables = {
5655

5756
### enable_subscription [boolean]
5857

59-
1. true : Build a socket reader to subscribe to the GraphQL service
60-
2. false : Build an http reader subscription to the GraphQL service
58+
1. true : Enable streaming subscription mode (WebSocket)
59+
2. false : Enable batch query mode (HTTP)
6160

6261
### timeout [Long]
6362

@@ -87,10 +86,6 @@ The retry-backoff times(millis) multiplier if request http failed
8786

8887
The maximum retry-backoff times(millis) if request http failed
8988

90-
### format [String]
91-
92-
the format of upstream data, default `json`.
93-
9489
### schema [Config]
9590

9691
Fill in a fixed value
@@ -122,7 +117,6 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
122117
source {
123118
GraphQL {
124119
url = "http://192.168.1.103:9081/v1/graphql"
125-
format = "json"
126120
content_field = "$.data.source"
127121
query = """
128122
query MyQuery($limit: Int) {
@@ -155,7 +149,6 @@ source {
155149
source {
156150
GraphQL {
157151
url = "http://192.168.1.103:9081/v1/graphql"
158-
format = "json"
159152
content_field = "$.data.source"
160153
query = """
161154
query MyQuery($limit: Int) {

docs/zh/connector-v2/sink/GraphQL.md

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ import ChangeLog from '../changelog/connector-graphql.md';
2626

2727
## 支持的数据源信息
2828

29-
想使用 Prometheus 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
29+
想使用 GraphQL 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
3030

3131
| 数据源 | 支持版本 | 依赖 |
3232
| ------ | --------- | ------------------------------------------------------------ |
33-
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |
33+
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) |
3434

3535
## 接收器选项
3636

@@ -46,11 +46,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
4646
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
4747
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
4848
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
49-
| key_timestamp | Int | NO | - | prometheus timestamp key . |
50-
| key_label | String | yes | - | prometheus label key |
51-
| key_value | Double | yes | - | prometheus value |
52-
| batch_size | Int | false | 1024 | prometheus batch size write |
53-
| flush_interval | Long | false | 300000L | prometheus flush commit interval |
5449
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |
5550

5651
## 示例

docs/zh/connector-v2/source/GraphQL.md

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import ChangeLog from '../changelog/connector-graphql.md';
1111
## 主要特性
1212

1313
- [x] [批处理](../../concept/connector-v2-features.md)
14-
- [ ] [流处理](../../concept/connector-v2-features.md)
14+
- [x] [流处理](../../concept/connector-v2-features.md)
1515
- [ ] [并行](../../concept/connector-v2-features.md)
1616

1717
## 源选项
@@ -25,7 +25,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
2525
| timeout | Long | No | - |
2626
| content_field | String | Yes | $.data.{query_object}.* |
2727
| schema.fields | Config | Yes | - |
28-
| format | String | No | json |
2928
| params | Map | Yes | - |
3029
| poll_interval_millis | int | No | - |
3130
| retry | int | No | - |
@@ -56,8 +55,8 @@ variables = {
5655

5756
### enable_subscription [boolean]
5857

59-
1. true : 构建一个套接字读取器来订阅GraphQL服务
60-
2. false : 构建GraphQL服务的http阅读器订阅
58+
1. true : 开启流式订阅模式(WebSocket)
59+
2. false : 开启批处理查询模式(HTTP)
6160

6261
### timeout [Long]
6362

@@ -87,10 +86,6 @@ HTTP请求参数
8786

8887
如果http请求失败,最大重试回退时间(毫秒)
8988

90-
### format [String]
91-
92-
上游数据的格式,默认为json。
93-
9489
### schema [Config]
9590

9691
填写一个固定值
@@ -122,7 +117,6 @@ HTTP请求参数
122117
source {
123118
GraphQL {
124119
url = "http://192.168.1.103:9081/v1/graphql"
125-
format = "json"
126120
content_field = "$.data.source"
127121
query = """
128122
query MyQuery($limit: Int) {
@@ -155,7 +149,6 @@ source {
155149
source {
156150
GraphQL {
157151
url = "http://192.168.1.103:9081/v1/graphql"
158-
format = "json"
159152
content_field = "$.data.source"
160153
query = """
161154
query MyQuery($limit: Int) {

seatunnel-connectors-v2/connector-graphql/src/main/java/org/apache/seatunnel/connectors/seatunnel/graphql/source/GraphQLSource.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,22 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.graphql.source;
1919

20+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21+
2022
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23+
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
2124
import org.apache.seatunnel.api.source.Boundedness;
25+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
2226
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2327
import org.apache.seatunnel.common.constants.JobMode;
2428
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
2529
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
2630
import org.apache.seatunnel.connectors.seatunnel.graphql.config.GraphQLSourceParameter;
2731
import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceHttpReader;
2832
import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceSocketReader;
33+
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions;
2934
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
35+
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
3036

3137
import lombok.extern.slf4j.Slf4j;
3238

@@ -49,7 +55,17 @@ public String getPluginName() {
4955

5056
@Override
5157
protected void buildSchemaWithConfig(ReadonlyConfig pluginConfig) {
52-
super.buildSchemaWithConfig(pluginConfig);
58+
if (pluginConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
59+
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
60+
this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false);
61+
Config config = pluginConfig.toConfig();
62+
if (config.hasPath(HttpSourceOptions.JSON_FIELD.key())) {
63+
jsonField = getJsonField(config.getConfig(HttpSourceOptions.JSON_FIELD.key()));
64+
}
65+
if (config.hasPath(HttpSourceOptions.CONTENT_FIELD.key())) {
66+
contentField = config.getString(HttpSourceOptions.CONTENT_FIELD.key());
67+
}
68+
}
5369
}
5470

5571
@Override

seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public AbstractSingleSplitReader<SeaTunnelRow> createReader(
197197
pageInfo);
198198
}
199199

200-
private JsonField getJsonField(Config jsonFieldConf) {
200+
protected JsonField getJsonField(Config jsonFieldConf) {
201201
ConfigRenderOptions options = ConfigRenderOptions.concise();
202202
return JsonField.builder()
203203
.fields(JsonUtils.toMap(jsonFieldConf.root().render(options)))

0 commit comments

Comments
 (0)