1
1
package no.nav.dagpenger.inntekt.db
2
2
3
3
import javax.sql.DataSource
4
+ import kotliquery.TransactionalSession
4
5
import kotliquery.queryOf
5
6
import kotliquery.sessionOf
6
7
import kotliquery.using
@@ -16,49 +17,46 @@ internal class ArenaMappingMigrator(private val datasource: DataSource) {
16
17
17
18
@Language(" sql" )
18
19
private val statement: String = """
19
- INSERT INTO inntekt_v1_person_mapping (inntektid, aktørid, vedtakid, beregningsdato)
20
- SELECT inntektid, aktørid, vedtakid, beregningsdato
20
+ INSERT INTO inntekt_v1_person_mapping (inntektid, aktørid, vedtakid, beregningsdato, timestamp )
21
+ SELECT inntektid, aktørid, vedtakid, beregningsdato, timestamp
21
22
FROM inntekt_v1_arena_mapping
22
23
WHERE inntektid NOT IN (SELECT inntektid from inntekt_v1_person_mapping)
23
24
""" .trimIndent()
24
25
25
26
fun migrate (): Int {
26
27
try {
27
- val locked = lock()
28
- return if (locked) {
29
- logger.info { " Obtained lock for migrator" }
30
- val rowsMigrated = using(sessionOf(datasource)) { session ->
31
- session.transaction {
32
- it.run (queryOf(statement).asUpdate)
28
+ val rowsMigrated = using(sessionOf(datasource)) { session ->
29
+ session.transaction { transaction ->
30
+ if (lock(transaction)) {
31
+ logger.info { " Obtained lock for migrator" }
32
+ transaction.run (queryOf(statement).asUpdate)
33
+ } else {
34
+ logger.info { " Could not obtain lock for migrator" }
35
+ 0
33
36
}
34
37
}
35
-
36
- logger.info { " Migrated $rowsMigrated rows from inntekt_v1_arena_mapping to inntekt_v1_person_mapping" }
37
- rowsMigrated
38
- } else {
39
- logger.info { " Could not obtain lock for migrator" }
40
- 0
41
38
}
39
+
40
+ logger.info { " Migrated $rowsMigrated rows from inntekt_v1_arena_mapping to inntekt_v1_person_mapping" }
41
+ return rowsMigrated
42
42
} finally {
43
43
val unlocked = unlock()
44
44
logger.info { " Unlocked = $unlocked for migrator" }
45
45
}
46
46
}
47
47
48
- private fun unlock (): Boolean {
49
- return using(sessionOf(datasource)) { session ->
50
- session.run (
51
- queryOf(" select pg_advisory_unlock($lockKey )" ).map {
52
- it.string(1 ) == " t"
53
- }.asSingle
54
- ) ? : false
55
- }
48
+ private fun lock (session : TransactionalSession ): Boolean {
49
+ return session.run (
50
+ queryOf(" select pg_try_advisory_xact_lock($lockKey )" ).map {
51
+ it.string(1 ) == " t"
52
+ }.asSingle
53
+ ) ? : false
56
54
}
57
55
58
- private fun lock (): Boolean {
56
+ private fun unlock (): Boolean {
59
57
return using(sessionOf(datasource)) { session ->
60
58
session.run (
61
- queryOf(" select pg_try_advisory_lock ($lockKey )" ).map {
59
+ queryOf(" select pg_advisory_unlock ($lockKey )" ).map {
62
60
it.string(1 ) == " t"
63
61
}.asSingle
64
62
) ? : false
0 commit comments