Skip to content

Commit ec54bce

Browse files
Added Javadoc for the end-to-end test classes
Signed-off-by: Norman Jordan <norman.jordan@improving.com>
1 parent 0cd95f7 commit ec54bce

File tree

3 files changed

+113
-1
lines changed

3 files changed

+113
-1
lines changed

e2e-test/src/test/scala/org/opensearch/spark/e2e/EndToEndITSuite.scala

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import org.apache.spark.internal.Logging
2424
import org.apache.spark.sql.{Dataset, Row}
2525

2626
/**
27-
* Tests requiring the should extend OpenSearchSuite.
27+
* Tests queries for expected results on the integration test docker cluster. Queries can be run using
28+
* Spark Connect or the OpenSearch Async Query API.
2829
*/
2930
class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with BeforeAndAfterAll with SparkTrait with S3ClientTrait with Assertions with Logging {
3031
self: Suite =>
@@ -53,6 +54,9 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
5354
S3_SECRET_KEY
5455
}
5556

57+
/**
58+
* Starts up the integration test docker cluster.
59+
*/
5660
override def beforeAll(): Unit = {
5761
logInfo("Starting docker cluster")
5862

@@ -83,6 +87,9 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
8387
createIndices()
8488
}
8589

90+
/**
91+
* Shuts down the integration test docker cluster.
92+
*/
8693
override def afterAll(): Unit = {
8794
logInfo("Stopping docker cluster")
8895
waitForSparkSubmitCompletion()
@@ -99,6 +106,10 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
99106
logInfo("Stopped docker cluster")
100107
}
101108

109+
/**
110+
* Waits for all Spark submit containers to finish. Spark submit containers are used for processing Async Query
111+
* API requests. Each Async Query API session will have at most one Spark submit container.
112+
*/
102113
def waitForSparkSubmitCompletion(): Unit = {
103114
val endTime = System.currentTimeMillis() + 300000
104115
while (System.currentTimeMillis() < endTime) {
@@ -127,6 +138,11 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
127138
}
128139
}
129140

141+
/**
142+
* Creates Spark tables. Looks for parquet files in "e2e-test/src/test/resources/spark/tables".
143+
*
144+
* The tables are created as external tables with the data stored in the MinIO(S3) container.
145+
*/
130146
def createTables(): Unit = {
131147
try {
132148
val tablesDir = new File("e2e-test/src/test/resources/spark/tables")
@@ -146,6 +162,13 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
146162
}
147163
}
148164

165+
/**
166+
* Creates OpenSearch indices. Looks for ".mapping.json" files in
167+
* "e2e-test/src/test/resources/opensearch/indices".
168+
*
169+
* An index is created using the mapping data in the ".mapping.json" file. If there is a similarly named
170+
* file with only a ".json" extension, then it is used to do a bulk import of data into the index.
171+
*/
149172
def createIndices(): Unit = {
150173
val indicesDir = new File("e2e-test/src/test/resources/opensearch/indices")
151174
val backend = HttpClientSyncBackend()
@@ -179,6 +202,13 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
179202
})
180203
}
181204

205+
/**
206+
* Tests SQL queries on the "spark" container. Looks for ".sql" files in
207+
* "e2e-test/src/test/resources/spark/queries/sql".
208+
*
209+
* Uses Spark Connect to run the query on the "spark" container and compares the results to the expected results
210+
* in the corresponding ".results" file. The ".results" file is in CSV format with a header.
211+
*/
182212
it should "SQL Queries" in {
183213
val queriesDir = new File("e2e-test/src/test/resources/spark/queries/sql")
184214
val queriesTableData : ListBuffer[(String, String)] = new ListBuffer()
@@ -209,6 +239,13 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
209239
}
210240
}
211241

242+
/**
243+
* Tests PPL queries on the "spark" container. Looks for ".ppl" files in
244+
* "e2e-test/src/test/resources/spark/queries/ppl".
245+
*
246+
* Uses Spark Connect to run the query on the "spark" container and compares the results to the expected results
247+
* in the corresponding ".results" file. The ".results" file is in CSV format with a header.
248+
*/
212249
it should "PPL Queries" in {
213250
val queriesDir = new File("e2e-test/src/test/resources/spark/queries/ppl")
214251
val queriesTableData : ListBuffer[(String, String)] = new ListBuffer()
@@ -239,6 +276,16 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
239276
}
240277
}
241278

279+
/**
280+
* Tests SQL queries using the Async Query API. Looks for ".sql" files in
281+
* "e2e-test/src/test/resources/opensearch/queries/sql".
282+
*
283+
* Submits the query using a REST call to the Async Query API. Will wait until the results are available and
284+
* then compare them to the expected results in the corresponding ".results" file. The ".results" file is the
285+
* JSON response from fetching the results.
286+
*
287+
* All queries are tested using the same Async Query API session.
288+
*/
242289
it should "Async SQL Queries" in {
243290
var sessionId : String = null
244291
val backend = HttpClientSyncBackend()
@@ -274,6 +321,16 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
274321
}
275322
}
276323

324+
/**
325+
* Tests PPL queries using the Async Query API. Looks for ".ppl" files in
326+
* "e2e-test/src/test/resources/opensearch/queries/ppl".
327+
*
328+
* Submits the query using a REST call to the Async Query API. Will wait until the results are available and
329+
* then compare them to the expected results in the corresponding ".results" file. The ".results" file is the
330+
* JSON response from fetching the results.
331+
*
332+
* All queries are tested using the same Async Query API session.
333+
*/
277334
it should "Async PPL Queries" in {
278335
var sessionId : String = null
279336
val backend = HttpClientSyncBackend()
@@ -309,6 +366,13 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
309366
}
310367
}
311368

369+
/**
370+
* Retrieves the results from S3 of a query submitted using Spark Connect. The results are saved in S3 in CSV
371+
* format.
372+
*
373+
* @param s3Path S3 "folder" where the results were saved
374+
* @return CSV formatted results
375+
*/
312376
def getActualResults(s3Path : String): String = {
313377
val objectSummaries = getS3Client().listObjects("test-resources", s3Path).getObjectSummaries
314378
var jsonKey : String = null
@@ -337,6 +401,15 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
337401
throw new Exception("Object not found")
338402
}
339403

404+
/**
405+
* Submits a request to the Async Query API to execute a query.
406+
*
407+
* @param language query language (either "ppl" or "sql")
408+
* @param query query to execute
409+
* @param sessionId Async Query API session to use (can be null)
410+
* @param backend sttp backend to use for submitting requests
411+
* @return sttp Response object of the submitted request
412+
*/
340413
def executeAsyncQuery(language: String, query: String, sessionId: String, backend: SttpBackend[Identity, Any]) : Identity[Response[Either[ResponseException[String, JsError], JsValue]]] = {
341414
var queryBody : String = null
342415
val escapedQuery = query.replaceAll("\n", "\\\\n")
@@ -354,6 +427,14 @@ class EndToEndITSuite extends AnyFlatSpec with TableDrivenPropertyChecks with Be
354427
.send(backend)
355428
}
356429

430+
/**
431+
* Retrieves the results of an Async Query API query. Will wait up to 30 seconds for the query to finish and make
432+
* the results available.
433+
*
434+
* @param queryId ID of the previously submitted query
435+
* @param backend sttp backend to use
436+
* @return results of the previously submitted query in JSON
437+
*/
357438
def getAsyncResults(queryId: String, backend: SttpBackend[Identity, Any]): JsValue = {
358439
val endTime = System.currentTimeMillis() + 30000
359440

e2e-test/src/test/scala/org/opensearch/spark/e2e/S3ClientTrait.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,32 @@ import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
99
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
1010
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
1111

12+
/**
13+
* Provides a method for obtaining an S3 client.
14+
*/
1215
trait S3ClientTrait {
1316
var s3Client : AmazonS3 = null
1417

18+
/**
19+
* Retrieves the S3 access key to use
20+
*
21+
* @return S3 access key to use
22+
*/
1523
def getS3AccessKey(): String
1624

25+
/**
26+
* Retrieves the S3 secret key to use
27+
*
28+
* @return S3 secret key to use
29+
*/
1730
def getS3SecretKey(): String
1831

32+
/**
33+
* Returns an S3 client. Constructs a new S3 client for use with the integration test docker cluster. Creates a
34+
* new S3 client the first time this is called, otherwise the existing S3 client is returned.
35+
*
36+
* @return an S3 client
37+
*/
1938
def getS3Client(): AmazonS3 = {
2039
this.synchronized {
2140
if (s3Client == null) {

e2e-test/src/test/scala/org/opensearch/spark/e2e/SparkTrait.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,20 @@ import org.apache.spark.sql.SparkSession
1010
trait SparkTrait {
1111
var spark : SparkSession = null
1212

13+
/**
14+
* Retrieves the exposed port of Spark Connect on the "spark" container.
15+
*
16+
* @return Spark Connect port of the "spark" container
17+
*/
1318
def getSparkConnectPort(): Int
1419

20+
/**
21+
* Returns a SparkSession object. Constructs a new SparkSession object for use with the integration test docker
22+
* cluster "spark" container. Creates a new SparkSession the first time this is called, otherwise the existing
23+
* SparkSession is returned.
24+
*
25+
* @return a SparkSession for use with the integration test docker cluster
26+
*/
1527
def getSparkSession(): SparkSession = {
1628
this.synchronized {
1729
if (spark == null) {

0 commit comments

Comments
 (0)