2525import co .elastic .clients .elasticsearch ._helpers .esql .jdbc .ResultSetEsqlAdapter ;
2626import co .elastic .clients .elasticsearch ._helpers .esql .objects .ObjectsEsqlAdapter ;
2727import co .elastic .clients .json .jackson .JacksonJsonpMapper ;
28+ import co .elastic .clients .transport .rest5_client .Rest5ClientTransport ;
29+ import co .elastic .clients .transport .rest5_client .low_level .ESRequest ;
30+ import co .elastic .clients .transport .rest5_client .low_level .Rest5Client ;
2831import co .elastic .clients .transport .rest_client .RestClientTransport ;
2932import com .fasterxml .jackson .annotation .JsonIgnoreProperties ;
3033import com .fasterxml .jackson .databind .PropertyNamingStrategies ;
@@ -55,26 +58,42 @@ public class EsqlAdapterEndToEndTest extends Assertions {
5558 @ BeforeAll
5659 public static void setup () throws Exception {
5760 ElasticsearchClient global = ElasticsearchTestServer .global ().client ();
58- RestClient restClient = ((RestClientTransport ) global ._transport ()).restClient ();
59- esClient = new ElasticsearchClient (new RestClientTransport (restClient , new JacksonJsonpMapper ()));
61+ if (global ._transport () instanceof RestClientTransport ) {
62+ RestClient restClient = ((RestClientTransport ) global ._transport ()).restClient ();
63+ esClient = new ElasticsearchClient (new RestClientTransport (restClient , new JacksonJsonpMapper ()));
6064
61- esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
65+ esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
6266
63- Request request = new Request ("POST" , "/employees/_bulk?refresh=true" );
67+ Request request = new Request ("POST" , "/employees/_bulk?refresh=true" );
6468
65- InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
66- byte [] bytes = IOUtils .toByteArray (resourceAsStream );
67- request .setEntity (new ByteArrayEntity (bytes , ContentType .APPLICATION_JSON ));
69+ InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
70+ byte [] bytes = IOUtils .toByteArray (resourceAsStream );
71+ request .setEntity (new ByteArrayEntity (bytes , ContentType .APPLICATION_JSON ));
6872
69- restClient .performRequest (request );
73+ restClient .performRequest (request );
74+ } else if (global ._transport () instanceof Rest5ClientTransport ) {
75+ Rest5Client restClient = ((Rest5ClientTransport ) global ._transport ()).restClient ();
76+ esClient = new ElasticsearchClient (new Rest5ClientTransport (restClient , new JacksonJsonpMapper ()));
77+
78+ esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
79+
80+ ESRequest request = new ESRequest ("POST" , "/employees/_bulk?refresh=true" );
81+
82+ InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
83+ byte [] bytes = IOUtils .toByteArray (resourceAsStream );
84+ request .setEntity (new org .apache .hc .core5 .http .io .entity .ByteArrayEntity (bytes , org .apache .hc .core5 .http .ContentType .APPLICATION_JSON ));
85+
86+ restClient .performRequest (request );
87+ }
7088 }
7189
7290 @ Test
7391 public void resultSetTest () throws Exception {
7492
7593 ResultSet rs = esClient .esql ().query (
7694 ResultSetEsqlAdapter .INSTANCE ,
77- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
95+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
96+ "SORT emp_no | LIMIT 300" ,
7897 // Testing parameters. Note that FROM and LIMIT do not accept parameters
7998 "10042" , "10002"
8099 );
@@ -116,7 +135,8 @@ public void resultSetTest() throws Exception {
116135 public void objectsTest () throws Exception {
117136 Iterable <EmpData > result = esClient .esql ().query (
118137 ObjectsEsqlAdapter .of (EmpData .class ),
119- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
138+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
139+ "SORT emp_no | LIMIT 300" ,
120140 // Testing parameters. Note that FROM and LIMIT do not accept parameters
121141 "10042" , "10002"
122142 );
@@ -152,12 +172,14 @@ public void objectsTest() throws Exception {
152172 @ Test
153173 public void asyncObjects () throws Exception {
154174
155- ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient (esClient ._transport (), esClient ._transportOptions ());
175+ ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient (esClient ._transport (),
176+ esClient ._transportOptions ());
156177
157178
158179 CompletableFuture <Iterable <EmpData >> future = asyncClient .esql ().query (
159180 ObjectsEsqlAdapter .of (EmpData .class ),
160- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
181+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
182+ "SORT emp_no | LIMIT 300" ,
161183 // Testing parameters. Note that FROM and LIMIT do not accept parameters
162184 "10042" , "10002"
163185 );
@@ -169,7 +191,8 @@ public void asyncObjects() throws Exception {
169191 EmpData emp = it .next ();
170192 assertEquals ("10002" , emp .empNo );
171193 List <String > jobPositions = emp .jobPositions ;
172- // In addition to the value, this tests that single strings are correctly deserialized as a list
194+ // In addition to the value, this tests that single strings are correctly deserialized
195+ // as a list
173196 assertEquals (Arrays .asList ("Senior Team Lead" ), emp .jobPositions );
174197 }
175198
@@ -183,7 +206,8 @@ public void asyncObjects() throws Exception {
183206 assertTrue (emp .jobPositions .contains ("Junior Developer" ));
184207
185208 assertEquals ("1993-03-21T00:00:00Z[UTC]" ,
186- DateTimeFormatter .ISO_DATE_TIME .format (emp .hireDate .toInstant ().atZone (ZoneId .of ("UTC" )))
209+ DateTimeFormatter .ISO_DATE_TIME .format (emp .hireDate .toInstant ().atZone (ZoneId .of (
210+ "UTC" )))
187211 );
188212 }
189213
0 commit comments