Skip to content
This repository was archived by the owner on Dec 28, 2025. It is now read-only.

Commit ddd9238

Browse files
yzeng1618zengyi
andauthored
[Improve][Elasticsearch] Add LocalDateTime serialization test and simplify serializer (apache#10135)
Co-authored-by: zengyi <zengyi@chinatelecom.cn>
1 parent 3fc5917 commit ddd9238

File tree

2 files changed

+58
-27
lines changed

2 files changed

+58
-27
lines changed

seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545

4646
/** use in elasticsearch version >= 2.x and <= 8.x */
4747
public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
48+
4849
private final SeaTunnelRowType seaTunnelRowType;
4950
private final ObjectMapper objectMapper = new ObjectMapper();
5051

@@ -201,49 +202,49 @@ private Map<String, Object> toDocumentMap(SeaTunnelRow row, SeaTunnelRowType row
201202
}
202203

203204
private Object convertValue(String fieldName, Object value) {
205+
if (value == null) {
206+
return null;
207+
}
208+
204209
if (value instanceof Temporal) {
205210
// jackson not support jdk8 new time api
206211
return value.toString();
207-
} else if (value instanceof Map) {
212+
}
213+
214+
if (value instanceof Map) {
208215
for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
209216
((Map) value).put(entry.getKey(), convertValue(fieldName, entry.getValue()));
210217
}
211218
return value;
212-
} else if (value instanceof List) {
219+
}
220+
221+
if (value instanceof List) {
213222
for (int i = 0; i < ((List) value).size(); i++) {
214223
((List) value).set(i, convertValue(fieldName, ((List) value).get(i)));
215224
}
216225
return value;
217-
} else if (value instanceof ByteBuffer) {
218-
// Check if this field is configured as a vectorization field
219-
if (vectorizationFields != null && vectorizationFields.contains(fieldName)) {
220-
ByteBuffer buffer = (ByteBuffer) value;
221-
Float[] floats = VectorUtils.toFloatArray(buffer);
222-
223-
// Use the configured dimension or calculate it from the buffer size
224-
int dimension = vectorDimension > 0 ? vectorDimension : buffer.remaining() / 4;
225-
226-
// Read the floats from the buffer
227-
for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) {
228-
floats[i] = buffer.getFloat();
229-
}
226+
}
230227

231-
return floats;
232-
} else {
233-
// Default behavior for ByteBuffer fields not specified as vectorization fields
234-
ByteBuffer buffer = (ByteBuffer) value;
235-
Float[] floats = VectorUtils.toFloatArray(buffer);
236-
int floatCount = buffer.remaining() / 4;
228+
if (value instanceof ByteBuffer) {
229+
ByteBuffer buffer = (ByteBuffer) value;
230+
Float[] floats = VectorUtils.toFloatArray(buffer);
237231

238-
for (int i = 0; i < floatCount; i++) {
239-
floats[i] = buffer.getFloat();
240-
}
232+
// Use configured dimension for vectorization fields, otherwise calculate from buffer
233+
int dimension =
234+
(vectorizationFields != null
235+
&& vectorizationFields.contains(fieldName)
236+
&& vectorDimension > 0)
237+
? vectorDimension
238+
: buffer.remaining() / 4;
241239

242-
return floats;
240+
for (int i = 0; i < dimension && buffer.remaining() >= 4; i++) {
241+
floats[i] = buffer.getFloat();
243242
}
244-
} else {
245-
return value;
243+
244+
return floats;
246245
}
246+
247+
return value;
247248
}
248249

249250
private Map<String, String> createMetadata(@NonNull SeaTunnelRow row, @NonNull String key) {

seatunnel-connectors-v2/connector-elasticsearch/src/test/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.jupiter.api.Assertions;
3232
import org.junit.jupiter.api.Test;
3333

34+
import java.time.LocalDateTime;
3435
import java.util.Arrays;
3536
import java.util.HashMap;
3637
import java.util.Map;
@@ -184,4 +185,33 @@ public void testSerializeDelete() {
184185
String upsertStr = serializer.serializeRow(row);
185186
Assertions.assertEquals(expected, upsertStr);
186187
}
188+
189+
@Test
190+
public void testSerializeLocalDateTimeFieldFormat() {
191+
String index = "st_index";
192+
Map<String, Object> confMap = new HashMap<>();
193+
confMap.put(ElasticsearchSinkOptions.INDEX.key(), index);
194+
195+
ReadonlyConfig pluginConf = ReadonlyConfig.fromMap(confMap);
196+
ElasticsearchClusterInfo clusterInfo =
197+
ElasticsearchClusterInfo.builder().clusterVersion("8.0.0").build();
198+
IndexInfo indexInfo = new IndexInfo(index, pluginConf);
199+
SeaTunnelRowType schema =
200+
new SeaTunnelRowType(
201+
new String[] {"id", "ts"},
202+
new SeaTunnelDataType[] {STRING_TYPE, STRING_TYPE});
203+
204+
final ElasticsearchRowSerializer serializer =
205+
new ElasticsearchRowSerializer(clusterInfo, indexInfo, schema);
206+
207+
String id = "0001";
208+
LocalDateTime ts = LocalDateTime.of(2023, 1, 2, 3, 4, 5);
209+
SeaTunnelRow row = new SeaTunnelRow(new Object[] {id, ts});
210+
row.setRowKind(RowKind.UPDATE_AFTER);
211+
212+
String result = serializer.serializeRow(row);
213+
Assertions.assertTrue(
214+
result.contains("\"ts\":\"2023-01-02T03:04:05\""),
215+
"LocalDateTime field should be formatted with ISO-8601 'T' separator");
216+
}
187217
}

0 commit comments

Comments
 (0)