Skip to content

Commit 56857c4

Browse files
authored
feat: Add s3 file ingestion example project (#123)
1 parent 7b1b313 commit 56857c4

File tree

5 files changed

+183
-2
lines changed

5 files changed

+183
-2
lines changed

examples/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44

55
allprojects {
66
group = "aws.sdk.kotlin.example"
7-
version = "1.0-SNAPSHOT"
7+
version = "2.0-SNAPSHOT"
88

99
repositories {
1010
maven {

examples/gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11

22
# AWS SDK
3-
awsSdkKotlinVersion=0.1.0
3+
awsSdkKotlinVersion=0.2.0
44

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
plugins {
2+
kotlin("jvm")
3+
}
4+
5+
val awsSdkKotlinVersion: String by project
6+
7+
dependencies {
8+
implementation(kotlin("stdlib"))
9+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3")
10+
implementation("aws.sdk.kotlin:s3:$awsSdkKotlinVersion")
11+
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
import aws.sdk.kotlin.services.s3.S3Client
6+
import aws.sdk.kotlin.services.s3.model.*
7+
import kotlinx.coroutines.flow.*
8+
import kotlinx.coroutines.runBlocking
9+
import software.aws.clientrt.content.ByteStream
10+
import software.aws.clientrt.content.fromFile
11+
import software.aws.clientrt.content.writeToFile
12+
import java.io.File
13+
import java.nio.file.Files
14+
15+
/**
16+
* This program reads media files from a specified directory and uploads media files to S3.
17+
* After uploading it will then download uploaded files back into a local directory.
18+
*
19+
* Any file with the extension `.avi` will be processed. To test create a text file and
20+
* name it such that it matches the [filenameMetadataRegex] regex, ex:
21+
* `title_2000.avi`.
22+
*
23+
* When running the sample adjust the following path constants as needed for your local environment.
24+
*/
25+
const val bucketName = "s3-media-ingestion-example"
26+
const val ingestionDirPath = "/tmp/media-in"
27+
const val completedDirPath = "/tmp/media-processed"
28+
const val failedDirPath = "/tmp/media-failed"
29+
const val downloadDirPath = "/tmp/media-down"
30+
31+
// media metadata is extracted from filename: <title>_<year>.avi
32+
val filenameMetadataRegex = "([\\w\\s]+)_([\\d]+).avi".toRegex()
33+
34+
fun main(): Unit = runBlocking {
35+
val client = S3Client { region = "us-east-2" }
36+
37+
try {
38+
// Setup
39+
client.ensureBucketExists(bucketName)
40+
listOf(completedDirPath, failedDirPath, downloadDirPath).forEach { validateDirectory(it) }
41+
val ingestionDir = validateDirectory(ingestionDirPath)
42+
43+
// Upload files
44+
val uploadResults = ingestionDir
45+
.walk()
46+
.asFlow()
47+
.mapNotNull(::mediaMetadataExtractor)
48+
.map { mediaMetadata -> client.uploadToS3(mediaMetadata) }
49+
.toList()
50+
51+
moveFiles(uploadResults)
52+
53+
// Print results of operation
54+
val (successes, failures) = uploadResults.partition { it is Success }
55+
when (failures.isEmpty()) {
56+
true -> println("Media uploaded successfully: $successes")
57+
false -> println("Successfully uploaded: $successes \nFailed to upload: $failures")
58+
}
59+
60+
// Download files to verify
61+
client.listObjects(ListObjectsRequest { bucket = bucketName }).contents?.forEach { obj ->
62+
client.getObject(GetObjectRequest { key = obj.key; bucket = bucketName }) { response ->
63+
val outputFile = File(downloadDirPath, obj.key!!)
64+
response.body?.writeToFile(outputFile).also { size ->
65+
println("Downloaded $outputFile ($size bytes) from S3")
66+
}
67+
}
68+
}
69+
} finally {
70+
client.close()
71+
}
72+
}
73+
74+
/** Check for valid S3 configuration based on account */
75+
suspend fun S3Client.ensureBucketExists(bucketName: String) {
76+
if (!bucketExists(bucketName)) {
77+
createBucket(
78+
CreateBucketRequest {
79+
bucket = bucketName
80+
createBucketConfiguration {
81+
locationConstraint = BucketLocationConstraint.UsEast2
82+
}
83+
}
84+
)
85+
}
86+
}
87+
88+
/** Upload to S3 if file not already uploaded */
89+
suspend fun S3Client.uploadToS3(mediaMetadata: MediaMetadata): UploadResult {
90+
if (keyExists(bucketName, mediaMetadata.s3KeyName))
91+
return FileExistsError("${mediaMetadata.s3KeyName} already uploaded.", mediaMetadata)
92+
93+
return try {
94+
putObject(
95+
PutObjectRequest {
96+
bucket = bucketName
97+
key = mediaMetadata.s3KeyName
98+
body = ByteStream.fromFile(mediaMetadata.file)
99+
metadata = mediaMetadata.toMap()
100+
}
101+
)
102+
Success("$bucketName/${mediaMetadata.s3KeyName}", mediaMetadata)
103+
} catch (e: Exception) { // Checking Service Exception coming in future release
104+
UploadError(e, mediaMetadata)
105+
}
106+
}
107+
108+
/** Determine if a object exists in a bucket */
109+
suspend fun S3Client.keyExists(s3bucket: String, s3key: String) =
110+
try {
111+
headObject(
112+
HeadObjectRequest {
113+
bucket = s3bucket
114+
key = s3key
115+
}
116+
)
117+
true
118+
} catch (e: Exception) { // Checking Service Exception coming in future release
119+
false
120+
}
121+
122+
/** Determine if a object exists in a bucket */
123+
suspend fun S3Client.bucketExists(s3bucket: String) =
124+
try {
125+
headBucket(HeadBucketRequest { bucket = s3bucket })
126+
true
127+
} catch (e: Exception) { // Checking Service Exception coming in future release
128+
false
129+
}
130+
131+
/** Move files to directories based on upload results */
132+
fun moveFiles(uploadResults: List<UploadResult>) =
133+
uploadResults
134+
.map { uploadResult -> uploadResult.mediaMetadata.file.toPath() to (uploadResult is Success) }
135+
.forEach { (file, uploadSuccess) ->
136+
val targetFilePath = if (uploadSuccess) completedDirPath else failedDirPath
137+
val targetPath = File(targetFilePath)
138+
Files.move(file, File(targetPath, file.fileName.toString()).toPath())
139+
}
140+
141+
// Classes for S3 upload results
142+
sealed class UploadResult { abstract val mediaMetadata: MediaMetadata }
143+
data class Success(val location: String, override val mediaMetadata: MediaMetadata) : UploadResult()
144+
data class UploadError(val error: Throwable, override val mediaMetadata: MediaMetadata) : UploadResult()
145+
data class FileExistsError(val reason: String, override val mediaMetadata: MediaMetadata) : UploadResult()
146+
147+
// Classes, properties, and functions for media metadata
148+
data class MediaMetadata(val title: String, val year: Int, val file: File)
149+
val MediaMetadata.s3KeyName get() = "$title-$year"
150+
fun MediaMetadata.toMap() = mapOf("title" to title, "year" to year.toString())
151+
fun mediaMetadataExtractor(file: File): MediaMetadata? {
152+
if (!file.isFile || file.length() == 0L) return null
153+
154+
val matchResult = filenameMetadataRegex.find(file.name) ?: return null
155+
156+
val (title, year) = matchResult.destructured
157+
return MediaMetadata(title, year.toInt(), file)
158+
}
159+
160+
/** Validate file path and optionally create directory */
161+
fun validateDirectory(dirPath: String): File {
162+
val dir = File(dirPath)
163+
164+
require(dir.isDirectory || !dir.exists()) { "Unable to use $dir" }
165+
166+
if (!dir.exists()) require(dir.mkdirs()) { "Unable to create $dir" }
167+
168+
return dir
169+
}

examples/settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
rootProject.name = "aws-sdk-kotlin-examples"
22

33
include(":dynamodb-movies")
4+
include(":s3-media-ingestion")

0 commit comments

Comments
 (0)