Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/kubernetes/helm-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ argo:
config:
csm:
platform:
twincache:
host: redis.host.changeme
password: changeme
port: 6379
username: default
useGraphModule: true
vendor: azure
argo:
base-uri: "http://argo-server:2746"
Expand Down
1 change: 1 addition & 0 deletions api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ csm:
port: "6379"
username: "default_user"
password: "default_password"
useGraphModule: true
dataset:
# TODO: Should be way less than a thousand. See open ticket
maxResult: 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
@Autowired lateinit var solutionApiService: SolutionApiServiceInterface
@Autowired lateinit var workspaceApiService: WorkspaceApiServiceInterface
@SpykBean @Autowired lateinit var csmPlatformProperties: CsmPlatformProperties
@MockK(relaxUnitFun = true) private lateinit var eventPublisher: CsmEventPublisher
@MockK(relaxUnitFun = true) lateinit var eventPublisher: CsmEventPublisher

lateinit var connectorSaved: Connector
lateinit var dataset: Dataset
Expand Down Expand Up @@ -707,6 +707,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
@Test
fun `test uploadTwingraph status`() {
organizationSaved = organizationApiService.registerOrganization(organization)
dataset.apply { sourceType = DatasetSourceType.File }
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)
val file = this::class.java.getResource("/integrationTest.zip")?.file
val resource = ByteArrayResource(File(file!!).readBytes())
Expand All @@ -731,6 +732,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
@Test
fun `test uploadTwingraph fail set dataset status to error`() {
organizationSaved = organizationApiService.registerOrganization(organization)
dataset.apply { sourceType = DatasetSourceType.File }
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)
val file = this::class.java.getResource("/brokenGraph.zip")?.file
val resource = ByteArrayResource(File(file!!).readBytes())
Expand Down Expand Up @@ -790,6 +792,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
@Test
fun `reupload a twingraph in dataset with source type File`() {
organizationSaved = organizationApiService.registerOrganization(organization)
dataset.apply { sourceType = DatasetSourceType.File }
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)

val fileName = this::class.java.getResource("/integrationTest.zip")?.file
Expand Down Expand Up @@ -858,7 +861,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
every { getCurrentAccountIdentifier(any()) } returns CONNECTED_ADMIN_USER
organization = makeOrganizationWithRole("organization")
organizationSaved = organizationApiService.registerOrganization(organization)
makeDatasetWithRole(sourceType = DatasetSourceType.File)
dataset = makeDatasetWithRole(sourceType = DatasetSourceType.File)
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)

datasetRepository.save(datasetSaved.apply { ingestionStatus = IngestionStatusEnum.ERROR })
Expand Down Expand Up @@ -1045,7 +1048,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
fun makeDataset(
organizationId: String = organizationSaved.id!!,
parentId: String = "",
sourceType: DatasetSourceType = DatasetSourceType.File
sourceType: DatasetSourceType = DatasetSourceType.Twincache
): Dataset {
return Dataset(
id = UUID.randomUUID().toString(),
Expand All @@ -1065,7 +1068,7 @@ class DatasetServiceIntegrationTest : CsmRedisTestBase() {
parentId: String = "",
userName: String = TEST_USER_MAIL,
role: String = ROLE_ADMIN,
sourceType: DatasetSourceType = DatasetSourceType.File
sourceType: DatasetSourceType = DatasetSourceType.Twincache
): Dataset {
return Dataset(
id = UUID.randomUUID().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1660,7 +1660,8 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {

val organization = makeOrganizationWithRole(role = role)
organizationSaved = organizationApiService.registerOrganization(organization)
val dataset = makeDatasetWithRole(role = ROLE_ADMIN)
val dataset =
makeDatasetWithRole(role = ROLE_ADMIN, sourceType = DatasetSourceType.File)
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)
val fileName = this::class.java.getResource("/integrationTest.zip")?.file
val file = File(fileName!!)
Expand Down Expand Up @@ -1701,7 +1702,7 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {

val organization = makeOrganizationWithRole()
organizationSaved = organizationApiService.registerOrganization(organization)
val dataset = makeDatasetWithRole(role = role)
val dataset = makeDatasetWithRole(role = role, sourceType = DatasetSourceType.File)
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)
val fileName = this::class.java.getResource("/integrationTest.zip")?.file
val file = File(fileName!!)
Expand Down Expand Up @@ -1830,9 +1831,9 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {

val organization = makeOrganizationWithRole(role = role)
organizationSaved = organizationApiService.registerOrganization(organization)
val dataset = makeDatasetWithRole(role = ROLE_ADMIN)
val dataset =
makeDatasetWithRole(role = ROLE_ADMIN, sourceType = DatasetSourceType.None)
datasetSaved = datasetApiService.createDataset(organizationSaved.id!!, dataset)
materializeTwingraph()

every { getCurrentAccountIdentifier(any()) } returns TEST_USER_MAIL

Expand Down Expand Up @@ -2294,7 +2295,7 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {
createTwingraph: Boolean = true
): Dataset {
dataset.apply {
if (createTwingraph) {
if (createTwingraph && !this.twingraphId.isNullOrBlank()) {
jedis.graphQuery(this.twingraphId, "CREATE (n:labelrouge)")
}
this.ingestionStatus = IngestionStatusEnum.SUCCESS
Expand All @@ -2314,7 +2315,7 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {
fun makeDataset(
id: String,
name: String,
sourceType: DatasetSourceType = DatasetSourceType.File
sourceType: DatasetSourceType = DatasetSourceType.Twincache
): Dataset {
return Dataset(
id = id,
Expand Down Expand Up @@ -2365,16 +2366,17 @@ class DatasetServiceRBACTest : CsmRedisTestBase() {
parentId: String = "",
id: String = TEST_USER_MAIL,
role: String = ROLE_ADMIN,
sourceType: DatasetSourceType = DatasetSourceType.File
sourceType: DatasetSourceType = DatasetSourceType.Twincache
): Dataset {
val random = UUID.randomUUID().toString()
return Dataset(
id = UUID.randomUUID().toString(),
id = random,
name = "My datasetRbac",
organizationId = organizationId,
parentId = parentId,
ownerId = "ownerId",
connector = DatasetConnector(connectorSaved.id!!),
twingraphId = "graph",
twingraphId = "graph-${random}",
source = SourceInfo("location", "name", "path"),
tags = mutableListOf("dataset"),
sourceType = sourceType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ csm:
host: "localhost"
port: "6379"
username: "default"
useGraphModule: true
# Leave it as blank as there's no auth with test container
password:
dataset:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ import kotlinx.coroutines.launch
import org.apache.commons.compress.archivers.ArchiveStreamFactory
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVRecord
import org.apache.commons.lang3.NotImplementedException
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.event.EventListener
import org.springframework.core.io.ByteArrayResource
import org.springframework.core.io.Resource
Expand Down Expand Up @@ -130,6 +132,13 @@ class DatasetServiceImpl(
private val resourceScanner: ResourceScanner
) : CsmPhoenixService(), DatasetApiServiceInterface {

@Value("\${csm.platform.twincache.useGraphModule}") private var useGraphModule: Boolean = true

private val notImplementedExceptionMessage =
"The API is not configured to use Graph functionnalities. " +
"This endpoint is deactivated. " +
"To change that, set the API configuration entry 'csm.platform.twincache.useGraphModule' to true"

override fun findAllDatasets(organizationId: String, page: Int?, size: Int?): List<Dataset> {
organizationService.getVerifiedOrganization(organizationId)
val defaultPageSize = csmPlatformProperties.twincache.dataset.defaultPageSize
Expand Down Expand Up @@ -181,44 +190,52 @@ class DatasetServiceImpl(
dataset.takeUnless { it.name.isNullOrBlank() }
?: throw IllegalArgumentException("Name cannot be null or blank")

val datasetSourceType = dataset.sourceType
dataset.takeUnless {
dataset.sourceType in listOf(DatasetSourceType.ADT, DatasetSourceType.AzureStorage) &&
datasetSourceType in listOf(DatasetSourceType.ADT, DatasetSourceType.AzureStorage) &&
dataset.source == null
}
?: throw IllegalArgumentException(
"Source cannot be null for source type 'ADT' or 'Storage'")

val twingraphId = idGenerator.generate("twingraph")
if (dataset.sourceType != null) {
var twingraphId: String? = null

if (datasetSourceType == DatasetSourceType.Twincache && useGraphModule) {

twingraphId = idGenerator.generate("twingraph")
val twincacheConnector = getCreateTwincacheConnector()
dataset.connector =
DatasetConnector(
id = twincacheConnector.id,
parametersValues = mutableMapOf(TWINCACHE_NAME to twingraphId))
}

dataset.takeUnless { it.connector == null || dataset.connector!!.id.isNullOrBlank() }
?: throw IllegalArgumentException("Connector or its ID cannot be null or blank")

val existingConnector = connectorService.findConnectorById(dataset.connector!!.id!!)
logger.debug("Found connector: {}", existingConnector)

val createdDataset =
dataset.copy(
id = idGenerator.generate("dataset"),
twingraphId = twingraphId,
sourceType = dataset.sourceType ?: DatasetSourceType.None,
sourceType = datasetSourceType ?: DatasetSourceType.None,
source = dataset.source ?: SourceInfo("none"),
main = dataset.main ?: true,
creationDate = Instant.now().toEpochMilli(),
ingestionStatus = IngestionStatusEnum.NONE,
twincacheStatus = TwincacheStatusEnum.EMPTY,
ownerId = getCurrentAuthenticatedUserName(csmPlatformProperties),
organizationId = organizationId)
createdDataset.apply {
if (!twingraphId.isNullOrBlank()) {
this.twingraphId = twingraphId
}
}
createdDataset.setRbac(csmRbac.initSecurity(dataset.getRbac()))
createdDataset.connector!!.apply {
name = existingConnector.name
version = existingConnector.version

if (dataset.connector != null && !dataset.connector!!.id.isNullOrBlank()) {
val existingConnector = connectorService.findConnectorById(dataset.connector!!.id!!)
logger.debug("Found connector: {}", existingConnector)

createdDataset.connector!!.apply {
name = existingConnector.name
version = existingConnector.version
}
}

return datasetRepository.save(createdDataset)
Expand All @@ -229,6 +246,9 @@ class DatasetServiceImpl(
datasetId: String,
subDatasetGraphQuery: SubDatasetGraphQuery
): Dataset {

checkIfGraphFunctionalityIsAvailable()

val dataset =
getDatasetWithStatus(organizationId, datasetId, status = IngestionStatusEnum.SUCCESS)
csmRbac.verify(dataset.getRbac(), PERMISSION_CREATE_CHILDREN)
Expand Down Expand Up @@ -284,6 +304,12 @@ class DatasetServiceImpl(
return datasetSaved
}

private fun checkIfGraphFunctionalityIsAvailable() {
if (!useGraphModule) {
throw NotImplementedException(notImplementedExceptionMessage)
}
}

fun bulkQueryResult(queryBuffer: QueryBuffer, resultSet: ResultSet) {

resultSet.forEach { record: Record? ->
Expand Down Expand Up @@ -312,6 +338,8 @@ class DatasetServiceImpl(
datasetId: String,
body: Resource
): FileUploadValidation {

checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)

Expand Down Expand Up @@ -400,7 +428,7 @@ class DatasetServiceImpl(
null -> IngestionStatusEnum.NONE.value
DatasetSourceType.None -> {
var twincacheStatus = TwincacheStatusEnum.EMPTY
if (unifiedJedis.exists(dataset.twingraphId!!)) {
if (useGraphModule && unifiedJedis.exists(dataset.twingraphId!!)) {
twincacheStatus = TwincacheStatusEnum.FULL
}
datasetRepository.apply { dataset.twincacheStatus = twincacheStatus }
Expand All @@ -413,7 +441,7 @@ class DatasetServiceImpl(
}
if (dataset.ingestionStatus == IngestionStatusEnum.ERROR) {
return IngestionStatusEnum.ERROR.value
} else if (!unifiedJedis.exists(dataset.twingraphId!!)) {
} else if (useGraphModule && !unifiedJedis.exists(dataset.twingraphId!!)) {
IngestionStatusEnum.PENDING.value
} else {
dataset
Expand Down Expand Up @@ -483,6 +511,7 @@ class DatasetServiceImpl(
}

override fun refreshDataset(organizationId: String, datasetId: String): DatasetTwinGraphInfo {
checkIfGraphFunctionalityIsAvailable()
val dataset = getVerifiedDataset(organizationId, datasetId, PERMISSION_WRITE)

dataset.takeUnless { it.sourceType == DatasetSourceType.File }
Expand Down Expand Up @@ -527,6 +556,7 @@ class DatasetServiceImpl(
}

override fun rollbackRefresh(organizationId: String, datasetId: String): String {
checkIfGraphFunctionalityIsAvailable()
var dataset = getVerifiedDataset(organizationId, datasetId, PERMISSION_WRITE)

val status = getDatasetTwingraphStatus(organizationId, datasetId)
Expand All @@ -548,7 +578,7 @@ class DatasetServiceImpl(
override fun deleteDataset(organizationId: String, datasetId: String) {
val dataset = getVerifiedDataset(organizationId, datasetId, PERMISSION_DELETE)

if (unifiedJedis.exists(dataset.twingraphId!!)) {
if (useGraphModule && unifiedJedis.exists(dataset.twingraphId!!)) {
unifiedJedis.del(dataset.twingraphId!!)
}

Expand Down Expand Up @@ -618,6 +648,7 @@ class DatasetServiceImpl(
datasetId: String,
datasetTwinGraphQuery: DatasetTwinGraphQuery
): List<Any> {
checkIfGraphFunctionalityIsAvailable()
val dataset =
getDatasetWithStatus(organizationId, datasetId, status = IngestionStatusEnum.SUCCESS)

Expand Down Expand Up @@ -679,6 +710,7 @@ class DatasetServiceImpl(
twinGraphQuery: DatasetTwinGraphQuery,
body: Resource
): TwinGraphBatchResult {
checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
resourceScanner.scanMimeTypes(body, listOf("text/csv", "text/plain"))
Expand All @@ -702,6 +734,7 @@ class DatasetServiceImpl(
datasetId: String,
datasetTwinGraphQuery: DatasetTwinGraphQuery
): DatasetTwinGraphHash {
checkIfGraphFunctionalityIsAvailable()
val dataset =
getDatasetWithStatus(organizationId, datasetId, status = IngestionStatusEnum.SUCCESS)
val bulkQueryKey = bulkQueryKey(dataset.twingraphId!!, datasetTwinGraphQuery.query, null)
Expand All @@ -724,6 +757,7 @@ class DatasetServiceImpl(
}

override fun downloadTwingraph(organizationId: String, hash: String): Resource {
checkIfGraphFunctionalityIsAvailable()
organizationService.getVerifiedOrganization(organizationId)

val bulkQueryId = bulkQueryKey(hash)
Expand Down Expand Up @@ -774,6 +808,7 @@ class DatasetServiceImpl(
type: String,
graphProperties: List<GraphProperties>
): String {
checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
var result = ""
Expand Down Expand Up @@ -808,6 +843,7 @@ class DatasetServiceImpl(
type: String,
ids: List<String>
): String {
checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
var result = ""
when (type) {
Expand Down Expand Up @@ -903,6 +939,7 @@ class DatasetServiceImpl(
type: String,
graphProperties: List<GraphProperties>
): String {
checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
var result = ""
Expand Down Expand Up @@ -937,6 +974,7 @@ class DatasetServiceImpl(
type: String,
ids: List<String>
) {
checkIfGraphFunctionalityIsAvailable()
val dataset = getDatasetWithStatus(organizationId, datasetId)
csmRbac.verify(dataset.getRbac(), PERMISSION_WRITE)
return trx(dataset) { localDataset ->
Expand Down
Loading
Loading