Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
postgres:
image: postgres:15
env:
POSTGRES_PASSWORD: postgres
POSTGRES_PASSWORD: changeme
POSTGRES_DB: atum_db
options: >-
--health-cmd pg_isready
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class AtumContext private[agent] (
val retrievedAD = agent.updateAdditionalData(this.atumPartitions, currAdditionalDataSubmit)

// Could be different from the one that was submitted. Replacing, just to have the most fresh copy possible.
this.additionalData = retrievedAD.data.map { case (k, v) => (k, v.map(_.value)) }
this.additionalData = retrievedAD.data.map { case (k, v) => (k, v.map(_.value)) }.toMap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn't be needed

Copy link
Collaborator Author

@salamonpavel salamonpavel Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed: the data now comes from a MultiSuccessResponse as a sequence.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I was looking at the wrong class.

this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@ package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import za.co.absa.atum.model.dto.{
AdditionalDataDTO,
AdditionalDataItemDTO,
AdditionalDataPatchDTO,
AtumContextDTO,
CheckpointDTO,
PartitioningDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.model.dto._

/**
* dispatcher useful for development, testing and debugging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,7 @@
package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import za.co.absa.atum.model.dto.{
AdditionalDataDTO,
AdditionalDataPatchDTO,
AtumContextDTO,
CheckpointDTO,
PartitioningDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.model.dto._

/**
* This class provides a contract for different dispatchers. It has a constructor for eventual creation via reflection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import sttp.capabilities
import sttp.client3._
import sttp.model.Uri
import sttp.client3.okhttp.OkHttpSyncBackend
import sttp.model.StatusCode
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.ApiPaths
import za.co.absa.atum.model.dto._
Expand All @@ -33,7 +34,6 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {

private val serverUrl: String = config.getString(UrlKey)

private val apiV1 = s"/${ApiPaths.Api}/${ApiPaths.V1}"
private val apiV2 = s"/${ApiPaths.Api}/${ApiPaths.V2}"

private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}")
Expand Down Expand Up @@ -64,8 +64,8 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {

val response = backend.send(request)

response.code.code match {
case 404 => None
response.code match {
case StatusCode.NotFound => None
case _ => Some(handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data)
}
}
Expand All @@ -74,6 +74,8 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
val parentPartitioningIdOpt = partitioning.parentPartitioning.map(getPartitioningId)
val partitioningWithIdOpt = getPartitioning(partitioning.partitioning)


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary empty lines


val newPartitioningWithIdDTO = partitioningWithIdOpt.getOrElse {
val endpoint = Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}")
val request = commonAtumRequest
Expand Down Expand Up @@ -104,27 +106,40 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
)
val req = commonAtumRequest.get(endpoint)
val resp = backend.send(req)
handleResponseBody(resp)
.as[SingleSuccessResponse[AdditionalDataDTO.Data]]
.data
.map(item => item._1 -> item._2.map(_.value))

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary empty lines


handleResponseBody(resp).as[MultiSuccessResponse[AdditionalDataItemV2DTO]]
}

AtumContextDTO(
partitioning = newPartitioningWithIdDTO.partitioning,
measures = measures,
additionalData = additionalData
additionalData = additionalData.data.map(item => item.key -> item.value).toMap
)
}

override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
val endpoint = Uri.unsafeParse(s"$serverUrl$apiV1/${ApiPaths.V1Paths.CreateCheckpoint}")
val partitioningId = getPartitioningId(checkpoint.partitioning)

val checkpointV2DTO = CheckpointV2DTO(
id = checkpoint.id,
name = checkpoint.name,
author = checkpoint.author,
measuredByAtumAgent = checkpoint.measuredByAtumAgent,
processStartTime = checkpoint.processStartTime,
processEndTime = checkpoint.processEndTime,
measurements = checkpoint.measurements,
properties = checkpoint.properties
)

val endpoint = Uri.unsafeParse(
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.Checkpoints}"
)
val request = commonAtumRequest
.post(endpoint)
.body(checkpoint.asJsonString)
.body(checkpointV2DTO.asJsonString)

val response = backend.send(request)

handleResponseBody(response)
}

Expand All @@ -145,7 +160,14 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {

val response = backend.send(request)

handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data
val data: AdditionalDataDTO.Data = handleResponseBody(response).as[MultiSuccessResponse[AdditionalDataItemV2DTO]]
.data
.map( item => item.value match {
case Some(_) => item.key -> Some(AdditionalDataItemDTO(item.value.get, item.author))
case None => item.key -> None
}).toMap

AdditionalDataDTO(data)
}

private def handleResponseBody(response: Response[Either[String, String]]): String = {
Expand Down
2 changes: 1 addition & 1 deletion agent/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
atum.dispatcher.type="console"

# The REST API URI of the atum server
# atum.dispatcher.http.url="http://localhost:8080"
#atum.dispatcher.http.url="http://localhost:8080"

# Maximum number of dispatch captures to keep in memory
# atum.dispatcher.capture.capture-limit=1000 # 0 means no limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ class AgentServerCompatibilityTests extends DBTestSuite {
val rdd = spark.sparkContext.parallelize(testDataForRDD)
val df = spark.createDataFrame(rdd, testDataSchema)

// Atum Agent preparation - Agent configured to work with HTTP Dispatcher and service on localhost
val agent: AtumAgent = new AtumAgent {
override val dispatcher: HttpDispatcher = new HttpDispatcher(
ConfigFactory
.empty()
.withValue("atum.dispatcher.type", ConfigValueFactory.fromAnyRef("http"))
.withValue("atum.dispatcher.http.url", ConfigValueFactory.fromAnyRef("http://localhost:8080"))
)
}

// Atum Context stuff preparation - Partitioning, Measures, Additional Data, Checkpoint
val domainAtumPartitioning = ListMap(
"partition1" -> "valueFromTest1",
"partition2" -> "valueFromTest2"
)
val domainAtumContext = agent.getOrCreateAtumContext(domainAtumPartitioning)

domainAtumContext.addMeasure(RecordCount())
domainAtumContext.addAdditionalData("author", "Laco")
domainAtumContext.addAdditionalData(Map("author" -> "LacoNew", "version" -> "1.0"))

domainAtumContext.createCheckpoint("checkPointNameCount", df)
// Atum Agent preparation - Agent configured to work with HTTP Dispatcher and service on localhost
val agent: AtumAgent = new AtumAgent {
override val dispatcher: HttpDispatcher = new HttpDispatcher(
ConfigFactory
.empty()
.withValue("atum.dispatcher.type", ConfigValueFactory.fromAnyRef("http"))
.withValue("atum.dispatcher.http.url", ConfigValueFactory.fromAnyRef("http://localhost:8080"))
)
}

// Atum Context stuff preparation - Partitioning, Measures, Additional Data, Checkpoint
val domainAtumPartitioning = ListMap(
"partition1" -> "valueFromTest1",
"partition2" -> "valueFromTest2"
)
val domainAtumContext = agent.getOrCreateAtumContext(domainAtumPartitioning)

domainAtumContext.addMeasure(RecordCount())
domainAtumContext.addAdditionalData("author", "Laco")
domainAtumContext.addAdditionalData(Map("author" -> "LacoNew", "version" -> "1.0"))

domainAtumContext.createCheckpoint("checkPointNameCount", df)

// DB Check, data should be written in the DB
table("runs.partitionings").all() { partitioningsResult =>
Expand All @@ -94,35 +94,45 @@ class AgentServerCompatibilityTests extends DBTestSuite {
}

table("runs.additional_data").all() { adResult =>
val expectedMap = Map(
"author" -> "LacoNew",
"version" -> "1.0"
)

assert(adResult.hasNext)
val row = adResult.next()

assert(row.getString("ad_name").contains("author"))
assert(row.getString("ad_value").contains("LacoNew"))
val adName1 = row.getString("ad_name").get
val adValue1 = row.getString("ad_value").get

assert(expectedMap(adName1) == adValue1)

assert(adResult.hasNext)
val row2 = adResult.next()

assert(row2.getString("ad_name").contains("version"))
assert(row2.getString("ad_value").contains("1.0"))
val adName2 = row2.getString("ad_name").get
val adValue2 = row2.getString("ad_value").get

assert(expectedMap(adName2) == adValue2)

assert(!adResult.hasNext)
}

table("runs.additional_data_history").all() { adHistResult =>
table("runs.additional_data_history").all() { adHistResult =>
assert(adHistResult.hasNext)
val row = adHistResult.next()

assert(row.getString("ad_name").contains("author"))
assert(row.getString("ad_value").contains("Laco"))

assert(!adHistResult.hasNext)
assert(!adHistResult.hasNext) // failing when executed multiple times
}

table("runs.measure_definitions").all() { measureDefResult =>
assert(measureDefResult.hasNext)
val row = measureDefResult.next()

assert(row.getString("measure_name").contains("*"))
assert(row.getString("measure_name").contains("count"))
assert(row.getString("measured_columns").contains("{}"))

assert(!measureDefResult.hasNext)
Expand All @@ -135,7 +145,5 @@ class AgentServerCompatibilityTests extends DBTestSuite {
assert(row.getJsonB("measurement_value").contains(expectedMeasurement))
assert(!measurementsResult.hasNext)
}

// TODO Truncate data potentially, Balta might support this in the near future
}
}
Loading