Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,12 @@ class TriggerRunnerEvent(
val runnerId: String
) : CsmRequestResponseEvent<String>(publisher)

class AskRunnerStatusEvent(
publisher: Any,
val organizationId: String,
val workspaceId: String,
val runnerId: String
) : CsmRequestResponseEvent<String>(publisher)

class RunnerDeleted(
publisher: Any,
val organizationId: String,
val workspaceId: String,
val runnerId: String
val runnerId: String,
val datasetParameterId: String
) : CsmEvent(publisher)

class UpdateRunnerStatus(
Expand All @@ -30,3 +24,10 @@ class UpdateRunnerStatus(
val runnerId: String,
val lastRunId: String,
) : CsmRequestResponseEvent<String>(publisher)

class GetRunnerAttachedToDataset(
publisher: Any,
val organizationId: String,
val workspaceId: String,
val datasetId: String
) : CsmRequestResponseEvent<String>(publisher)
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package com.cosmotech.dataset.service

import com.cosmotech.common.config.CsmPlatformProperties
import com.cosmotech.common.config.existTable
import com.cosmotech.common.events.CsmEventPublisher
import com.cosmotech.common.events.GetRunnerAttachedToDataset
import com.cosmotech.common.exceptions.CsmAccessForbiddenException
import com.cosmotech.common.exceptions.CsmResourceNotFoundException
import com.cosmotech.common.rbac.ROLE_ADMIN
Expand Down Expand Up @@ -44,6 +46,7 @@ import com.cosmotech.workspace.domain.WorkspaceAccessControl
import com.cosmotech.workspace.domain.WorkspaceCreateRequest
import com.cosmotech.workspace.domain.WorkspaceSecurity
import com.cosmotech.workspace.domain.WorkspaceSolution
import com.ninjasquad.springmockk.SpykBean
import com.redis.om.spring.indexing.RediSearchIndexer
import io.awspring.cloud.s3.S3Template
import io.mockk.every
Expand Down Expand Up @@ -107,6 +110,7 @@ class DatasetServiceIntegrationTest() : CsmTestBase() {
@Autowired lateinit var s3Template: S3Template
@Autowired lateinit var resourceLoader: ResourceLoader
@Autowired lateinit var writerJdbcTemplate: JdbcTemplate
@SpykBean @Autowired private lateinit var eventPublisher: CsmEventPublisher

lateinit var organization: OrganizationCreateRequest
lateinit var workspace: WorkspaceCreateRequest
Expand Down Expand Up @@ -700,6 +704,66 @@ class DatasetServiceIntegrationTest() : CsmTestBase() {
assertFalse(s3Template.objectExists(csmPlatformProperties.s3.bucketName, fileKeyPath))
}

@Test
fun `test deleteDataset when its a Runner dataset parameter`() {

val datasetPartName = "Customers list"
val datasetPartDescription = "List of customers"
val datasetPartTags = mutableListOf("part", "public", "customers")
val datasetPartCreateRequest =
DatasetPartCreateRequest(
name = datasetPartName,
sourceName = CUSTOMER_SOURCE_FILE_NAME,
description = datasetPartDescription,
tags = datasetPartTags,
type = DatasetPartTypeEnum.File)

val datasetName = "Customer Dataset"
val datasetDescription = "Dataset for customers"
val datasetTags = mutableListOf("dataset", "public", "customers")
val datasetCreateRequest =
DatasetCreateRequest(
name = datasetName,
description = datasetDescription,
tags = datasetTags,
parts = mutableListOf(datasetPartCreateRequest))

val resourceTestFile = resourceLoader.getResource("classpath:/$CUSTOMER_SOURCE_FILE_NAME").file

val fileToSend = FileInputStream(resourceTestFile)

val mockMultipartFile =
MockMultipartFile(
"files",
CUSTOMER_SOURCE_FILE_NAME,
MediaType.MULTIPART_FORM_DATA_VALUE,
IOUtils.toByteArray(fileToSend))

val datasetId =
datasetApiService
.createDataset(
organizationSaved.id,
workspaceSaved.id,
datasetCreateRequest,
arrayOf(mockMultipartFile))
.id

val fakeRunnerId = "r-XXXXXX"
every { eventPublisher.publishEvent(any()) } answers
{
firstArg<GetRunnerAttachedToDataset>().response = fakeRunnerId
}

val exception =
assertThrows<IllegalArgumentException> {
datasetApiService.deleteDataset(organizationSaved.id, workspaceSaved.id, datasetId)
}

assertEquals(
"Dataset $datasetId is defined as a runner dataset ($fakeRunnerId). It cannot be deleted",
exception.message)
}

@Test
fun `test getDataset with no dataset part`() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package com.cosmotech.dataset.service

import com.cosmotech.common.CsmPhoenixService
import com.cosmotech.common.config.DATASET_INPUTS_SCHEMA
import com.cosmotech.common.events.GetRunnerAttachedToDataset
import com.cosmotech.common.events.RunnerDeleted
import com.cosmotech.common.exceptions.CsmResourceNotFoundException
import com.cosmotech.common.id.generateId
import com.cosmotech.common.rbac.CsmRbac
Expand Down Expand Up @@ -48,6 +50,7 @@ import org.apache.commons.lang3.StringUtils
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
import org.postgresql.util.PSQLException
import org.springframework.context.event.EventListener
import org.springframework.core.io.ByteArrayResource
import org.springframework.core.io.Resource
import org.springframework.jdbc.core.JdbcTemplate
Expand Down Expand Up @@ -193,6 +196,17 @@ class DatasetServiceImpl(

override fun deleteDataset(organizationId: String, workspaceId: String, datasetId: String) {
val dataset = getVerifiedDataset(organizationId, workspaceId, datasetId, PERMISSION_DELETE)

val getRunnerAttachedToDatasetEvent =
GetRunnerAttachedToDataset(this, organizationId, workspaceId, datasetId)

eventPublisher.publishEvent(getRunnerAttachedToDatasetEvent)

val datasetAttachedToRunnerId = getRunnerAttachedToDatasetEvent.response
require(datasetAttachedToRunnerId == null || datasetAttachedToRunnerId.isEmpty()) {
"Dataset $datasetId is defined as a runner dataset ($datasetAttachedToRunnerId). It cannot be deleted"
}

datasetRepository.delete(dataset)
dataset.parts.forEach {
datasetPartRepository.delete(it)
Expand Down Expand Up @@ -935,6 +949,25 @@ class DatasetServiceImpl(
return datasetList
}

@EventListener(RunnerDeleted::class)
fun onRunnerDeleted(runnerDeletedEvent: RunnerDeleted) {
val organizationId = runnerDeletedEvent.organizationId
val workspaceId = runnerDeletedEvent.workspaceId
val datasetParameterId = runnerDeletedEvent.datasetParameterId

val dataset =
datasetRepository.findBy(organizationId, workspaceId, datasetParameterId).orElseThrow {
CsmResourceNotFoundException(
"Dataset $datasetParameterId not found in organization $organizationId and workspace $workspaceId")
}

datasetRepository.delete(dataset)
dataset.parts.forEach {
datasetPartRepository.delete(it)
datasetPartManagementFactory.removeData(it)
}
}

private fun validDatasetPartCreateRequest(
datasetPartCreateRequest: DatasetPartCreateRequest,
file: MultipartFile
Expand Down
2 changes: 1 addition & 1 deletion doc/Models/Runner.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
| **solutionName** | **String** | the Solution name | [optional] [default to null] |
| **runTemplateName** | **String** | the Solution Run Template name associated with this Runner | [optional] [default to null] |
| **additionalData** | [**Map**](AnyType.md) | Free form additional data | [optional] [default to null] |
| **datasets** | [**RunnerDatasets**](RunnerDatasets.md) | | [default to null] |
| **datasets** | [**RunnerDatasets**](RunnerDatasets.md) | definition of datasets used by the runner | [default to null] |
| **runSizing** | [**RunnerResourceSizing**](RunnerResourceSizing.md) | | [optional] [default to null] |
| **parametersValues** | [**List**](RunnerRunTemplateParameterValue.md) | the list of Solution Run Template parameters values | [default to null] |
| **lastRunInfo** | [**LastRunInfo**](LastRunInfo.md) | | [default to null] |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.cosmotech.runner.service
import com.cosmotech.common.config.CsmPlatformProperties
import com.cosmotech.common.containerregistry.ContainerRegistryService
import com.cosmotech.common.events.CsmEventPublisher
import com.cosmotech.common.events.GetRunnerAttachedToDataset
import com.cosmotech.common.events.HasRunningRuns
import com.cosmotech.common.events.RunStart
import com.cosmotech.common.events.UpdateRunnerStatus
Expand Down Expand Up @@ -2157,6 +2158,34 @@ class RunnerServiceIntegrationTest : CsmTestBase() {
assertEquals(0, datasetParameterFromChildRunner.parts.size)
}

@Test
fun `test onGetRunnerAttachedToDataset behaviour`() {

logger.info(
"should create a new Runner and retrieve parameter varType from solution ignoring the one declared")
val newRunner =
makeRunnerCreateRequest(
name = "NewRunner",
datasetList = mutableListOf(datasetSaved.id),
parametersValues =
mutableListOf(
RunnerRunTemplateParameterValue(
parameterId = "param1", value = "7", varType = "ignored_var_type")))
val newRunnerSaved =
runnerApiService.createRunner(organizationSaved.id, workspaceSaved.id, newRunner)

assertNotNull(newRunnerSaved)
assertNotNull(newRunnerSaved.datasets)
val datasetParameterId = newRunnerSaved.datasets.parameter

val getAttachedRunnerToDataset =
GetRunnerAttachedToDataset(
this, organizationSaved.id, workspaceSaved.id, datasetParameterId)
eventPublisher.publishEvent(getAttachedRunnerToDataset)

assertEquals(newRunnerSaved.id, getAttachedRunnerToDataset.response)
}

fun makeDataset(
name: String = "name",
): DatasetCreateRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,6 @@ interface RunnerRepository : RedisDocumentRepository<Runner, String> {
@Sanitize @Param("runnerId") runnerId: String
): Optional<Runner>

@Query(
"(@organizationId:{\$organizationId} @workspaceId:{\$workspaceId}) @validationStatus:\$validationStatus")
fun findByValidationStatus(
@Sanitize @Param("organizationId") organizationId: String,
@Sanitize @Param("workspaceId") workspaceId: String,
@Sanitize @Param("validationStatus") validationStatus: String,
pageable: Pageable
): Page<Runner>

@Query(
"(@organizationId:{\$organizationId} @workspaceId:{\$workspaceId}) " +
"@validationStatus:\$validationStatus \$securityConstraint")
fun findByValidationStatusAndSecurity(
@Sanitize @Param("organizationId") organizationId: String,
@Sanitize @Param("workspaceId") workspaceId: String,
@Sanitize @Param("validationStatus") validationStatus: String,
@SecurityConstraint @Param("securityConstraint") securityConstraint: String,
pageable: Pageable
): Page<Runner>

@Query("(@organizationId:{\$organizationId} @workspaceId:{\$workspaceId} @parentId:{\$parentId})")
fun findByParentId(
@Sanitize @Param("organizationId") organizationId: String,
Expand All @@ -51,22 +31,6 @@ interface RunnerRepository : RedisDocumentRepository<Runner, String> {
pageable: Pageable
): Page<Runner>

@Query("(@organizationId:{\$organizationId})")
fun findByOrganizationId(
@Sanitize @Param("organizationId") organizationId: String,
pageable: Pageable
): Page<Runner>

@Query(
"(@organizationId:{\$organizationId} @workspaceId:{\$workspaceId} @parentId:{\$parentId}) \$securityConstraint")
fun findByParentIdAndSecurity(
@Sanitize @Param("organizationId") organizationId: String,
@Sanitize @Param("workspaceId") workspaceId: String,
@Sanitize @Param("parentId") parentId: String,
@SecurityConstraint @Param("securityConstraint") securityConstraint: String,
pageable: Pageable
): Page<Runner>

@Query("(@organizationId:{\$organizationId} @workspaceId:{\$workspaceId})")
fun findByWorkspaceId(
@Sanitize @Param("organizationId") organizationId: String,
Expand All @@ -81,4 +45,12 @@ interface RunnerRepository : RedisDocumentRepository<Runner, String> {
@SecurityConstraint @Param("securityConstraint") securityConstraint: String,
pageable: Pageable
): Page<Runner>

@Query(
"@organizationId:{\$organizationId} @workspaceId:{\$workspaceId} @datasets_parameter:{\$datasetId}")
fun findByOrganizationIdAndWorkspaceIdAndDatasetsParameterValue(
@Sanitize @Param("organizationId") organizationId: String,
@Sanitize @Param("workspaceId") workspaceId: String,
@Sanitize @Param("datasetId") datasetId: String
): Optional<Runner>
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package com.cosmotech.runner.service

import com.cosmotech.common.config.CsmPlatformProperties
import com.cosmotech.common.events.GetRunnerAttachedToDataset
import com.cosmotech.common.events.RunDeleted
import com.cosmotech.common.rbac.PERMISSION_CREATE_CHILDREN
import com.cosmotech.common.rbac.PERMISSION_DELETE
Expand Down Expand Up @@ -269,4 +270,17 @@ internal class RunnerApiServiceImpl(
runnerService.getInstance(runDeleted.runnerId)
}
}

@EventListener(GetRunnerAttachedToDataset::class)
fun onGetAttachedRunnerToDataset(getRunnerAttachedToDataset: GetRunnerAttachedToDataset) {
val organizationId = getRunnerAttachedToDataset.organizationId
val workspaceId = getRunnerAttachedToDataset.workspaceId
val datasetId = getRunnerAttachedToDataset.datasetId
val runnerService = getRunnerService().inOrganization(organizationId).inWorkspace(workspaceId)

val runnerId =
runnerService.findRunnerByDatasetParameter(organizationId, workspaceId, datasetId)?.id

getRunnerAttachedToDataset.response = runnerId
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import com.cosmotech.workspace.domain.Workspace
import com.cosmotech.workspace.service.toGenericSecurity
import java.time.Instant
import kotlin.collections.mutableListOf
import kotlin.jvm.optionals.getOrNull
import org.springframework.context.annotation.Scope
import org.springframework.data.domain.PageRequest
import org.springframework.data.domain.Pageable
Expand Down Expand Up @@ -140,10 +141,11 @@ class RunnerService(
newRoots.forEach { updateChildrenRootId(parent = it, newRootId = it.id) }

// Notify the deletion
val runnerDeleted = RunnerDeleted(this, runner.organizationId, runner.workspaceId, runner.id)
val runnerDeleted =
RunnerDeleted(
this, runner.organizationId, runner.workspaceId, runner.id, runner.datasets.parameter)
this.eventPublisher.publishEvent(runnerDeleted)
datasetApiService.deleteDataset(
runner.organizationId, runner.workspaceId, runner.datasets.parameter)

return runnerRepository.delete(runnerInstance.getRunnerDataObjet())
}

Expand Down Expand Up @@ -175,6 +177,17 @@ class RunnerService(
}
}

fun findRunnerByDatasetParameter(
organizationId: String,
workspaceId: String,
datasetId: String
): Runner? {
return runnerRepository
.findByOrganizationIdAndWorkspaceIdAndDatasetsParameterValue(
organizationId, workspaceId, datasetId)
.getOrNull()
}

fun saveInstance(runnerInstance: RunnerInstance): Runner {
return runnerRepository.save(runnerInstance.getRunnerDataObjet())
}
Expand Down
5 changes: 4 additions & 1 deletion runner/src/main/openapi/runner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,10 @@ components:
description: Free form additional data
additionalProperties: true
datasets:
x-field-extra-annotation: "@com.redis.om.spring.annotations.Indexed"
description: definition of datasets used by the runner
$ref: "#/components/schemas/RunnerDatasets"
allOf:
- $ref: "#/components/schemas/RunnerDatasets"
runSizing:
description: definition of resources needed for the runner run
$ref: "#/components/schemas/RunnerResourceSizing"
Expand Down Expand Up @@ -907,6 +909,7 @@ components:
items:
type: string
parameter:
x-field-extra-annotation: "@com.redis.om.spring.annotations.Indexed"
description: The dataset id used for dataset parameters on current Runner
type: string
parameters:
Expand Down