diff --git a/frameworks/Kotlin/ktor/ktor-exposed/app/src/main/kotlin/App.kt b/frameworks/Kotlin/ktor/ktor-exposed/app/src/main/kotlin/App.kt index 50f530b52ac..216c454aef9 100644 --- a/frameworks/Kotlin/ktor/ktor-exposed/app/src/main/kotlin/App.kt +++ b/frameworks/Kotlin/ktor/ktor-exposed/app/src/main/kotlin/App.kt @@ -20,17 +20,17 @@ import org.jetbrains.exposed.v1.core.eq import org.jetbrains.exposed.v1.core.statements.BatchUpdateStatement import org.jetbrains.exposed.v1.core.vendors.PostgreSQLDialect import org.jetbrains.exposed.v1.jdbc.Database +import org.jetbrains.exposed.v1.jdbc.JdbcTransaction import org.jetbrains.exposed.v1.jdbc.transactions.suspendTransaction import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabase import org.jetbrains.exposed.v1.r2dbc.R2dbcDatabaseConfig +import org.jetbrains.exposed.v1.r2dbc.R2dbcTransaction import org.jetbrains.exposed.v1.r2dbc.transactions.suspendTransaction import java.util.concurrent.ThreadLocalRandom import org.jetbrains.exposed.v1.jdbc.select as jdbcSelect import org.jetbrains.exposed.v1.jdbc.statements.toExecutable as toJdbcExecutable -import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager.Companion as JdbcTransactionManager import org.jetbrains.exposed.v1.r2dbc.select as r2dbcSelect import org.jetbrains.exposed.v1.r2dbc.statements.toExecutable as toR2dbcExecutable -import org.jetbrains.exposed.v1.r2dbc.transactions.TransactionManager as R2dbcTransactionManager enum class ConnectionMode { Jdbc, R2dbc @@ -69,41 +69,43 @@ private const val DB_ROWS = 10_000 fun ThreadLocalRandom.nextIntWithinRows() = nextInt(DB_ROWS) + 1 -interface ExposedOps { +interface ExposedOps { fun createDatabase(): TDatabase - suspend fun transaction(db: TDatabase, statement: suspend /*JdbcTransaction.*/() -> T): T + suspend fun transaction(db: TDatabase, statement: suspend TTransaction.() -> T): T // Repository pattern functions. These can also be extracted into a separate interface. - suspend fun getWorldWithId(id: Int): World - suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List - suspend fun getAllFortunesAndAddTo(result: MutableList) - suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List + suspend fun TTransaction.getWorldWithId(id: Int): World + suspend fun TTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List + suspend fun TTransaction.getAllFortunesAndAddTo(result: MutableList) + suspend fun TTransaction.getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List - interface Jdbc : ExposedOps { + interface Jdbc : ExposedOps { override fun createDatabase(): Database { val poolSize = Runtime.getRuntime().availableProcessors() * 2 val pool = HikariDataSource(HikariConfig().apply { configurePostgres(poolSize) }) return Database.connect(pool) } - override suspend fun transaction(db: Database, statement: suspend () -> T): T = + override suspend fun transaction(db: Database, statement: suspend JdbcTransaction.() -> T): T = suspendTransaction(db) { statement() } object Dsl : Jdbc { - override suspend fun getWorldWithId(id: Int): World = + override suspend fun JdbcTransaction.getWorldWithId(id: Int): World = WorldTable.jdbcSelect(WorldTable.id, WorldTable.randomNumber).where(WorldTable.id eq id) .single().toWorld() - override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List = + override suspend fun JdbcTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List = // Coroutine concurrent `select`s lead to connection pool exhaustion. List(queries) { getWorldWithId(random.nextIntWithinRows()) } - override suspend fun getAllFortunesAndAddTo(result: MutableList) { + override suspend fun JdbcTransaction.getAllFortunesAndAddTo(result: MutableList) { FortuneTable.jdbcSelect(FortuneTable.id, FortuneTable.message) .mapTo(result) { it.toFortune() } } - override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List { + override suspend fun JdbcTransaction.getRandomWorldsAndUpdate( + queries: Int, random: ThreadLocalRandom + ): List { val result = getRandomWorlds(queries, random) result.forEach { it.randomNumber = random.nextIntWithinRows() } val batch = BatchUpdateStatement(WorldTable) @@ -112,24 +114,26 @@ interface ExposedOps { batch[WorldTable.randomNumber] = world.randomNumber } // also consider passing the transaction explicitly - batch.toJdbcExecutable().execute(JdbcTransactionManager.current()) + batch.toJdbcExecutable().execute(this) return result } } object Dao : Jdbc { - override suspend fun getWorldWithId(id: Int): World = + override suspend fun JdbcTransaction.getWorldWithId(id: Int): World = WorldDao[id].toWorld() - override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List = + override suspend fun JdbcTransaction.getRandomWorlds(queries: Int, random: ThreadLocalRandom): List = //List(queries) { WorldDao[random.nextIntWithinRows()].toWorld() } throw IllegalArgumentException("DAO not supported because it appears to cache results") - override suspend fun getAllFortunesAndAddTo(result: MutableList) { + override suspend fun JdbcTransaction.getAllFortunesAndAddTo(result: MutableList) { FortuneDao.all().mapTo(result) { it.toFortune() } } - override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List { + override suspend fun JdbcTransaction.getRandomWorldsAndUpdate( + queries: Int, random: ThreadLocalRandom + ): List { /* val worldDaosAndNewRandomNumbers = List(queries) { WorldDao[random.nextIntWithinRows()] to random.nextIntWithinRows() } @@ -146,31 +150,37 @@ interface ExposedOps { } // TODO consider moving to separate files to avoid import conflicts/aliases - interface R2dbc : ExposedOps { + interface R2dbc : ExposedOps { override fun createDatabase(): R2dbcDatabase = R2dbcDatabase.connect(configurePostgresR2DBC(), R2dbcDatabaseConfig { // This can't be omitted. explicitDialect = PostgreSQLDialect() }) - override suspend fun transaction(db: R2dbcDatabase, statement: suspend () -> T): T = + override suspend fun transaction(db: R2dbcDatabase, statement: suspend R2dbcTransaction.() -> T): T = suspendTransaction(db) { statement() } object Dsl : R2dbc { - override suspend fun getWorldWithId(id: Int): World = + override suspend fun R2dbcTransaction.getWorldWithId(id: Int): World = WorldTable.r2dbcSelect(WorldTable.id, WorldTable.randomNumber).where(WorldTable.id eq id) .single().toWorld() - override suspend fun getRandomWorlds(queries: Int, random: ThreadLocalRandom): List = + override suspend fun R2dbcTransaction.getRandomWorlds( + queries: Int, + random: ThreadLocalRandom + ): List = // Coroutine concurrent `select`s lead to connection pool exhaustion. List(queries) { getWorldWithId(random.nextIntWithinRows()) } - override suspend fun getAllFortunesAndAddTo(result: MutableList) { + override suspend fun R2dbcTransaction.getAllFortunesAndAddTo(result: MutableList) { FortuneTable.r2dbcSelect(FortuneTable.id, FortuneTable.message) .map { it.toFortune() }.toList(result) } - override suspend fun getRandomWorldsAndUpdate(queries: Int, random: ThreadLocalRandom): List { + override suspend fun R2dbcTransaction.getRandomWorldsAndUpdate( + queries: Int, + random: ThreadLocalRandom + ): List { val result = getRandomWorlds(queries, random) result.forEach { it.randomNumber = random.nextIntWithinRows() } val batch = BatchUpdateStatement(WorldTable) @@ -178,68 +188,69 @@ interface ExposedOps { batch.addBatch(EntityID(world.id, WorldTable)) batch[WorldTable.randomNumber] = world.randomNumber } - // also consider passing the transaction explicitly - batch.toR2dbcExecutable().execute(R2dbcTransactionManager.current()) + batch.toR2dbcExecutable().execute(this) return result } } } } -fun Application.parameterizedModule(exposedOps: ExposedOps) { +fun Application.parameterizedModule(exposedOps: ExposedOps) { install(DefaultHeaders) routing { - val database = exposedOps.createDatabase() + with(exposedOps) { + val database = createDatabase() - suspend fun transaction(statement: suspend () -> T): T = - exposedOps.transaction(database) { statement() } + suspend fun transaction(statement: suspend TTransaction.() -> T): T = + transaction(database, statement) - get("/db") { - val random = ThreadLocalRandom.current() - val result = transaction { exposedOps.getWorldWithId(random.nextIntWithinRows()) } - call.respondText(json.encodeToString(result), ContentType.Application.Json) - } + get("/db") { + val random = ThreadLocalRandom.current() + val result = transaction { getWorldWithId(random.nextIntWithinRows()) } + call.respondText(json.encodeToString(result), ContentType.Application.Json) + } - get("/queries") { - val queries = call.queries() - val random = ThreadLocalRandom.current() - val result = transaction { exposedOps.getRandomWorlds(queries, random) } - call.respondText(json.encodeToString(result), ContentType.Application.Json) - } + get("/queries") { + val queries = call.queries() + val random = ThreadLocalRandom.current() + val result = transaction { getRandomWorlds(queries, random) } + call.respondText(json.encodeToString(result), ContentType.Application.Json) + } - get("/fortunes") { - val result = mutableListOf() + get("/fortunes") { + val result = mutableListOf() - transaction { exposedOps.getAllFortunesAndAddTo(result) } + transaction { getAllFortunesAndAddTo(result) } - result.add(Fortune(0, "Additional fortune added at request time.")) - result.sortBy { it.message } + result.add(Fortune(0, "Additional fortune added at request time.")) + result.sortBy { it.message } - call.respondHtml { - head { title { +"Fortunes" } } - body { - table { - tr { - th { +"id" } - th { +"message" } - } - for (fortune in result) { + call.respondHtml { + head { title { +"Fortunes" } } + body { + table { tr { - td { +fortune.id.toString() } - td { +fortune.message } + th { +"id" } + th { +"message" } + } + for (fortune in result) { + tr { + td { +fortune.id.toString() } + td { +fortune.message } + } } } } } } - } - get("/updates") { - val queries = call.queries() - val random = ThreadLocalRandom.current() - val result = transaction { exposedOps.getRandomWorldsAndUpdate(queries, random) } - call.respondText(json.encodeToString(result), ContentType.Application.Json) + get("/updates") { + val queries = call.queries() + val random = ThreadLocalRandom.current() + val result = transaction { getRandomWorldsAndUpdate(queries, random) } + call.respondText(json.encodeToString(result), ContentType.Application.Json) + } } } }