Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions docs/en/connector-v2/sink/GraphQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ Used to launch web hooks using data.

## Supported DataSource Info

In order to use the Http connector, the following dependencies are required.
In order to use the GraphQL connector, the following dependencies are required.
They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
|------------|--------------------|------------------------------------------------------------------------------------------------------------------|
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) |

## Sink Options

Expand All @@ -47,11 +47,6 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
| key_timestamp | Int | NO | - | prometheus timestamp key . |
| key_label | String | yes | - | prometheus label key |
| key_value | Double | yes | - | prometheus value |
| batch_size | Int | false | 1024 | prometheus batch size write |
| flush_interval | Long | false | 300000L | prometheus flush commit interval |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |

## Example
Expand Down
11 changes: 2 additions & 9 deletions docs/en/connector-v2/source/GraphQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ Used to read data from GraphQL.
| timeout | Long | No | - |
| content_field | String | Yes | $.data.{query_object}.* |
| schema.fields | Config | Yes | - |
| format | String | No | json |
| params | Map | Yes | - |
| poll_interval_millis | int | No | - |
| retry | int | No | - |
Expand Down Expand Up @@ -56,8 +55,8 @@ variables = {

### enable_subscription [boolean]

1. true : Build a socket reader to subscribe to the GraphQL service
2. false : Build an http reader subscription to the GraphQL service
1. true : Enable streaming subscription mode (WebSocket)
2. false : Enable batch query mode (HTTP)

### timeout [Long]

Expand Down Expand Up @@ -87,10 +86,6 @@ The retry-backoff times(millis) multiplier if request http failed

The maximum retry-backoff times(millis) if request http failed

### format [String]

the format of upstream data, default `json`.

### schema [Config]

Fill in a fixed value
Expand Down Expand Up @@ -122,7 +117,6 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
Expand Down Expand Up @@ -155,7 +149,6 @@ source {
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
Expand Down
9 changes: 2 additions & 7 deletions docs/zh/connector-v2/sink/GraphQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import ChangeLog from '../changelog/connector-graphql.md';

## 支持的数据源信息

想使用 Prometheus 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖
想使用 GraphQL 连接器,需要安装以下必要的依赖。可以通过运行 install-plugin.sh 脚本或者从 Maven 中央仓库下载这些依赖

| 数据源 | 支持版本 | 依赖 |
| ------ | --------- | ------------------------------------------------------------ |
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-prometheus) |
| Http | universal | [Download](https://mvnrepository.com/artifact/org.apache.seatunnel/seatunnel-connectors-v2/connector-http) |

## 接收器选项

Expand All @@ -46,11 +46,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry-backoff times(millis) if request http failed |
| connect_timeout_ms | Int | No | 12000 | Connection timeout setting, default 12s. |
| socket_timeout_ms | Int | No | 60000 | Socket timeout setting, default 60s. |
| key_timestamp | Int | NO | - | prometheus timestamp key . |
| key_label | String | yes | - | prometheus label key |
| key_value | Double | yes | - | prometheus value |
| batch_size | Int | false | 1024 | prometheus batch size write |
| flush_interval | Long | false | 300000L | prometheus flush commit interval |
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details |

## 示例
Expand Down
13 changes: 3 additions & 10 deletions docs/zh/connector-v2/source/GraphQL.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import ChangeLog from '../changelog/connector-graphql.md';
## 主要特性

- [x] [批处理](../../concept/connector-v2-features.md)
- [ ] [流处理](../../concept/connector-v2-features.md)
- [x] [流处理](../../concept/connector-v2-features.md)
- [ ] [并行](../../concept/connector-v2-features.md)

## 源选项
Expand All @@ -25,7 +25,6 @@ import ChangeLog from '../changelog/connector-graphql.md';
| timeout | Long | No | - |
| content_field | String | Yes | $.data.{query_object}.* |
| schema.fields | Config | Yes | - |
| format | String | No | json |
| params | Map | Yes | - |
| poll_interval_millis | int | No | - |
| retry | int | No | - |
Expand Down Expand Up @@ -56,8 +55,8 @@ variables = {

### enable_subscription [boolean]

1. true : 构建一个套接字读取器来订阅GraphQL服务
2. false : 构建GraphQL服务的http阅读器订阅
1. true : 开启流式订阅模式(WebSocket)
2. false : 开启批处理查询模式(HTTP)

### timeout [Long]

Expand Down Expand Up @@ -87,10 +86,6 @@ HTTP请求参数

如果http请求失败,最大重试回退时间(毫秒)

### format [String]

上游数据的格式,默认为json。

### schema [Config]

填写一个固定值
Expand Down Expand Up @@ -122,7 +117,6 @@ HTTP请求参数
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
Expand Down Expand Up @@ -155,7 +149,6 @@ source {
source {
GraphQL {
url = "http://192.168.1.103:9081/v1/graphql"
format = "json"
content_field = "$.data.source"
query = """
query MyQuery($limit: Int) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@

package org.apache.seatunnel.connectors.seatunnel.graphql.source;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.graphql.config.GraphQLSourceParameter;
import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceHttpReader;
import org.apache.seatunnel.connectors.seatunnel.graphql.source.reader.GraphQLSourceSocketReader;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -49,7 +58,24 @@ public String getPluginName() {

@Override
protected void buildSchemaWithConfig(ReadonlyConfig pluginConfig) {
super.buildSchemaWithConfig(pluginConfig);
if (pluginConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false);
Config config = pluginConfig.toConfig();
if (config.hasPath(HttpSourceOptions.JSON_FIELD.key())) {
jsonField = getJsonField(config.getConfig(HttpSourceOptions.JSON_FIELD.key()));
}
if (config.hasPath(HttpSourceOptions.CONTENT_FIELD.key())) {
contentField = config.getString(HttpSourceOptions.CONTENT_FIELD.key());
}
}
}

private JsonField getJsonField(Config jsonFieldConf) {
ConfigRenderOptions options = ConfigRenderOptions.concise();
return JsonField.builder()
.fields(JsonUtils.toMap(jsonFieldConf.root().render(options)))
.build();
}

@Override
Expand Down