Skip to content

Commit 13253ef

Browse files
authored
Migrere data fra inntekt_v1_arena_mapping til inntekt_v1_person_mapping (#89)
* Migrere data fra inntekt_v1_arena_mapping til inntekt_v1_person_mapping * Used postgresql advisory lock to ensure that only one intance of dp-inntekt-api does the job Also, first try in preprod navikt/dagpenger#397
1 parent 16385b9 commit 13253ef

File tree

4 files changed

+234
-1
lines changed

4 files changed

+234
-1
lines changed

src/main/kotlin/no/nav/dagpenger/inntekt/InntektApi.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ import java.net.URI
3434
import java.net.URL
3535
import java.util.concurrent.TimeUnit
3636
import kotlin.concurrent.fixedRateTimer
37+
import kotlinx.coroutines.launch
3738
import kotlinx.coroutines.runBlocking
3839
import mu.KotlinLogging
40+
import no.nav.dagpenger.inntekt.db.ArenaMappingMigrator
3941
import no.nav.dagpenger.inntekt.db.IllegalInntektIdException
4042
import no.nav.dagpenger.inntekt.db.InntektNotFoundException
4143
import no.nav.dagpenger.inntekt.db.InntektStore
@@ -95,6 +97,16 @@ fun main() = runBlocking {
9597
LOGGER.info { "Vaktmesteren er ferdig... for denne gang" }
9698
})
9799

100+
val arenaMappingMigrator = ArenaMappingMigrator(dataSource)
101+
102+
if (config.application.profile != Profile.PROD) {
103+
launch {
104+
LOGGER.info { "Starting ArenaMappingMigrator" }
105+
arenaMappingMigrator.migrate()
106+
LOGGER.info { "Done running ArenaMappingMigrator" }
107+
}
108+
}
109+
98110
val inntektskomponentHttpClient = InntektskomponentHttpClient(
99111
config.application.hentinntektListeUrl,
100112
stsOidcClient
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package no.nav.dagpenger.inntekt.db
2+
3+
import javax.sql.DataSource
4+
import kotliquery.queryOf
5+
import kotliquery.sessionOf
6+
import kotliquery.using
7+
import mu.KotlinLogging
8+
import org.intellij.lang.annotations.Language
9+
10+
internal class ArenaMappingMigrator(private val datasource: DataSource) {
11+
12+
companion object {
13+
private val logger = KotlinLogging.logger {}
14+
private val lockKey = 1337
15+
}
16+
17+
@Language("sql")
18+
private val statement: String = """
19+
INSERT INTO inntekt_v1_person_mapping (inntektid, aktørid, vedtakid, beregningsdato)
20+
SELECT inntektid, aktørid, vedtakid, beregningsdato
21+
FROM inntekt_v1_arena_mapping
22+
WHERE inntektid NOT IN (SELECT inntektid from inntekt_v1_person_mapping)
23+
""".trimIndent()
24+
25+
fun migrate(): Int {
26+
try {
27+
val locked = lock()
28+
return if (locked) {
29+
logger.info { "Obtained lock for migrator" }
30+
val rowsMigrated = using(sessionOf(datasource)) { session ->
31+
session.transaction {
32+
it.run(queryOf(statement).asUpdate)
33+
}
34+
}
35+
36+
logger.info { "Migrated $rowsMigrated rows from inntekt_v1_arena_mapping to inntekt_v1_person_mapping" }
37+
rowsMigrated
38+
} else {
39+
logger.info { "Could not obtain lock for migrator" }
40+
0
41+
}
42+
} finally {
43+
val unlocked = unlock()
44+
logger.info { "Unlocked = $unlocked for migrator" }
45+
}
46+
}
47+
48+
private fun unlock(): Boolean {
49+
return using(sessionOf(datasource)) { session ->
50+
session.run(
51+
queryOf("select pg_advisory_unlock($lockKey)").map {
52+
it.string(1) == "t"
53+
}.asSingle
54+
) ?: false
55+
}
56+
}
57+
58+
private fun lock(): Boolean {
59+
return using(sessionOf(datasource)) { session ->
60+
session.run(
61+
queryOf("select pg_try_advisory_lock($lockKey)").map {
62+
it.string(1) == "t"
63+
}.asSingle
64+
) ?: false
65+
}
66+
}
67+
}

src/main/kotlin/no/nav/dagpenger/inntekt/db/PostgresInntektStore.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.postgresql.util.PSQLException
2121
internal class PostgresInntektStore(private val dataSource: DataSource) : InntektStore, HealthCheck {
2222

2323
companion object {
24-
private val adapter: JsonAdapter<InntektkomponentResponse> =
24+
internal val adapter: JsonAdapter<InntektkomponentResponse> =
2525
moshiInstance.adapter(InntektkomponentResponse::class.java)
2626
private val ulidGenerator = ULID()
2727
private val LOGGER = KotlinLogging.logger {}
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package no.nav.dagpenger.inntekt.db
2+
3+
import de.huxhorn.sulky.ulid.ULID
4+
import io.kotest.matchers.collections.shouldContainAll
5+
import io.kotest.matchers.ints.shouldBeExactly
6+
import java.time.LocalDate
7+
import java.time.ZonedDateTime
8+
import kotlin.random.Random
9+
import kotliquery.LoanPattern.using
10+
import kotliquery.queryOf
11+
import kotliquery.sessionOf
12+
import no.nav.dagpenger.inntekt.DataSource as TestDataSource
13+
import no.nav.dagpenger.inntekt.inntektskomponenten.v1.Aktoer
14+
import no.nav.dagpenger.inntekt.inntektskomponenten.v1.AktoerType
15+
import no.nav.dagpenger.inntekt.inntektskomponenten.v1.InntektkomponentResponse
16+
import no.nav.dagpenger.inntekt.withMigratedDb
17+
import org.junit.jupiter.api.Test
18+
import org.postgresql.util.PGobject
19+
20+
internal class ArenaMappingMigratorTest {
21+
22+
companion object {
23+
24+
private val hentInntektListeResponse = InntektkomponentResponse(
25+
emptyList(),
26+
Aktoer(AktoerType.AKTOER_ID, "1234")
27+
)
28+
}
29+
30+
@Test
31+
fun `Skal migrere data fra arena mapping til person tabell `() {
32+
withMigratedDb {
33+
val numberOfInserts = 100
34+
val commands = (1..numberOfInserts).toList().map { InntektId(ULID().nextULID()) to StoreInntektCommand(
35+
inntektparametre = Inntektparametre(aktørId = Random.nextInt().toString(), vedtakId = Random.nextLong().toString(), beregningsdato = LocalDate.now()),
36+
inntekt = hentInntektListeResponse
37+
) }
38+
fillArenaMappingTable(TestDataSource.instance, commands)
39+
val arenaMappingMigrator = ArenaMappingMigrator(TestDataSource.instance)
40+
val rowsMigrated = arenaMappingMigrator.migrate()
41+
42+
val inntektStore = PostgresInntektStore(TestDataSource.instance)
43+
44+
val ids = commands.mapNotNull { (_, command) ->
45+
inntektStore.getInntektId(
46+
inntektparametre = command.inntektparametre
47+
)
48+
}
49+
50+
rowsMigrated shouldBeExactly numberOfInserts
51+
ids.size shouldBeExactly numberOfInserts
52+
ids shouldContainAll commands.map { it.first }
53+
}
54+
}
55+
56+
@Test
57+
fun `Skal migrere data fra arena mapping til person tabell med samme vedtak id og aktør id men forskjellig beregningsdato`() {
58+
withMigratedDb {
59+
val numberOfInserts = 4
60+
val commandsWithSameVedtakId: List<Pair<InntektId, StoreInntektCommand>> = (1..4).toList().map { InntektId(ULID().nextULID()) to StoreInntektCommand(
61+
inntektparametre = Inntektparametre(aktørId = "1234", vedtakId = "5678", beregningsdato = LocalDate.now().minusDays(Random.nextInt(365).toLong())),
62+
inntekt = hentInntektListeResponse
63+
) }
64+
fillArenaMappingTable(TestDataSource.instance, commandsWithSameVedtakId)
65+
val arenaMappingMigrator = ArenaMappingMigrator(TestDataSource.instance)
66+
val rowsMigrated = arenaMappingMigrator.migrate()
67+
68+
val inntektStore = PostgresInntektStore(TestDataSource.instance)
69+
70+
val ids = commandsWithSameVedtakId.mapNotNull { (_, command) ->
71+
inntektStore.getInntektId(
72+
inntektparametre = command.inntektparametre
73+
)
74+
}
75+
76+
rowsMigrated shouldBeExactly numberOfInserts
77+
ids.size shouldBeExactly numberOfInserts
78+
ids shouldContainAll commandsWithSameVedtakId.map { it.first }
79+
}
80+
}
81+
82+
@Test
83+
fun ` Migrere kan skje flere ganger `() {
84+
85+
withMigratedDb {
86+
val numberOfInserts = 100
87+
val commands = (1..numberOfInserts).toList().map {
88+
InntektId(ULID().nextULID()) to StoreInntektCommand(
89+
inntektparametre = Inntektparametre(
90+
aktørId = Random.nextInt().toString(),
91+
vedtakId = Random.nextLong().toString(),
92+
beregningsdato = LocalDate.now()
93+
),
94+
inntekt = hentInntektListeResponse
95+
)
96+
}
97+
fillArenaMappingTable(TestDataSource.instance, commands)
98+
val arenaMappingMigrator = ArenaMappingMigrator(TestDataSource.instance)
99+
val rowsMigrated = arenaMappingMigrator.migrate()
100+
val rowsMigratedSecondTime = arenaMappingMigrator.migrate()
101+
102+
val inntektStore = PostgresInntektStore(TestDataSource.instance)
103+
104+
val ids = commands.mapNotNull { (_, command) ->
105+
inntektStore.getInntektId(
106+
inntektparametre = command.inntektparametre
107+
)
108+
}
109+
110+
rowsMigrated shouldBeExactly numberOfInserts
111+
rowsMigratedSecondTime shouldBeExactly 0
112+
ids.size shouldBeExactly numberOfInserts
113+
ids shouldContainAll commands.map { it.first }
114+
}
115+
}
116+
117+
private fun fillArenaMappingTable(dataSource: javax.sql.DataSource, commands: List<Pair<InntektId, StoreInntektCommand>>) {
118+
commands.forEach { (id, command) ->
119+
using(sessionOf(dataSource)) { session ->
120+
session.transaction { tx ->
121+
tx.run(
122+
queryOf(
123+
"INSERT INTO inntekt_V1 (id, inntekt, manuelt_redigert, timestamp) VALUES (:id, :data, :manuelt, :created)",
124+
mapOf(
125+
"id" to id.id,
126+
"created" to ZonedDateTime.now(),
127+
"data" to PGobject().apply {
128+
type = "jsonb"
129+
value = PostgresInntektStore.adapter.toJson(command.inntekt)
130+
},
131+
when (command.manueltRedigert) {
132+
null -> "manuelt" to false
133+
else -> "manuelt" to true
134+
}
135+
)
136+
137+
).asUpdate
138+
)
139+
tx.run(
140+
queryOf(
141+
"INSERT INTO inntekt_V1_arena_mapping VALUES (:inntektId, :aktorId, :vedtakId, :beregningsDato)",
142+
mapOf(
143+
"inntektId" to id.id,
144+
"aktorId" to command.inntektparametre.aktørId,
145+
"vedtakId" to command.inntektparametre.vedtakId.toLong(),
146+
"beregningsDato" to command.inntektparametre.beregningsdato
147+
)
148+
).asUpdate
149+
)
150+
}
151+
}
152+
}
153+
}
154+
}

0 commit comments

Comments
 (0)