Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 75 additions & 64 deletions frameworks/Kotlin/ktor/ktor-exposed/app/src/main/kotlin/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.statements.BatchUpdateStatement
import org.jetbrains.exposed.v1.core.vendors.PostgreSQLDialect
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.JdbcTransaction
import org.jetbrains.exposed.v1.jdbc.transactions.suspendTransaction
import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase
import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig
import org.jetbrains.exposed.v1.r2dbc.R2dbcTransaction
import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction
import java.util.concurrent.ThreadLocalRandom
import org.jetbrains.exposed.v1.jdbc.select as jdbcSelect
import org.jetbrains.exposed.v1.jdbc.statements.toExecutable as toJdbcExecutable
import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager.Companion as JdbcTransactionManager
import org.jetbrains.exposed.v1.r2dbc.select as r2dbcSelect
import org.jetbrains.exposed.v1.r2dbc.statements.toExecutable as toR2dbcExecutable
import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager as R2dbcTransactionManager

enum class ConnectionMode {
Jdbc, R2dbc
Expand Down Expand Up @@ -69,41 +69,43 @@ private const val DB_ROWS = 10_000
fun ThreadLocalRandom.nextIntWithinRows() =
nextInt(DB_ROWS) + 1

interface ExposedOps<TDatabase> {
interface ExposedOps<TDatabase, TTransaction> {
fun createDatabase(): TDatabase
suspend fun <T> transaction(db: TDatabase, statement: suspend /*JdbcTransaction.*/() -> T): T
suspend fun <T> transaction(db: TDatabase, statement: suspend TTransaction.() -> T): T

// Repository pattern functions. These can also be extracted into a separate interface.
suspend fun getWorldWithId(id: Int): World
suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World>
suspend fun getAllFortunesAndAddTo(result: MutableList<Fortune>)
suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List<World>
suspend fun TTransaction.getWorldWithId(id: Int): World
suspend fun TTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World>
suspend fun TTransaction.getAllFortunesAndAddTo(result: MutableList<Fortune>)
suspend fun TTransaction.getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List<World>

interface Jdbc : ExposedOps<Database> {
interface Jdbc : ExposedOps<Database, JdbcTransaction> {
override fun createDatabase(): Database {
val poolSize = Runtime.getRuntime().availableProcessors() * 2
val pool = HikariDataSource(HikariConfig().apply { configurePostgres(poolSize) })
return Database.connect(pool)
}

override suspend fun <T> transaction(db: Database, statement: suspend () -> T): T =
override suspend fun <T> transaction(db: Database, statement: suspend JdbcTransaction.() -> T): T =
suspendTransaction(db) { statement() }

object Dsl : Jdbc {
override suspend fun getWorldWithId(id: Int): World =
override suspend fun JdbcTransaction.getWorldWithId(id: Int): World =
WorldTable.jdbcSelect(WorldTable.id, WorldTable.randomNumber).where(WorldTable.id eq id)
.single().toWorld()

override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World> =
override suspend fun JdbcTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World> =
// Coroutine concurrent `select`s lead to connection pool exhaustion.
List(queries) { getWorldWithId(random.nextIntWithinRows()) }

override suspend fun getAllFortunesAndAddTo(result: MutableList<Fortune>) {
override suspend fun JdbcTransaction.getAllFortunesAndAddTo(result: MutableList<Fortune>) {
FortuneTable.jdbcSelect(FortuneTable.id, FortuneTable.message)
.mapTo(result) { it.toFortune() }
}

override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List<World> {
override suspend fun JdbcTransaction.getRandomWorldsAndUpdate(
queries: Int, random: ThreadLocalRandom
): List<World> {
val result = getRandomWorlds(queries, random)
result.forEach { it.randomNumber = random.nextIntWithinRows() }
val batch = BatchUpdateStatement(WorldTable)
Expand All @@ -112,24 +114,26 @@ interface ExposedOps<TDatabase> {
batch[WorldTable.randomNumber] = world.randomNumber
}
// also consider passing the transaction explicitly
batch.toJdbcExecutable().execute(JdbcTransactionManager.current())
batch.toJdbcExecutable().execute(this)
return result
}
}

object Dao : Jdbc {
override suspend fun getWorldWithId(id: Int): World =
override suspend fun JdbcTransaction.getWorldWithId(id: Int): World =
WorldDao[id].toWorld()

override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World> =
override suspend fun JdbcTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World> =
//List(queries) { WorldDao[random.nextIntWithinRows()].toWorld() }
throw IllegalArgumentException("DAO not supported because it appears to cache results")

override suspend fun getAllFortunesAndAddTo(result: MutableList<Fortune>) {
override suspend fun JdbcTransaction.getAllFortunesAndAddTo(result: MutableList<Fortune>) {
FortuneDao.all().mapTo(result) { it.toFortune() }
}

override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List<World> {
override suspend fun JdbcTransaction.getRandomWorldsAndUpdate(
queries: Int, random: ThreadLocalRandom
): List<World> {
/*
val worldDaosAndNewRandomNumbers =
List(queries) { WorldDao[random.nextIntWithinRows()] to random.nextIntWithinRows() }
Expand All @@ -146,100 +150,107 @@ interface ExposedOps<TDatabase> {
}

// TODO consider moving to separate files to avoid import conflicts/aliases
interface R2dbc : ExposedOps<R2dbcDatabase> {
interface R2dbc : ExposedOps<R2dbcDatabase, R2dbcTransaction> {
override fun createDatabase(): R2dbcDatabase =
R2dbcDatabase.connect(configurePostgresR2DBC(), R2dbcDatabaseConfig {
// This can't be omitted.
explicitDialect = PostgreSQLDialect()
})

override suspend fun <T> transaction(db: R2dbcDatabase, statement: suspend () -> T): T =
override suspend fun <T> transaction(db: R2dbcDatabase, statement: suspend R2dbcTransaction.() -> T): T =
suspendTransaction(db) { statement() }

object Dsl : R2dbc {
override suspend fun getWorldWithId(id: Int): World =
override suspend fun R2dbcTransaction.getWorldWithId(id: Int): World =
WorldTable.r2dbcSelect(WorldTable.id, WorldTable.randomNumber).where(WorldTable.id eq id)
.single().toWorld()

override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List<World> =
override suspend fun R2dbcTransaction.getRandomWorlds(
queries: Int,
random: ThreadLocalRandom
): List<World> =
// Coroutine concurrent `select`s lead to connection pool exhaustion.
List(queries) { getWorldWithId(random.nextIntWithinRows()) }

override suspend fun getAllFortunesAndAddTo(result: MutableList<Fortune>) {
override suspend fun R2dbcTransaction.getAllFortunesAndAddTo(result: MutableList<Fortune>) {
FortuneTable.r2dbcSelect(FortuneTable.id, FortuneTable.message)
.map { it.toFortune() }.toList(result)
}

override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List<World> {
override suspend fun R2dbcTransaction.getRandomWorldsAndUpdate(
queries: Int,
random: ThreadLocalRandom
): List<World> {
val result = getRandomWorlds(queries, random)
result.forEach { it.randomNumber = random.nextIntWithinRows() }
val batch = BatchUpdateStatement(WorldTable)
result.sortedBy { it.id }.forEach { world ->
batch.addBatch(EntityID(world.id, WorldTable))
batch[WorldTable.randomNumber] = world.randomNumber
}
// also consider passing the transaction explicitly
batch.toR2dbcExecutable().execute(R2dbcTransactionManager.current())
batch.toR2dbcExecutable().execute(this)
return result
}
}
}
}

fun <Database> Application.parameterizedModule(exposedOps: ExposedOps<Database>) {
fun <TDatabase, TTransaction> Application.parameterizedModule(exposedOps: ExposedOps<TDatabase, TTransaction>) {
install(DefaultHeaders)

routing {
val database = exposedOps.createDatabase()
with(exposedOps) {
val database = createDatabase()

suspend fun <T> transaction(statement: suspend () -> T): T =
exposedOps.transaction(database) { statement() }
suspend fun <T> transaction(statement: suspend TTransaction.() -> T): T =
transaction(database, statement)

get("/db") {
val random = ThreadLocalRandom.current()
val result = transaction { exposedOps.getWorldWithId(random.nextIntWithinRows()) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
}
get("/db") {
val random = ThreadLocalRandom.current()
val result = transaction { getWorldWithId(random.nextIntWithinRows()) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
}

get("/queries") {
val queries = call.queries()
val random = ThreadLocalRandom.current()
val result = transaction { exposedOps.getRandomWorlds(queries, random) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
}
get("/queries") {
val queries = call.queries()
val random = ThreadLocalRandom.current()
val result = transaction { getRandomWorlds(queries, random) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
}

get("/fortunes") {
val result = mutableListOf<Fortune>()
get("/fortunes") {
val result = mutableListOf<Fortune>()

transaction { exposedOps.getAllFortunesAndAddTo(result) }
transaction { getAllFortunesAndAddTo(result) }

result.add(Fortune(0, "Additional fortune added at request time."))
result.sortBy { it.message }
result.add(Fortune(0, "Additional fortune added at request time."))
result.sortBy { it.message }

call.respondHtml {
head { title { +"Fortunes" } }
body {
table {
tr {
th { +"id" }
th { +"message" }
}
for (fortune in result) {
call.respondHtml {
head { title { +"Fortunes" } }
body {
table {
tr {
td { +fortune.id.toString() }
td { +fortune.message }
th { +"id" }
th { +"message" }
}
for (fortune in result) {
tr {
td { +fortune.id.toString() }
td { +fortune.message }
}
}
}
}
}
}
}

get("/updates") {
val queries = call.queries()
val random = ThreadLocalRandom.current()
val result = transaction { exposedOps.getRandomWorldsAndUpdate(queries, random) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
get("/updates") {
val queries = call.queries()
val random = ThreadLocalRandom.current()
val result = transaction { getRandomWorldsAndUpdate(queries, random) }
call.respondText(json.encodeToString(result), ContentType.Application.Json)
}
}
}
}
Loading