diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml index 8cf12e3bd..2df6bff77 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/pom.xml @@ -1,55 +1,62 @@ - - + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + + - org.apache.geaflow geaflow-dsl-connector + org.apache.geaflow 0.6.8-SNAPSHOT - 4.0.0 - geaflow-dsl-connector-elasticsearch - geaflow-dsl-connector-elasticsearch + 4.0.0 - 7.17.10 - + 7.17.9 + > + + geaflow-dsl-connector-elasticsearch org.apache.geaflow geaflow-dsl-common + org.apache.geaflow geaflow-dsl-connector-api + org.elasticsearch.client elasticsearch-rest-high-level-client ${elasticsearch.version} + + com.fasterxml.jackson.core + jackson-databind + + org.testng testng @@ -57,5 +64,4 @@ test - diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java new file mode 100644 index 000000000..583624a83 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchConfigKeys.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.common.config.ConfigKey; +import org.apache.geaflow.common.config.ConfigKeys; + +public class ElasticSearchConfigKeys { + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_HOSTS = ConfigKeys + .key("geaflow.dsl.elasticsearch.hosts") + .noDefaultValue() + .description("ElasticSearch hosts, separated by comma."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_INDEX = ConfigKeys + .key("geaflow.dsl.elasticsearch.index") + .noDefaultValue() + .description("ElasticSearch index name."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_USERNAME = ConfigKeys + .key("geaflow.dsl.elasticsearch.username") + .noDefaultValue() + .description("ElasticSearch username."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_PASSWORD = ConfigKeys + .key("geaflow.dsl.elasticsearch.password") + .noDefaultValue() + .description("ElasticSearch password."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE = ConfigKeys + .key("geaflow.dsl.elasticsearch.flush.size") + .defaultValue(100) + .description("Number of records to flush at once."); + + public static final ConfigKey GEAFLOW_DSL_ELASTICSEARCH_RETRY_TIMES = ConfigKeys + .key("geaflow.dsl.elasticsearch.retry.times") + .defaultValue(3) + .description("Number of retry times when write failed."); +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java new file mode 100644 index 000000000..08f9c42c8 --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnector.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.apache.geaflow.dsl.connector.api.TableWritableConnector; + +public class ElasticSearchTableConnector implements TableWritableConnector { + + public static final String TYPE = "ELASTICSEARCH"; + + @Override + public String getType() { + return TYPE; + } + + @Override + public TableSink createSink(Configuration conf) { + return new ElasticSearchTableSink(); + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java new file mode 100644 index 000000000..9f6e311eb --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableSink.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.geaflow.api.context.RuntimeContext; +import org.apache.geaflow.common.config.Configuration; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.xcontent.XContentType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticSearchTableSink implements TableSink { + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchTableSink.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private Configuration tableConf; + private StructType schema; + private String[] hosts; + private String index; + private String username; + private String password; + private int flushSize; + private int retryTimes; + + private transient RestHighLevelClient client; + private transient List batchRequests; + + @Override + public void init(Configuration tableConf, StructType schema) { + LOGGER.info("Initializing ElasticSearch sink with config: {}, schema: {}", tableConf, schema); + this.tableConf = tableConf; + this.schema = schema; + + String hostsStr = tableConf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + this.hosts = hostsStr.split(","); + this.index = tableConf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + this.username = tableConf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME, (String) null); + this.password = tableConf.getString(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, (String) null); + this.flushSize = tableConf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE); + this.retryTimes = tableConf.getInteger(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_RETRY_TIMES); + } + + @Override + public void open(RuntimeContext context) { + LOGGER.info("Opening ElasticSearch sink"); + try { + // Create ElasticSearch client + HttpHost[] httpHosts = new HttpHost[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + httpHosts[i] = HttpHost.create(hosts[i]); + } + + RestClientBuilder builder = RestClient.builder(httpHosts); + + if (username != null && password != null) { + // 设置认证信息 + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + + builder.setHttpClientConfigCallback(httpClientBuilder -> + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); + + } + + client = new RestHighLevelClient(builder); + batchRequests = new ArrayList<>(); + } catch (Exception e) { + throw new GeaFlowDSLException("Failed to create ElasticSearch client", e); + } + } + + @Override + public void write(Row row) throws IOException { + // Convert row to JSON document + Map document = new HashMap<>(); + for (int i = 0; i < schema.size(); i++) { + document.put(schema.getField(i).getName(), row.getField(i, schema.getField(i).getType())); + } + + // Create index request + String jsonDoc = OBJECT_MAPPER.writeValueAsString(document); + IndexRequest request = new IndexRequest(index) + .source(jsonDoc, XContentType.JSON); + + batchRequests.add(request); + + // Flush if batch size reached + if (batchRequests.size() >= flushSize) { + flush(); + } + } + + @Override + public void finish() throws IOException { + flush(); + } + + @Override + public void close() { + LOGGER.info("Closing ElasticSearch sink"); + try { + flush(); + } catch (IOException e) { + LOGGER.error("Failed to flush remaining records", e); + } + + if (client != null) { + try { + client.close(); + } catch (IOException e) { + LOGGER.error("Failed to close ElasticSearch client", e); + } + } + } + + private void flush() throws IOException { + if (batchRequests.isEmpty()) { + return; + } + + LOGGER.info("Flushing {} records to ElasticSearch", batchRequests.size()); + + BulkRequest bulkRequest = new BulkRequest(); + for (IndexRequest request : batchRequests) { + bulkRequest.add(request); + } + + boolean success = false; + Exception lastException = null; + + for (int i = 0; i <= retryTimes; i++) { + try { + BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT); + if (!bulkResponse.hasFailures()) { + success = true; + break; + } else { + lastException = new IOException("Bulk request failed: " + bulkResponse.buildFailureMessage()); + } + } catch (Exception e) { + lastException = e; + LOGGER.warn("Failed to execute bulk request, attempt {}/{}", i + 1, retryTimes + 1, e); + + if (i < retryTimes) { + // Wait before retry + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting for retry", ie); + } + } + } + } + + if (!success) { + batchRequests.clear(); + throw new IOException("Failed to execute bulk request after " + (retryTimes + 1) + " attempts", lastException); + } + + batchRequests.clear(); + LOGGER.info("Successfully flushed records to ElasticSearch"); + } +} diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector index 93f5ad880..9ccf1de3d 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/main/resources/META-INF/services/org.apache.geaflow.dsl.connector.api.TableConnector @@ -16,4 +16,6 @@ # specific language governing permissions and limitations # under the License. # -org.apache.geaflow.dsl.connector.elasticsearch.ElasticsearchTableConnector + +org.apache.geaflow.dsl.connector.elasticsearch.ElasticSearchTableConnector + diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnectorTest.java new file mode 100644 index 000000000..a8e5bd9de --- /dev/null +++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-elasticsearch/src/test/java/org/apache/geaflow/dsl/connector/elasticsearch/ElasticSearchTableConnectorTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.geaflow.dsl.connector.elasticsearch; + + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import org.apache.geaflow.dsl.common.data.impl.ObjectRow; +import org.apache.geaflow.dsl.common.data.Row; +import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException; +import org.apache.geaflow.dsl.common.types.StructType; +import org.apache.geaflow.dsl.common.types.TableField; +import org.apache.geaflow.common.type.Types; +import org.apache.geaflow.dsl.connector.api.TableSink; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RestHighLevelClient; +import org.junit.Assert; +import org.junit.Test; +import org.apache.geaflow.common.config.Configuration; + + +public class ElasticSearchTableConnectorTest { + + @Test + public void testElasticSearchConnector() { + ElasticSearchTableConnector connector = new ElasticSearchTableConnector(); + Assert.assertEquals(connector.getType(), "ELASTICSEARCH"); + + // Test that we can create a sink + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS.getKey(), "http://127.0.0.1:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX.getKey(), "test"); + + TableSink sink = connector.createSink(conf); + Assert.assertNotNull(sink); + Assert.assertEquals(ElasticSearchTableSink.class, sink.getClass()); + } + + @Test + public void testElasticSearchConfigKeys() { + // Test that config keys are properly defined + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS); + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX); + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME); + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD); + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE); + Assert.assertNotNull(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_RETRY_TIMES); + + // Test default values + Assert.assertEquals(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE.getDefaultValue(), Integer.valueOf(100)); + Assert.assertEquals(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_RETRY_TIMES.getDefaultValue(), Integer.valueOf(3)); + } + + @Test + public void testElasticSearchTableSinkInit() throws NoSuchFieldException, IllegalAccessException { + ElasticSearchTableSink sink = new ElasticSearchTableSink(); + + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS.getKey(), "http://127.0.0.1:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX.getKey(), "test"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_USERNAME.getKey(), "zhangsan"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_PASSWORD, "123"); + + List fields = new ArrayList<>(); + fields.add(new TableField("id", Types.INTEGER, true)); + fields.add(new TableField("name", Types.STRING, true)); + StructType schema = new StructType(fields); + + // This should not throw an exception + sink.init(conf, schema); + + Field hosts = sink.getClass().getDeclaredField("hosts"); + hosts.setAccessible(true); + String[] hostsStr = (String[]) hosts.get(sink);; + Field index = sink.getClass().getDeclaredField("index"); + index.setAccessible(true); + Field username = sink.getClass().getDeclaredField("username"); + username.setAccessible(true); + Field password = sink.getClass().getDeclaredField("password"); + password.setAccessible(true); + + + // Verify the configuration was set correctly + Assert.assertEquals(ElasticSearchTableSink.class, sink.getClass()); + Assert.assertEquals("http://127.0.0.1:9200", hostsStr[0]); + Assert.assertEquals("test",index.get(sink)); + Assert.assertEquals("zhangsan",username.get(sink)); + Assert.assertEquals("123",password.get(sink)); + } + + @Test + public void testElasticSearchTableSinkOpen() { + ElasticSearchTableSink sink = new ElasticSearchTableSink(); + + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS.getKey(), "http://127.0.0.1:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX.getKey(), "test"); + + List fields = new ArrayList<>(); + fields.add(new TableField("id", Types.INTEGER, true)); + fields.add(new TableField("name", Types.STRING, true)); + StructType schema = new StructType(fields); + + // Initialize the sink + sink.init(conf, schema); + + // Test open method - verify it doesn't throw unexpected exceptions + try { + sink.open(null); + Field client = sink.getClass().getDeclaredField("client"); + client.setAccessible(true); + RestHighLevelClient restHighLevelClient = (RestHighLevelClient) client.get(sink); + Assert.assertNotNull(restHighLevelClient); + // No exception means the client initialized successfully (or mock is working) + } catch (Exception e) { + // Verify the exception is equal to define + Assert.assertEquals(GeaFlowDSLException.class, e.getClass()); + } + } + + @Test + public void testElasticSearchTableSinkWrite() { + ElasticSearchTableSink sink = new ElasticSearchTableSink(); + + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS.getKey(), "http://127.0.0.1:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX.getKey(), "test"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE.getKey(), "1"); // Small flush size for testing + + List fields = new ArrayList<>(); + fields.add(new TableField("id", Types.INTEGER, true)); + fields.add(new TableField("name", Types.STRING, true)); + StructType schema = new StructType(fields); + + // Initialize the sink + sink.init(conf, schema); + + // Initialize the client and batchRequests + sink.open(null); + + // Create a proper row that matches the schema + Row row = ObjectRow.create(1, "张三"); + + // Test write method - verify it doesn't throw unexpected exceptions + try { + sink.write(row); + } catch (Exception e) { + // Verify the exception is equal to define + Assert.assertEquals("Failed to execute bulk request after 4 attempts", e.getMessage()); + + // Verify the cause is connection-related,because we don't have a real Elasticsearch server + Assert.assertEquals("java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused",e.getCause().getMessage()); + } + } + + @Test + public void testElasticSearchTableSinkWriteWithMultipleRows() { + ElasticSearchTableSink sink = new ElasticSearchTableSink(); + + Configuration conf = new Configuration(); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_HOSTS.getKey(), "http://127.0.0.1:9200"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_INDEX.getKey(), "test"); + conf.put(ElasticSearchConfigKeys.GEAFLOW_DSL_ELASTICSEARCH_FLUSH_SIZE.getKey(), "3"); // Flush after 3 rows + + List fields = new ArrayList<>(); + fields.add(new TableField("id", Types.INTEGER, true)); + fields.add(new TableField("name", Types.STRING, true)); + StructType schema = new StructType(fields); + + // Initialize the sink + sink.init(conf, schema); + + // Initialize the client and batchRequests + sink.open(null); + + // Create multiple proper rows that match the schema + Row row1 = ObjectRow.create(1, "张三"); + Row row2 = ObjectRow.create(2, "李四"); + Row row3 = ObjectRow.create(3, "王五"); + + // Test write method with multiple rows + try { + sink.write(row1); + sink.write(row2); + + // Verify batchRequests contains 2 requests, flush when up to 3 + Field batchRequests = sink.getClass().getDeclaredField("batchRequests"); + batchRequests.setAccessible(true); + List batchRequestsList = (List) batchRequests.get(sink); + Assert.assertEquals(2, batchRequestsList.size()); + + sink.write(row3); + } catch (Exception e) { + // Verify the exception is equal to define + Assert.assertEquals("Failed to execute bulk request after 4 attempts", e.getMessage()); + + // Verify the cause is connection-related,because we don't have a real Elasticsearch server + Assert.assertEquals("java.util.concurrent.ExecutionException: java.net.ConnectException: Connection refused",e.getCause().getMessage()); + } + } +} \ No newline at end of file