Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit 5f4e944

Browse files
authored
[Bug][Connector-V2][Email] Fix NPE on null values, add configurable attachment name and delimiter (apache#10112)
1 parent e7267ca commit 5f4e944

File tree

6 files changed

+168
-10
lines changed

6 files changed

+168
-10
lines changed

docs/en/connector-v2/sink/Email.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ The tested email version is 1.5.6.
2727
| email_authorization_code | string | no | - |
2828
| email_message_headline | string | yes | - |
2929
| email_message_content | string | yes | - |
30+
| email_attachment_name | string | no | emailsink.csv |
31+
| email_field_delimiter | string | no | , |
3032
| common-options | | no | - |
3133

3234
### email_from_address [string]
@@ -65,6 +67,14 @@ The subject line of the entire message.
6567

6668
The body of the entire message.
6769

70+
### email_attachment_name [string]
71+
72+
The name of the email attachment file. Default is `emailsink.csv`.
73+
74+
### email_field_delimiter [string]
75+
76+
The delimiter used to separate fields in the attachment file. Default is comma `,`.
77+
6878
### common options
6979

7080
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details.
@@ -82,6 +92,8 @@ Sink plugin common parameters, please refer to [Sink Common Options](../sink-com
8292
email_authorization_code=""
8393
email_message_headline=""
8494
email_message_content=""
95+
email_attachment_name="report.csv" # Optional, default is emailsink.csv
96+
email_field_delimiter="|" # Optional, default is ,
8597
}
8698

8799
```

docs/zh/connector-v2/sink/Email.md

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import ChangeLog from '../changelog/connector-email.md';
2525
| email_host | string || - |
2626
| email_transport_protocol | string || - |
2727
| email_smtp_auth | boolean || - |
28-
| email_smtp_port | int || 465 |
29-
| email_authorization_code | string || - |
30-
| email_message_headline | string || - |
31-
| email_message_content | string || - |
32-
| common-options | || - |
28+
| email_smtp_port | int || 465 |
29+
| email_authorization_code | string || - |
30+
| email_message_headline | string || - |
31+
| email_message_content | string || - |
32+
| email_attachment_name | string || emailsink.csv |
33+
| email_field_delimiter | string || , |
34+
| common-options | || - |
3335

3436
### email_from_address [string]
3537

@@ -67,6 +69,14 @@ import ChangeLog from '../changelog/connector-email.md';
6769

6870
邮件消息的正文
6971

72+
### email_attachment_name [string]
73+
74+
邮件附件的文件名。默认为 `emailsink.csv`
75+
76+
### email_field_delimiter [string]
77+
78+
附件文件中用于分隔字段的分隔符。默认为逗号 `,`
79+
7080
### common options
7181

7282
Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情.
@@ -84,6 +94,8 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
8494
email_authorization_code=""
8595
email_message_headline=""
8696
email_message_content=""
97+
email_attachment_name="report.csv" # 可选,默认为 emailsink.csv
98+
email_field_delimiter="|" # 可选,默认为 ,
8799
}
88100

89101
```

seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424

2525
import java.io.Serializable;
2626

27+
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_ATTACHMENT_NAME;
2728
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_AUTHORIZATION_CODE;
29+
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FIELD_DELIMITER;
2830
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_FROM_ADDRESS;
2931
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_HOST;
3032
import static org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkOptions.EMAIL_MESSAGE_CONTENT;
@@ -45,6 +47,8 @@ public class EmailSinkConfig implements Serializable {
4547
private String emailTransportProtocol;
4648
private Boolean emailSmtpAuth;
4749
private Integer emailSmtpPort;
50+
private String emailAttachmentName;
51+
private String emailFieldDelimiter;
4852

4953
public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
5054
super();
@@ -57,5 +61,7 @@ public EmailSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
5761
this.emailTransportProtocol = pluginConfig.get(EMAIL_TRANSPORT_PROTOCOL);
5862
this.emailSmtpAuth = pluginConfig.get(EMAIL_SMTP_AUTH);
5963
this.emailSmtpPort = pluginConfig.get(EMAIL_SMTP_PORT);
64+
this.emailAttachmentName = pluginConfig.get(EMAIL_ATTACHMENT_NAME);
65+
this.emailFieldDelimiter = pluginConfig.get(EMAIL_FIELD_DELIMITER);
6066
}
6167
}

seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/config/EmailSinkOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,17 @@ public class EmailSinkOptions {
7474
.intType()
7575
.defaultValue(465)
7676
.withDescription("Select port for authentication.");
77+
78+
public static final Option<String> EMAIL_ATTACHMENT_NAME =
79+
Options.key("email_attachment_name")
80+
.stringType()
81+
.defaultValue("emailsink.csv")
82+
.withDescription("The name of the email attachment file");
83+
84+
public static final Option<String> EMAIL_FIELD_DELIMITER =
85+
Options.key("email_field_delimiter")
86+
.stringType()
87+
.defaultValue(",")
88+
.withDescription(
89+
"The delimiter used to separate fields in the attachment file");
7790
}

seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,26 +57,43 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
5757
private final SeaTunnelRowType seaTunnelRowType;
5858
private final EmailSinkConfig config;
5959
private StringBuffer stringBuffer;
60+
private boolean hasData;
6061

6162
public EmailSinkWriter(SeaTunnelRowType seaTunnelRowType, EmailSinkConfig pluginConfig) {
6263
this.seaTunnelRowType = seaTunnelRowType;
6364
this.config = pluginConfig;
6465
this.stringBuffer = new StringBuffer();
66+
this.hasData = false;
6567
}
6668

6769
@Override
6870
public void write(SeaTunnelRow element) {
6971
Object[] fields = element.getFields();
7072

71-
for (Object field : fields) {
72-
stringBuffer.append(field.toString() + ",");
73+
for (int i = 0; i < fields.length; i++) {
74+
Object field = fields[i];
75+
// Handle null field values to avoid NPE
76+
if (field == null) {
77+
stringBuffer.append("");
78+
} else {
79+
stringBuffer.append(field.toString());
80+
}
81+
if (i < fields.length - 1) {
82+
stringBuffer.append(config.getEmailFieldDelimiter());
83+
}
7384
}
74-
stringBuffer.deleteCharAt(fields.length - 1);
7585
stringBuffer.append("\n");
86+
hasData = true;
7687
}
7788

7889
@Override
7990
public void close() {
91+
// Only send email if there was data written successfully
92+
if (!hasData) {
93+
log.info("No data to send, skipping email");
94+
return;
95+
}
96+
8097
createFile();
8198
Properties properties = new Properties();
8299
properties.setProperty("mail.host", config.getEmailHost());
@@ -136,7 +153,7 @@ protected PasswordAuthentication getPasswordAuthentication() {
136153
multipart.addBodyPart(messageBodyPart);
137154
// accessory
138155
messageBodyPart = new MimeBodyPart();
139-
String filename = "emailsink.csv";
156+
String filename = config.getEmailAttachmentName();
140157
DataSource source = new FileDataSource(filename);
141158
messageBodyPart.setDataHandler(new DataHandler(source));
142159
messageBodyPart.setFileName(filename);
@@ -153,7 +170,7 @@ protected PasswordAuthentication getPasswordAuthentication() {
153170
}
154171

155172
public void createFile() {
156-
String fileName = "emailsink.csv";
173+
String fileName = config.getEmailAttachmentName();
157174
try {
158175
String data = stringBuffer.toString();
159176
File file = new File(fileName);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.email;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
25+
import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
26+
import org.apache.seatunnel.connectors.seatunnel.email.sink.EmailSinkWriter;
27+
28+
import org.junit.jupiter.api.Assertions;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.HashMap;
32+
import java.util.Map;
33+
34+
public class EmailSinkWriterTest {
35+
36+
@Test
37+
void testWriteWithNullValues() {
38+
// Create a mock config
39+
Map<String, Object> configMap = new HashMap<>();
40+
configMap.put("email_from_address", "test@example.com");
41+
configMap.put("email_to_address", "receiver@example.com");
42+
configMap.put("email_authorization_code", "code");
43+
configMap.put("email_message_headline", "Test");
44+
configMap.put("email_message_content", "Test content");
45+
configMap.put("email_host", "smtp.example.com");
46+
configMap.put("email_transport_protocol", "smtp");
47+
configMap.put("email_smtp_auth", true);
48+
configMap.put("email_smtp_port", 465);
49+
configMap.put("email_attachment_name", "test.csv");
50+
configMap.put("email_field_delimiter", ",");
51+
52+
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
53+
EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
54+
55+
// Create row type with string fields
56+
String[] fieldNames = {"field1", "field2", "field3"};
57+
SeaTunnelDataType<?>[] fieldTypes = {
58+
BasicType.STRING_TYPE, BasicType.STRING_TYPE, BasicType.STRING_TYPE
59+
};
60+
SeaTunnelRowType rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
61+
62+
// Create writer
63+
EmailSinkWriter writer = new EmailSinkWriter(rowType, sinkConfig);
64+
65+
// Test writing row with null values - should not throw NPE
66+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {"value1", null, "value3"});
67+
68+
Assertions.assertDoesNotThrow(() -> writer.write(row));
69+
70+
// Test writing row with all null values - should not throw NPE
71+
SeaTunnelRow nullRow = new SeaTunnelRow(new Object[] {null, null, null});
72+
73+
Assertions.assertDoesNotThrow(() -> writer.write(nullRow));
74+
}
75+
76+
@Test
77+
void testCustomDelimiter() {
78+
// Create a mock config with custom delimiter
79+
Map<String, Object> configMap = new HashMap<>();
80+
configMap.put("email_from_address", "test@example.com");
81+
configMap.put("email_to_address", "receiver@example.com");
82+
configMap.put("email_authorization_code", "code");
83+
configMap.put("email_message_headline", "Test");
84+
configMap.put("email_message_content", "Test content");
85+
configMap.put("email_host", "smtp.example.com");
86+
configMap.put("email_transport_protocol", "smtp");
87+
configMap.put("email_smtp_auth", true);
88+
configMap.put("email_smtp_port", 465);
89+
configMap.put("email_attachment_name", "test.csv");
90+
configMap.put("email_field_delimiter", "|");
91+
92+
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
93+
EmailSinkConfig sinkConfig = new EmailSinkConfig(config);
94+
95+
Assertions.assertEquals("|", sinkConfig.getEmailFieldDelimiter());
96+
Assertions.assertEquals("test.csv", sinkConfig.getEmailAttachmentName());
97+
}
98+
}

0 commit comments

Comments
 (0)