This repository was archived by the owner on Dec 28, 2025. It is now read-only.
Merged
12 changes: 12 additions & 0 deletions docs/en/connector-v2/sink/Email.md
@@ -27,6 +27,8 @@ The tested email version is 1.5.6.
| email_authorization_code | string | no | - |
| email_message_headline | string | yes | - |
| email_message_content | string | yes | - |
| email_attachment_name | string | no | emailsink.csv |
| email_field_delimiter | string | no | , |
| common-options | | no | - |

### email_from_address [string]
@@ -65,6 +67,14 @@ The subject line of the entire message.

The body of the entire message.

### email_attachment_name [string]

The name of the email attachment file. Default is `emailsink.csv`.

### email_field_delimiter [string]

The delimiter used to separate fields in the attachment file. Default is comma `,`.
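For illustration, each row in the attachment is written as plain delimiter-joined values, with no CSV quoting or escaping of delimiter characters inside field values. A minimal sketch of that formatting (`formatRow` is a hypothetical helper for this example, not part of the connector):

```java
import java.util.StringJoiner;

public class RowFormatSketch {
    // Join field values with the configured delimiter; null fields become empty strings.
    static String formatRow(Object[] fields, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object field : fields) {
            joiner.add(field == null ? "" : field.toString());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Object[] row = {1, "alice", null, 3.5};
        System.out.println(formatRow(row, "|")); // 1|alice||3.5
    }
}
```

Because values are not escaped, pick an `email_field_delimiter` that cannot occur in your data (for example `|` when fields may contain commas).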

### common options

Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.
@@ -82,6 +92,8 @@ Sink plugin common parameters, please refer to [Sink Common Options](../sink-com
email_authorization_code=""
email_message_headline=""
email_message_content=""
email_attachment_name="report.csv" # Optional, default is emailsink.csv
email_field_delimiter="|" # Optional, default is ,
}

```
36 changes: 35 additions & 1 deletion docs/en/connector-v2/sink/HdfsFile.md
@@ -50,7 +50,7 @@ Output data to hdfs file

| Name | Type | Required | Default | Description |
|---------------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fs.defaultFS | string | yes | - | The hadoop cluster address that start with `hdfs://`, for example: `hdfs://hadoopcluster` |
| fs.defaultFS | string | yes | - | Hadoop cluster address. Supports the following formats:<br/>- Standard HDFS: `hdfs://hadoopcluster` or `hdfs://namenode:9000`<br/>- ViewFS (Federated HDFS): `viewfs://mycluster`<br/>See ViewFS configuration example below. |
| path | string | yes | - | The target dir path is required. |
| tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a hdfs path. |
| hdfs_site_path | string | no | - | The path of `hdfs-site.xml`, used to load ha configuration of namenodes |
@@ -240,6 +240,40 @@ HdfsFile {
}
```

### ViewFS (Federated HDFS) Configuration Example

ViewFS lets you unify multiple HDFS clusters or namespaces under a single logical namespace, which is useful in HDFS Federation scenarios.

```hocon
HdfsFile {
fs.defaultFS = "viewfs://mycluster"
path = "/data/output"
file_format_type = "parquet"
hdfs_site_path = "/path/to/core-site.xml"
data_save_mode = "DROP_DATA"
}
```

Configure the mount table in `core-site.xml`:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>fs.viewfs.mounttable.mycluster.link./data</name>
<value>hdfs://namenode1:9000/data</value>
</property>
<property>
<name>fs.viewfs.mounttable.mycluster.link./logs</name>
<value>hdfs://namenode2:9000/logs</value>
</property>
<property>
<name>fs.viewfs.mounttable.mycluster.link./tmp</name>
<value>hdfs://namenode3:9000/tmp</value>
</property>
</configuration>
```
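Conceptually, ViewFS resolves a client path by longest-prefix match against the mount table and rewrites it onto the backing cluster, so the sink's `path = "/data/output"` lands on `namenode1`. A simplified sketch of that lookup (plain Java for illustration, not the Hadoop implementation; mount entries mirror the `core-site.xml` example above):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ViewFsResolveSketch {
    // Mount table mirroring the core-site.xml example above.
    static final Map<String, String> MOUNTS = new LinkedHashMap<>();
    static {
        MOUNTS.put("/data", "hdfs://namenode1:9000/data");
        MOUNTS.put("/logs", "hdfs://namenode2:9000/logs");
        MOUNTS.put("/tmp", "hdfs://namenode3:9000/tmp");
    }

    // Longest-prefix match of a viewfs path against the mount table,
    // then rewrite the path onto the backing namespace.
    static String resolve(String path) {
        String best = null;
        for (String prefix : MOUNTS.keySet()) {
            if ((path.equals(prefix) || path.startsWith(prefix + "/"))
                    && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("no mount point for " + path);
        }
        return MOUNTS.get(best) + path.substring(best.length());
    }

    public static void main(String[] args) {
        System.out.println(resolve("/data/output")); // hdfs://namenode1:9000/data/output
    }
}
```

In a real deployment this resolution is done by Hadoop's ViewFS client; paths outside every mount point fail, so make sure the sink's `path` and `tmp_path` both fall under configured mounts.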

## Changelog

<ChangeLog />
22 changes: 17 additions & 5 deletions docs/zh/connector-v2/sink/Email.md
@@ -25,11 +25,13 @@ import ChangeLog from '../changelog/connector-email.md';
| email_host | string | 是 | - |
| email_transport_protocol | string | 是 | - |
| email_smtp_auth | boolean | 是 | - |
| email_smtp_port | int | 否 | 465 |
| email_authorization_code | string | 否 | - |
| email_message_headline | string | 是 | - |
| email_message_content | string | 是 | - |
| email_attachment_name | string | 否 | emailsink.csv |
| email_field_delimiter | string | 否 | , |
| common-options | | 否 | - |

### email_from_address [string]

@@ -67,6 +69,14 @@ import ChangeLog from '../changelog/connector-email.md';

邮件消息的正文

### email_attachment_name [string]

邮件附件的文件名。默认为 `emailsink.csv`。

### email_field_delimiter [string]

附件文件中用于分隔字段的分隔符。默认为逗号 `,`。

### common options

Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情.
@@ -84,6 +94,8 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
email_authorization_code=""
email_message_headline=""
email_message_content=""
email_attachment_name="report.csv" # 可选,默认为 emailsink.csv
email_field_delimiter="|" # 可选,默认为 ,
}

```
37 changes: 36 additions & 1 deletion docs/zh/connector-v2/sink/HdfsFile.md
@@ -48,7 +48,7 @@ import ChangeLog from '../changelog/connector-file-hadoop.md';

| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|----------------------------------|---------|------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fs.defaultFS | string | 是 | - | `hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster` |
| fs.defaultFS | string | 是 | - | Hadoop 集群地址。支持以下格式:<br/>- 标准 HDFS:`hdfs://hadoopcluster` 或 `hdfs://namenode:9000`<br/>- ViewFS(联邦 HDFS):`viewfs://mycluster`<br/>详见下方 ViewFS 配置示例。 |
| path | string | 是 | - | 目标目录路径是必需的。 |
| tmp_path | string | 是 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 `mv` 命令将临时目录提交到目标目录。需要一个Hdfs路径。 |
| hdfs_site_path | string | 否 | - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置。 |
@@ -235,6 +235,41 @@ HdfsFile {
}
```

### ViewFS(联邦 HDFS)配置示例

ViewFS 允许您将多个 HDFS 集群或命名空间统一到一个逻辑命名空间中。这对于 HDFS 联邦(Federation)场景非常有用。

```hocon
HdfsFile {
fs.defaultFS = "viewfs://mycluster"
path = "/data/output"
file_format_type = "parquet"
hdfs_site_path = "/path/to/core-site.xml"
data_save_mode = "DROP_DATA"
}
```

在 `core-site.xml` 中配置挂载表:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- ViewFS mount table for mycluster -->
<property>
<name>fs.viewfs.mounttable.mycluster.link./data</name>
<value>hdfs://namenode1:9000/data</value>
</property>
<property>
<name>fs.viewfs.mounttable.mycluster.link./logs</name>
<value>hdfs://namenode2:9000/logs</value>
</property>
<property>
<name>fs.viewfs.mounttable.mycluster.link./tmp</name>
<value>hdfs://namenode3:9000/tmp</value>
</property>
</configuration>
```

## 变更日志

<ChangeLog />
@@ -24,7 +24,9 @@

import java.io.Serializable;

import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_ATTACHMENT_NAME;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_AUTHORIZATION_CODE;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FROM_ADDRESS;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_HOST;
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_MESSAGE_CONTENT;
@@ -45,6 +47,8 @@ public class EmailSinkConfig implements Serializable {
private String emailTransportProtocol;
private Boolean emailSmtpAuth;
private Integer emailSmtpPort;
private String emailAttachmentName;
private String emailFieldDelimiter;

public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
super();
@@ -57,5 +61,7 @@ public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
this.emailTransportProtocol = pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
this.emailAttachmentName = pluginConfig.get(EMAIL_ATTACHMENT_NAME);
this.emailFieldDelimiter = pluginConfig.get(EMAIL_FIELD_DELIMITER);
}
}
@@ -74,4 +74,17 @@ public class EmailSinkOptions {
.intType()
.defaultValue(465)
.withDescription("Select port for authentication.");

public static final Option<String> EMAIL_ATTACHMENT_NAME =
Options.key("email_attachment_name")
.stringType()
.defaultValue("emailsink.csv")
.withDescription("The name of the email attachment file");

public static final Option<String> EMAIL_FIELD_DELIMITER =
Options.key("email_field_delimiter")
.stringType()
.defaultValue(",")
.withDescription(
"The delimiter used to separate fields in the attachment file");
}
@@ -57,26 +57,43 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
private final SeaTunnelRowType seaTunnelRowType;
private final EmailSinkConfig config;
private StringBuffer stringBuffer;
private boolean hasData;

public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig pluginConfig) {
this.seaTunnelRowType = seaTunnelRowType;
this.config = pluginConfig;
this.stringBuffer = new StringBuffer();
this.hasData = false;
}

@Override
public void write(SeaTunnelRow element) {
Object[] fields = element.getFields();

for (int i = 0; i < fields.length; i++) {
Object field = fields[i];
// Handle null field values to avoid NPE
if (field == null) {
stringBuffer.append("");
} else {
stringBuffer.append(field.toString());
}
if (i < fields.length - 1) {
stringBuffer.append(config.getEmailFieldDelimiter());
}
}
stringBuffer.append("\n");
hasData = true;
}

@Override
public void close() {
// Only send email if there was data written successfully
if (!hasData) {
log.info("No data to send, skipping email");
return;
}

createFile();
Properties properties = new Properties();
properties.setProperty("mail.host", config.getEmailHost());
@@ -136,7 +153,7 @@ protected PasswordAuthentication getPasswordAuthentication() {
multipart.addBodyPart(messageBodyPart);
// accessory
messageBodyPart = new MimeBodyPart();
String filename = "emailsink.csv";
String filename = config.getEmailAttachmentName();
DataSource source = new FileDataSource(filename);
messageBodyPart.setDataHandler(new DataHandler(source));
messageBodyPart.setFileName(filename);
@@ -153,7 +170,7 @@ protected PasswordAuthentication getPasswordAuthentication() {
}

public void createFile() {
String fileName = "emailsink.csv";
String fileName = config.getEmailAttachmentName();
try {
String data = stringBuffer.toString();
File file = new File(fileName);