Skip to content

Commit 985180b

Browse files
committed
remove restriction on dataset twincache manipulation
This eases the manipulation of the dataset twincache in ETL. Upload can now be done for every sourceType, not just the File type. Trx (transaction) has been removed from CRUD operations for twincache entities.
1 parent 9374a28 commit 985180b

File tree

2 files changed

+71
-73
lines changed

2 files changed

+71
-73
lines changed

dataset/src/integrationTest/kotlin/com/cosmotech/dataset/service/DatasetServiceIntegrationTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,6 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
598598

599599
val modifiedDataset =
600600
datasetApiService.findDatasetById(organizationSaved.id!!, datasetSaved.id!!)
601-
assertEquals(Dataset.IngestionStatus.SUCCESS.value, modifiedDataset.ingestionStatus!!.value)
602601
assertEquals(Dataset.TwincacheStatus.FULL.value, modifiedDataset.twincacheStatus!!.value)
603602
}
604603

dataset/src/main/kotlin/com/cosmotech/dataset/service/DatasetServiceImpl.kt

Lines changed: 71 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,8 @@ class DatasetServiceImpl(
304304
val dataset = getDatasetWithStatus(organizationId, datasetId)
305305
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
306306

307-
dataset.sourceType.takeIf { it == DatasetSourceType.File }
308-
?: throw CsmResourceNotFoundException("SourceType Dataset must be 'File'")
307+
dataset.sourceType.takeIf { it == DatasetSourceType.File || it == DatasetSourceType.ETL }
308+
?: throw CsmResourceNotFoundException("SourceType Dataset must be 'File' or 'ETL'")
309309

310310
// TODO: Validated with ressourceScanner PROD-12822
311311
val archiverType = ArchiveStreamFactory.detect(body.inputStream.buffered())
@@ -345,31 +345,34 @@ class DatasetServiceImpl(
345345
}
346346
}
347347
try {
348-
trx(dataset) {
349-
val queryBuffer = QueryBuffer(csmJedisPool.resource, dataset.twingraphId!!)
350-
nodes.forEach { file ->
351-
readCSV(file.content.inputStream()) {
352-
queryBuffer.addNode(file.filename, it.values.first(), it)
353-
}
348+
datasetRepository.save(dataset.apply { ingestionStatus = Dataset.IngestionStatus.PENDING })
349+
val queryBuffer = QueryBuffer(csmJedisPool.resource, dataset.twingraphId!!)
350+
nodes.forEach { file ->
351+
readCSV(file.content.inputStream()) {
352+
queryBuffer.addNode(file.filename, it.values.first(), it)
354353
}
355-
edges.forEach { file ->
356-
readCSV(file.content.inputStream()) {
357-
val sourceKey = it.keys.elementAt(0)
358-
val targetKey = it.keys.elementAt(1)
359-
val source = it[sourceKey].toString().trim()
360-
val target = it[targetKey].toString().trim()
361-
val properties = it.minus(sourceKey).minus(targetKey)
362-
queryBuffer.addEdge(file.filename, source, target, properties)
363-
}
354+
}
355+
edges.forEach { file ->
356+
readCSV(file.content.inputStream()) {
357+
val sourceKey = it.keys.elementAt(0)
358+
val targetKey = it.keys.elementAt(1)
359+
val source = it[sourceKey].toString().trim()
360+
val target = it[targetKey].toString().trim()
361+
val properties = it.minus(sourceKey).minus(targetKey)
362+
queryBuffer.addEdge(file.filename, source, target, properties)
364363
}
365-
queryBuffer.send()
366-
if (safeReplace) {
367-
csmJedisPool.resource.use { jedis ->
368-
jedis.eval("redis.call('DEL', KEYS[1]);", 1, "backupGraph-$datasetId")
369-
}
364+
}
365+
queryBuffer.send()
366+
if (safeReplace) {
367+
csmJedisPool.resource.use { jedis ->
368+
jedis.eval("redis.call('DEL', KEYS[1]);", 1, "backupGraph-$datasetId")
370369
}
371370
}
372-
datasetRepository.save(dataset.apply { twincacheStatus = Dataset.TwincacheStatus.FULL })
371+
datasetRepository.save(
372+
dataset.apply {
373+
twincacheStatus = Dataset.TwincacheStatus.FULL
374+
ingestionStatus = Dataset.IngestionStatus.SUCCESS
375+
})
373376
} catch (e: Exception) {
374377
if (safeReplace) {
375378
csmJedisPool.resource.use { jedis ->
@@ -380,6 +383,7 @@ class DatasetServiceImpl(
380383
"backupGraph-$datasetId")
381384
}
382385
}
386+
datasetRepository.save(dataset.apply { ingestionStatus = Dataset.IngestionStatus.ERROR })
383387
throw CsmClientException(e.message ?: "Twingraph upload error", e)
384388
}
385389
}
@@ -528,6 +532,9 @@ class DatasetServiceImpl(
528532
override fun rollbackRefresh(organizationId: String, datasetId: String): String {
529533
var dataset = getVerifiedDataset(organizationId, datasetId, PERMISSION_WRITE)
530534

535+
if (dataset.sourceType == DatasetSourceType.ETL)
536+
throw IllegalArgumentException("ETL source type can't be rollback")
537+
531538
val status = getDatasetTwingraphStatus(organizationId, datasetId)
532539
if (status != Dataset.IngestionStatus.ERROR.value) {
533540
throw IllegalArgumentException("The dataset hasn't failed and can't be rolled back")
@@ -762,27 +769,23 @@ class DatasetServiceImpl(
762769
val dataset = getDatasetWithStatus(organizationId, datasetId)
763770
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
764771
var result = ""
765-
trx(dataset) { localDataset ->
766-
when (type) {
767-
TYPE_NODE ->
768-
graphProperties.forEach {
769-
result +=
770-
query(
771-
localDataset,
772-
"CREATE (a:${it.type} {id:'${it.name}',${it.params}}) RETURN a")
773-
.toJsonString()
774-
}
775-
TYPE_RELATIONSHIP ->
776-
graphProperties.forEach {
777-
result +=
778-
query(
779-
localDataset,
780-
"MATCH (a),(b) WHERE a.id='${it.source}' AND b.id='${it.target}'" +
781-
"CREATE (a)-[r:${it.type} {id:'${it.name}', ${it.params}}]->(b) RETURN r")
782-
.toJsonString()
783-
}
784-
else -> throw CsmResourceNotFoundException("Bad Type : $type")
785-
}
772+
when (type) {
773+
TYPE_NODE ->
774+
graphProperties.forEach {
775+
result +=
776+
query(dataset, "CREATE (a:${it.type} {id:'${it.name}',${it.params}}) RETURN a")
777+
.toJsonString()
778+
}
779+
TYPE_RELATIONSHIP ->
780+
graphProperties.forEach {
781+
result +=
782+
query(
783+
dataset,
784+
"MATCH (a),(b) WHERE a.id='${it.source}' AND b.id='${it.target}'" +
785+
"CREATE (a)-[r:${it.type} {id:'${it.name}', ${it.params}}]->(b) RETURN r")
786+
.toJsonString()
787+
}
788+
else -> throw CsmResourceNotFoundException("Bad Type : $type")
786789
}
787790
return result
788791
}
@@ -891,27 +894,25 @@ class DatasetServiceImpl(
891894
val dataset = getDatasetWithStatus(organizationId, datasetId)
892895
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
893896
var result = ""
894-
trx(dataset) { localDataset ->
895-
when (type) {
896-
TYPE_NODE ->
897-
graphProperties.forEach {
898-
result +=
899-
query(
900-
localDataset,
901-
"MATCH (a {id:'${it.name}'}) SET a = {id:'${it.name}',${it.params}} RETURN a")
902-
.toJsonString()
903-
}
904-
TYPE_RELATIONSHIP ->
905-
graphProperties.forEach {
906-
result +=
907-
query(
908-
dataset,
909-
"MATCH ()-[r {id:'${it.name}'}]-() SET r = {id:'${it.name}', " +
910-
"${it.params}} RETURN r")
911-
.toJsonString()
912-
}
913-
else -> throw CsmResourceNotFoundException("Bad Type : $type")
914-
}
897+
when (type) {
898+
TYPE_NODE ->
899+
graphProperties.forEach {
900+
result +=
901+
query(
902+
dataset,
903+
"MATCH (a {id:'${it.name}'}) SET a = {id:'${it.name}',${it.params}} RETURN a")
904+
.toJsonString()
905+
}
906+
TYPE_RELATIONSHIP ->
907+
graphProperties.forEach {
908+
result +=
909+
query(
910+
dataset,
911+
"MATCH ()-[r {id:'${it.name}'}]-() SET r = {id:'${it.name}', " +
912+
"${it.params}} RETURN r")
913+
.toJsonString()
914+
}
915+
else -> throw CsmResourceNotFoundException("Bad Type : $type")
915916
}
916917
return result
917918
}
@@ -924,13 +925,11 @@ class DatasetServiceImpl(
924925
) {
925926
val dataset = getDatasetWithStatus(organizationId, datasetId)
926927
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
927-
return trx(dataset) { localDataset ->
928-
when (type) {
929-
TYPE_NODE -> ids.forEach { query(localDataset, "MATCH (a) WHERE a.id='$it' DELETE a") }
930-
TYPE_RELATIONSHIP ->
931-
ids.forEach { query(localDataset, "MATCH ()-[r]-() WHERE r.id='$it' DELETE r") }
932-
else -> throw CsmResourceNotFoundException("Bad Type : $type")
933-
}
928+
return when (type) {
929+
TYPE_NODE -> ids.forEach { query(dataset, "MATCH (a) WHERE a.id='$it' DELETE a") }
930+
TYPE_RELATIONSHIP ->
931+
ids.forEach { query(dataset, "MATCH ()-[r]-() WHERE r.id='$it' DELETE r") }
932+
else -> throw CsmResourceNotFoundException("Bad Type : $type")
934933
}
935934
}
936935

0 commit comments

Comments
 (0)