Skip to content

Commit 02c714c

Browse files
codebase/using-amazon-athena-with-spring-boot-to-query-s3-data [BAEL 7931] (#16866)
* adding athena configuration beans * adding service layer to execute SQL queries in athena * adding SQL query controller * adding exception handling * adding SELECT SQL queries validation * logging exceptions * fix: index out of bound * adding athena initializer to execute init sql scripts * updating pom.xml * extract wait period as constant * update exception handling * updating codebase * converting result to domain model * renaming variable * renaming variable * making return type generic * fix: execute() invocation from AthenaInitializer * adding parameterized query code snippet
1 parent d02de89 commit 02c714c

File tree

13 files changed

+405
-1
lines changed

13 files changed

+405
-1
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lombok.nonNull.exceptionType=IllegalArgumentException

aws-modules/amazon-athena/pom.xml

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<artifactId>amazon-athena</artifactId>
7+
<version>0.0.1</version>
8+
<packaging>jar</packaging>
9+
<name>amazon-athena</name>
10+
<description>Codebase demonstrating the integration of Amazon Athena in Spring Boot to query data stored in an S3 bucket</description>
11+
12+
<parent>
13+
<groupId>com.baeldung</groupId>
14+
<artifactId>parent-boot-3</artifactId>
15+
<version>0.0.1-SNAPSHOT</version>
16+
<relativePath>../../parent-boot-3</relativePath>
17+
</parent>
18+
19+
<dependencies>
20+
<dependency>
21+
<groupId>org.springframework.boot</groupId>
22+
<artifactId>spring-boot-starter-web</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.springframework.boot</groupId>
26+
<artifactId>spring-boot-starter-validation</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.springframework.boot</groupId>
30+
<artifactId>spring-boot-configuration-processor</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>software.amazon.awssdk</groupId>
34+
<artifactId>athena</artifactId>
35+
<version>${amazon-athena.version}</version>
36+
</dependency>
37+
<dependency>
38+
<groupId>commons-io</groupId>
39+
<artifactId>commons-io</artifactId>
40+
<version>${commons-io.version}</version>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.json</groupId>
44+
<artifactId>json</artifactId>
45+
<version>${org-json.version}</version>
46+
</dependency>
47+
<dependency>
48+
<groupId>com.fasterxml.jackson.datatype</groupId>
49+
<artifactId>jackson-datatype-json-org</artifactId>
50+
</dependency>
51+
<dependency>
52+
<groupId>org.projectlombok</groupId>
53+
<artifactId>lombok</artifactId>
54+
<optional>true</optional>
55+
</dependency>
56+
</dependencies>
57+
58+
<build>
59+
<plugins>
60+
<plugin>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-maven-plugin</artifactId>
63+
<configuration>
64+
<excludes>
65+
<exclude>
66+
<groupId>org.projectlombok</groupId>
67+
<artifactId>lombok</artifactId>
68+
</exclude>
69+
</excludes>
70+
</configuration>
71+
</plugin>
72+
</plugins>
73+
</build>
74+
75+
<properties>
76+
<java.version>17</java.version>
77+
<org-json.version>20240303</org-json.version>
78+
<commons-io.version>2.16.1</commons-io.version>
79+
<amazon-athena.version>2.26.0</amazon-athena.version>
80+
</properties>
81+
82+
</project>
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.baeldung.athena;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class Application {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(Application.class, args);
11+
}
12+
13+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.baeldung.athena.configuration;
2+
3+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
7+
import lombok.RequiredArgsConstructor;
8+
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
9+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
10+
import software.amazon.awssdk.services.athena.AthenaClient;
11+
import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
12+
import software.amazon.awssdk.services.athena.model.ResultConfiguration;
13+
14+
@Configuration
15+
@RequiredArgsConstructor
16+
@EnableConfigurationProperties(AwsConfigurationProperties.class)
17+
public class AmazonAthenaConfiguration {
18+
19+
private final AwsConfigurationProperties awsConfigurationProperties;
20+
21+
@Bean
22+
public AthenaClient athenaClient() {
23+
return AthenaClient.builder()
24+
.credentialsProvider(constructCredentials())
25+
.build();
26+
}
27+
28+
@Bean
29+
public QueryExecutionContext queryExecutionContext() {
30+
final var database = awsConfigurationProperties.getAthena().getDatabase();
31+
return QueryExecutionContext.builder()
32+
.database(database)
33+
.build();
34+
}
35+
36+
@Bean
37+
public ResultConfiguration resultConfiguration() {
38+
final var outputLocation = awsConfigurationProperties.getAthena().getS3OutputLocation();
39+
return ResultConfiguration.builder()
40+
.outputLocation(outputLocation)
41+
.build();
42+
}
43+
44+
private StaticCredentialsProvider constructCredentials() {
45+
final var accessKey = awsConfigurationProperties.getAccessKey();
46+
final var secretKey = awsConfigurationProperties.getSecretKey();
47+
final var awsCredentials = AwsBasicCredentials.create(accessKey, secretKey);
48+
return StaticCredentialsProvider.create(awsCredentials);
49+
}
50+
51+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package com.baeldung.athena.configuration;
2+
3+
import org.springframework.boot.context.properties.ConfigurationProperties;
4+
import org.springframework.lang.Nullable;
5+
import org.springframework.validation.annotation.Validated;
6+
7+
import jakarta.validation.Valid;
8+
import jakarta.validation.constraints.NotBlank;
9+
import lombok.Getter;
10+
import lombok.Setter;
11+
12+
@Getter
13+
@Setter
14+
@Validated
15+
@ConfigurationProperties(prefix = "com.baeldung.aws")
16+
public class AwsConfigurationProperties {
17+
18+
@NotBlank(message = "AWS access key must be configured")
19+
private String accessKey;
20+
21+
@NotBlank(message = "AWS secret key must be configured")
22+
private String secretKey;
23+
24+
@Valid
25+
private Athena athena = new Athena();
26+
27+
@Getter
28+
@Setter
29+
public class Athena {
30+
31+
@Nullable
32+
private String database = "default";
33+
34+
@NotBlank(message = "S3 output location must be configured")
35+
private String s3OutputLocation;
36+
37+
}
38+
39+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.baeldung.athena.exception;
2+
3+
/**
 * Unchecked exception signalling that a query could not be executed successfully.
 */
public class QueryExecutionFailureException extends RuntimeException {

    private static final long serialVersionUID = 4359781704223584247L;

    /**
     * Creates an exception without a detail message or cause.
     */
    public QueryExecutionFailureException() {
    }

    /**
     * Creates an exception with a detail message.
     *
     * @param message description of the failure
     */
    public QueryExecutionFailureException(String message) {
        super(message);
    }

    /**
     * Creates an exception with a detail message and the underlying cause, so the original
     * stack trace is preserved.
     *
     * @param message description of the failure
     * @param cause   the exception that triggered this failure, may be {@code null}
     */
    public QueryExecutionFailureException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception that wraps the underlying cause.
     *
     * @param cause the exception that triggered this failure, may be {@code null}
     */
    public QueryExecutionFailureException(Throwable cause) {
        super(cause);
    }

}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.baeldung.athena.initialization;
2+
3+
import java.nio.charset.StandardCharsets;
4+
5+
import org.apache.commons.io.FileUtils;
6+
import org.springframework.boot.ApplicationArguments;
7+
import org.springframework.boot.ApplicationRunner;
8+
import org.springframework.core.io.support.ResourcePatternResolver;
9+
import org.springframework.stereotype.Component;
10+
11+
import com.baeldung.athena.service.QueryService;
12+
13+
import lombok.RequiredArgsConstructor;
14+
import lombok.SneakyThrows;
15+
import lombok.extern.slf4j.Slf4j;
16+
17+
@Slf4j
18+
@Component
19+
@RequiredArgsConstructor
20+
public class AthenaInitializer implements ApplicationRunner {
21+
22+
private final QueryService queryService;
23+
private final ResourcePatternResolver resourcePatternResolver;
24+
25+
private static final String ATHENA_INIT_SCRIPT_PATTERN = "classpath:athena-init/*.sql";
26+
27+
@Override
28+
@SneakyThrows
29+
public void run(ApplicationArguments args) {
30+
final var initScripts = resourcePatternResolver.getResources(ATHENA_INIT_SCRIPT_PATTERN);
31+
for (final var script : initScripts) {
32+
final var sqlScript = FileUtils.readFileToString(script.getFile(), StandardCharsets.UTF_8);
33+
queryService.execute(sqlScript, Void.class);
34+
log.info("Successfully executed {}.", script.getFilename());
35+
}
36+
}
37+
38+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package com.baeldung.athena.service;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collections;
5+
import java.util.List;
6+
import java.util.concurrent.TimeUnit;
7+
8+
import org.json.JSONObject;
9+
import org.springframework.lang.Nullable;
10+
import org.springframework.stereotype.Service;
11+
12+
import com.baeldung.athena.exception.QueryExecutionFailureException;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
15+
16+
import lombok.NonNull;
17+
import lombok.RequiredArgsConstructor;
18+
import lombok.SneakyThrows;
19+
import lombok.extern.slf4j.Slf4j;
20+
import software.amazon.awssdk.services.athena.AthenaClient;
21+
import software.amazon.awssdk.services.athena.model.Datum;
22+
import software.amazon.awssdk.services.athena.model.GetQueryResultsResponse;
23+
import software.amazon.awssdk.services.athena.model.InvalidRequestException;
24+
import software.amazon.awssdk.services.athena.model.QueryExecutionContext;
25+
import software.amazon.awssdk.services.athena.model.QueryExecutionState;
26+
import software.amazon.awssdk.services.athena.model.ResultConfiguration;
27+
28+
@Slf4j
29+
@Service
30+
@RequiredArgsConstructor
31+
public class QueryService {
32+
33+
private final AthenaClient athenaClient;
34+
private final ResultConfiguration resultConfiguration;
35+
private final QueryExecutionContext queryExecutionContext;
36+
37+
private static final long WAIT_PERIOD = 30;
38+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new JsonOrgModule());
39+
40+
public <T> List<T> execute(@NonNull final String sqlQuery, @NonNull final Class<T> targetClass) {
41+
return execute(sqlQuery, null, targetClass);
42+
}
43+
44+
public <T> List<T> execute(@NonNull final String sqlQuery, @Nullable final List<String> parameters,
45+
@NonNull final Class<T> targetClass) {
46+
String queryExecutionId;
47+
try {
48+
queryExecutionId = athenaClient.startQueryExecution(query ->
49+
query.queryString(sqlQuery)
50+
.queryExecutionContext(queryExecutionContext)
51+
.resultConfiguration(resultConfiguration)
52+
.executionParameters(parameters)
53+
).queryExecutionId();
54+
} catch (final InvalidRequestException exception) {
55+
log.error("Invalid SQL syntax detected in query {}", sqlQuery, exception);
56+
throw new QueryExecutionFailureException();
57+
}
58+
59+
waitForQueryToComplete(queryExecutionId);
60+
61+
final var queryResult = athenaClient.getQueryResults(request ->
62+
request.queryExecutionId(queryExecutionId));
63+
return transformQueryResult(queryResult, targetClass);
64+
}
65+
66+
@SneakyThrows
67+
private void waitForQueryToComplete(@NonNull final String queryExecutionId) {
68+
QueryExecutionState queryState;
69+
70+
do {
71+
final var response = athenaClient.getQueryExecution(request ->
72+
request.queryExecutionId(queryExecutionId));
73+
queryState = response.queryExecution().status().state();
74+
75+
switch (queryState) {
76+
case FAILED:
77+
case CANCELLED:
78+
final var error = response.queryExecution().status().athenaError().errorMessage();
79+
log.error("Query execution failed: {}", error);
80+
throw new QueryExecutionFailureException();
81+
case QUEUED:
82+
case RUNNING:
83+
TimeUnit.MILLISECONDS.sleep(WAIT_PERIOD);
84+
break;
85+
case SUCCEEDED:
86+
queryState = QueryExecutionState.SUCCEEDED;
87+
return;
88+
default:
89+
throw new IllegalStateException("Invalid query state");
90+
}
91+
} while (queryState != QueryExecutionState.SUCCEEDED);
92+
}
93+
94+
@SneakyThrows
95+
private <T> List<T> transformQueryResult(@NonNull final GetQueryResultsResponse queryResultsResponse,
96+
@NonNull final Class<T> targetClass) {
97+
final var response = new ArrayList<T>();
98+
final var rows = queryResultsResponse.resultSet().rows();
99+
if (rows.isEmpty()) {
100+
return Collections.emptyList();
101+
}
102+
final var headers = rows.get(0).data().stream()
103+
.map(Datum::varCharValue)
104+
.toList();
105+
106+
rows.stream()
107+
.skip(1)
108+
.forEach(row -> {
109+
final var element = new JSONObject();
110+
final var data = row.data();
111+
112+
for (int i = 0; i < headers.size(); i++) {
113+
final var key = headers.get(i);
114+
final var value = data.get(i).varCharValue();
115+
element.put(key, value);
116+
}
117+
final var obj = OBJECT_MAPPER.convertValue(element, targetClass);
118+
response.add(obj);
119+
});
120+
return response;
121+
}
122+
123+
public record User(Integer id, String name, Integer age, String city) {};
124+
125+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.baeldung.athena.service;
2+
3+
import java.util.List;
4+
5+
import org.springframework.stereotype.Service;
6+
7+
import com.baeldung.athena.service.QueryService.User;
8+
9+
import lombok.NonNull;
10+
import lombok.RequiredArgsConstructor;
11+
12+
@Service
13+
@RequiredArgsConstructor
14+
public class UserService {
15+
16+
private final QueryService queryService;
17+
18+
public List<User> getUsersByName(@NonNull final String name) {
19+
final var query = "SELECT * FROM users WHERE name = ?";
20+
return queryService.execute(query, List.of(name), User.class);
21+
}
22+
23+
}

0 commit comments

Comments
 (0)