Skip to content

Commit 4576034

Browse files
committed
create partitioning (atum context) using v2 endpoints
1 parent 50f6d9c commit 4576034

File tree

4 files changed

+251
-18
lines changed

4 files changed

+251
-18
lines changed

agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,9 @@ import sttp.client3.okhttp.OkHttpSyncBackend
2525
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
2626
import za.co.absa.atum.model.ApiPaths
2727
import za.co.absa.atum.model.dto._
28-
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
28+
import za.co.absa.atum.model.envelopes.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
2929
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._
3030

31-
3231
class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
3332
import HttpDispatcher._
3433

@@ -37,45 +36,99 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
3736
private val apiV1 = s"/${ApiPaths.Api}/${ApiPaths.V1}"
3837
private val apiV2 = s"/${ApiPaths.Api}/${ApiPaths.V2}"
3938

40-
private val createPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/${ApiPaths.V1Paths.CreatePartitioning}")
4139
private val createCheckpointEndpoint = Uri.unsafeParse(s"$serverUrl$apiV1/${ApiPaths.V1Paths.CreateCheckpoint}")
4240

4341
private val getPartitioningIdEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}")
4442
private def createAdditionalDataEndpoint(partitioningId: Long): Uri =
45-
Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.AdditionalData}")
43+
Uri.unsafeParse(
44+
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.AdditionalData}"
45+
)
46+
private val postPartitioningEndpoint = Uri.unsafeParse(s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}")
47+
private def createGetPartitioningMeasuresEndpoint(partitioningId: Long) = Uri.unsafeParse(
48+
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.Measures}"
49+
)
50+
private def createGetPartitioningAdditionalDataEndpoint(partitioningId: Long) = Uri.unsafeParse(
51+
s"$serverUrl$apiV2/${ApiPaths.V2Paths.Partitionings}/$partitioningId/${ApiPaths.V2Paths.AdditionalData}"
52+
)
4653

4754
private val commonAtumRequest = basicRequest
4855
.header("Content-Type", "application/json")
4956
.response(asString)
5057

51-
private val backend: SttpBackend[Identity, capabilities.WebSockets] = OkHttpSyncBackend()
58+
private[dispatcher] val backend: SttpBackend[Identity, capabilities.WebSockets] = OkHttpSyncBackend()
5259

5360
logInfo("using http dispatcher")
5461
logInfo(s"serverUrl $serverUrl")
5562

5663
/**
5764
* This method is used to get the partitioning ID from the server.
5865
* @param partitioning: Partitioning to obtain ID for.
59-
* @return Long ID of the partitioning.
66+
* @return Option[Long] ID of the partitioning.
6067
*/
68+
private[dispatcher] def getPartitioningId(partitioning: PartitioningDTO): Option[Long] = {
69+
val encodedPartitioning = partitioning.asBase64EncodedJsonString
70+
val request = commonAtumRequest.get(getPartitioningIdEndpoint.addParam("partitioning", encodedPartitioning))
71+
72+
val response = backend.send(request)
73+
74+
response.code.code match {
75+
case 404 => None
76+
case _ =>
77+
Some(handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data.id)
78+
}
79+
}
6180

62-
private[dispatcher] def getPartitioningId(partitioning: PartitioningDTO): Long = {
81+
private[dispatcher] def getPartitioning(partitioning: PartitioningDTO): Option[PartitioningWithIdDTO] = {
6382
val encodedPartitioning = partitioning.asBase64EncodedJsonString
6483
val request = commonAtumRequest.get(getPartitioningIdEndpoint.addParam("partitioning", encodedPartitioning))
6584

6685
val response = backend.send(request)
6786

68-
handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data.id
87+
response.code.code match {
88+
case 404 => None
89+
case _ => Some(handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data)
90+
}
6991
}
7092

93+
// should be probably renamed, suggestions welcomed :)
7194
override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
72-
val request = commonAtumRequest
73-
.post(createPartitioningEndpoint)
74-
.body(partitioning.asJsonString)
95+
val parentPartitioningIdOpt = partitioning.parentPartitioning.flatMap(getPartitioningId)
96+
val partitioningWithIdOpt = getPartitioning(partitioning.partitioning)
97+
98+
val newPartitioningWithIdDTO = partitioningWithIdOpt.getOrElse {
99+
val request = commonAtumRequest
100+
.post(postPartitioningEndpoint)
101+
.body(
102+
PartitioningSubmitV2DTO(
103+
partitioning.partitioning,
104+
parentPartitioningIdOpt,
105+
partitioning.authorIfNew
106+
).asJsonString
107+
)
108+
val response = backend.send(request)
109+
handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data
110+
}
75111

76-
val response = backend.send(request)
112+
val measures = {
113+
val req = commonAtumRequest.get(createGetPartitioningMeasuresEndpoint(newPartitioningWithIdDTO.id))
114+
val resp = backend.send(req)
115+
handleResponseBody(resp).as[MultiSuccessResponse[MeasureDTO]].data.toSet
116+
}
117+
118+
val additionalData = {
119+
val req = commonAtumRequest.get(createGetPartitioningAdditionalDataEndpoint(newPartitioningWithIdDTO.id))
120+
val resp = backend.send(req)
121+
handleResponseBody(resp)
122+
.as[SingleSuccessResponse[AdditionalDataDTO.Data]]
123+
.data
124+
.map(item => item._1 -> item._2.map(_.value))
125+
}
77126

78-
handleResponseBody(response).as[AtumContextDTO]
127+
AtumContextDTO(
128+
partitioning = newPartitioningWithIdDTO.partitioning,
129+
measures = measures,
130+
additionalData = additionalData
131+
)
79132
}
80133

81134
override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
@@ -96,7 +149,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
96149
log.debug(s"Got partitioning ID: '$partitioningId'")
97150

98151
val request = commonAtumRequest
99-
.patch(createAdditionalDataEndpoint(partitioningId))
152+
.patch(createAdditionalDataEndpoint(partitioningId.get))
100153
.body(additionalDataPatchDTO.asJsonString)
101154

102155
val response = backend.send(request)

agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherUnitTests.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ class CapturingDispatcherUnitTests extends AnyWordSpec with Matchers {
5050
kvs.map { case (k, v) => PartitionDTO(k, v) }
5151
}
5252

53-
private def createPartitionSubmit(partition: PartitioningDTO, parent: Option[PartitioningDTO] = None): PartitioningSubmitDTO = {
53+
private def createPartitionSubmit(
54+
partition: PartitioningDTO,
55+
parent: Option[PartitioningDTO] = None
56+
): PartitioningSubmitDTO = {
5457
PartitioningSubmitDTO(
5558
partitioning = partition,
5659
parentPartitioning = parent,
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package za.co.absa.atum.agent.dispatcher
2+
3+
import org.scalatest.flatspec.AnyFlatSpec
4+
import org.scalatest.matchers.should.Matchers
5+
import org.scalatest.BeforeAndAfterEach
6+
import org.mockito.Mockito._
7+
import org.mockito.ArgumentMatchers._
8+
import sttp.client3._
9+
import sttp.model.StatusCode
10+
import com.typesafe.config.Config
11+
import sttp.capabilities
12+
import za.co.absa.atum.model.dto._
13+
import za.co.absa.atum.model.envelopes.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
14+
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
15+
16+
class HttpDispatcherUnitTests extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
17+
18+
var mockBackend: SttpBackend[Identity, sttp.capabilities.WebSockets] = _
19+
var mockConfig: Config = _
20+
val serverUrl = "http://test-server"
21+
22+
val testPartitioningDTO: Seq[PartitionDTO] = Seq(PartitionDTO("k", "v"))
23+
val testPartitioningSubmitDTO = PartitioningSubmitDTO(testPartitioningDTO, None, "author")
24+
val createdPartitioningWithId = PartitioningWithIdDTO(123L, testPartitioningDTO, "author")
25+
val measures: Seq[MeasureDTO] = Seq(MeasureDTO("m1", Seq("c1")), MeasureDTO("m2", Seq("c2")))
26+
val additionalData: Map[String, Option[AdditionalDataItemDTO]] = Map(
27+
"key1" -> Some(AdditionalDataItemDTO("val1", "author1")),
28+
"key2" -> None
29+
)
30+
31+
def encodedPartitioning(partitioning: Seq[PartitionDTO]): String =
32+
partitioning.asBase64EncodedJsonString
33+
34+
override def beforeEach(): Unit = {
35+
mockBackend = mock(classOf[SttpBackend[Identity, sttp.capabilities.WebSockets]])
36+
mockConfig = mock(classOf[Config])
37+
when(mockConfig.getString(any[String])).thenReturn(serverUrl)
38+
}
39+
40+
def dispatcher: HttpDispatcher = new HttpDispatcher(mockConfig) {
41+
override private[dispatcher] val backend = mockBackend
42+
}
43+
44+
def isGetPartitioningRequest(
45+
partitioning: Seq[PartitionDTO]
46+
): Request[Either[String, String], capabilities.WebSockets] =
47+
argThat(new org.mockito.ArgumentMatcher[Request[Either[String, String], sttp.capabilities.WebSockets]] {
48+
override def matches(req: Request[Either[String, String], sttp.capabilities.WebSockets]): Boolean =
49+
req != null &&
50+
req.method.method == "GET" &&
51+
req.uri.path.mkString.contains("partitionings") &&
52+
req.uri.params.toSeq.exists { case (k, v) =>
53+
k == "partitioning" && v == encodedPartitioning(partitioning)
54+
}
55+
})
56+
57+
def isPostPartitioningRequest: Request[Either[String, String], capabilities.WebSockets] =
58+
argThat(new org.mockito.ArgumentMatcher[Request[Either[String, String], sttp.capabilities.WebSockets]] {
59+
override def matches(req: Request[Either[String, String], sttp.capabilities.WebSockets]): Boolean =
60+
req != null && req.method.method == "POST" && req.uri.path.mkString.contains("partitionings")
61+
})
62+
63+
def isGetMeasuresRequest: Request[Either[String, String], capabilities.WebSockets] =
64+
argThat(new org.mockito.ArgumentMatcher[Request[Either[String, String], sttp.capabilities.WebSockets]] {
65+
override def matches(req: Request[Either[String, String], sttp.capabilities.WebSockets]): Boolean =
66+
req != null && req.method.method == "GET" && req.uri.path.mkString.contains("measures")
67+
})
68+
69+
def isGetAdditionalDataRequest: Request[Either[String, String], capabilities.WebSockets] =
70+
argThat(new org.mockito.ArgumentMatcher[Request[Either[String, String], sttp.capabilities.WebSockets]] {
71+
override def matches(req: Request[Either[String, String], sttp.capabilities.WebSockets]): Boolean =
72+
req != null && req.method.method == "GET" && req.uri.path.mkString.contains("additional-data")
73+
})
74+
75+
def stubGetPartitioning(partitioning: Seq[PartitionDTO], response: Response[Either[String, String]]): Unit =
76+
when(mockBackend.send(isGetPartitioningRequest(partitioning))).thenReturn(response)
77+
78+
def stubPostPartitioning(response: Response[Either[String, String]]): Unit =
79+
when(mockBackend.send(isPostPartitioningRequest)).thenReturn(response)
80+
81+
def stubGetMeasures(response: Response[Either[String, String]]): Unit =
82+
when(mockBackend.send(isGetMeasuresRequest)).thenReturn(response)
83+
84+
def stubGetAdditionalData(response: Response[Either[String, String]]): Unit =
85+
when(mockBackend.send(isGetAdditionalDataRequest)).thenReturn(response)
86+
87+
"createPartitioning" should "create and return AtumContextDTO when partitioning does not exist" in {
88+
val parentPartitioning = Seq(PartitionDTO("parentK", "parentV"))
89+
val parentPartitioningId = 999L
90+
val parentPartitioningWithId = PartitioningWithIdDTO(parentPartitioningId, parentPartitioning, "parentAuthor")
91+
val getParentResponse = Response(
92+
Right(SingleSuccessResponse(parentPartitioningWithId).asJsonString): Either[String, String],
93+
StatusCode.Ok
94+
)
95+
val getPartitioningResponse = Response(Left("Not found"): Either[String, String], StatusCode.NotFound)
96+
val postPartitioningResponse = Response(
97+
Right(SingleSuccessResponse(createdPartitioningWithId).asJsonString): Either[String, String],
98+
StatusCode.Created
99+
)
100+
val measuresResponse =
101+
Response(Right(MultiSuccessResponse(measures).asJsonString): Either[String, String], StatusCode.Ok)
102+
val additionalDataResponse =
103+
Response(Right(SingleSuccessResponse(additionalData).asJsonString): Either[String, String], StatusCode.Ok)
104+
105+
stubGetPartitioning(parentPartitioning, getParentResponse)
106+
stubGetPartitioning(testPartitioningDTO, getPartitioningResponse)
107+
stubPostPartitioning(postPartitioningResponse)
108+
stubGetMeasures(measuresResponse)
109+
stubGetAdditionalData(additionalDataResponse)
110+
111+
val dispatcherWithMocks = dispatcher
112+
val result = dispatcherWithMocks.createPartitioning(
113+
PartitioningSubmitDTO(testPartitioningDTO, Some(parentPartitioning), "author")
114+
)
115+
116+
result.partitioning shouldBe createdPartitioningWithId.partitioning
117+
result.measures shouldBe measures.toSet
118+
result.additionalData should contain key "key1"
119+
result.additionalData should contain key "key2"
120+
result.additionalData("key1") shouldBe Some("val1")
121+
result.additionalData("key2") shouldBe None
122+
}
123+
124+
it should "return AtumContextDTO for existing partitioning without creating a new one" in {
125+
val existingPartitioningWithId = PartitioningWithIdDTO(123L, testPartitioningDTO, "author")
126+
val getPartitioningResponse = Response(
127+
Right(SingleSuccessResponse(existingPartitioningWithId).asJsonString): Either[String, String],
128+
StatusCode.Ok
129+
)
130+
val measuresResponse =
131+
Response(Right(MultiSuccessResponse(measures).asJsonString): Either[String, String], StatusCode.Ok)
132+
val additionalDataResponse =
133+
Response(Right(SingleSuccessResponse(additionalData).asJsonString): Either[String, String], StatusCode.Ok)
134+
135+
stubGetPartitioning(testPartitioningDTO, getPartitioningResponse)
136+
stubGetMeasures(measuresResponse)
137+
stubGetAdditionalData(additionalDataResponse)
138+
139+
val dispatcherWithMocks = dispatcher
140+
val result = dispatcherWithMocks.createPartitioning(testPartitioningSubmitDTO)
141+
142+
result.partitioning shouldBe existingPartitioningWithId.partitioning
143+
result.measures shouldBe measures.toSet
144+
result.additionalData should contain key "key1"
145+
result.additionalData should contain key "key2"
146+
result.additionalData("key1") shouldBe Some("val1")
147+
result.additionalData("key2") shouldBe None
148+
149+
verify(mockBackend, never()).send(isPostPartitioningRequest)
150+
}
151+
152+
it should "handle empty measures and additional data for a new partitioning" in {
153+
val getPartitioningResponse = Response(Left("Not found"): Either[String, String], StatusCode.NotFound)
154+
val postPartitioningResponse = Response(
155+
Right(SingleSuccessResponse(createdPartitioningWithId).asJsonString): Either[String, String],
156+
StatusCode.Created
157+
)
158+
val measuresResponse =
159+
Response(Right(MultiSuccessResponse(Seq.empty[MeasureDTO]).asJsonString): Either[String, String], StatusCode.Ok)
160+
val additionalDataResponse = Response(
161+
Right(SingleSuccessResponse(Map.empty[String, Option[AdditionalDataItemDTO]]).asJsonString): Either[
162+
String,
163+
String
164+
],
165+
StatusCode.Ok
166+
)
167+
168+
stubGetPartitioning(testPartitioningDTO, getPartitioningResponse)
169+
stubPostPartitioning(postPartitioningResponse)
170+
stubGetMeasures(measuresResponse)
171+
stubGetAdditionalData(additionalDataResponse)
172+
173+
val dispatcherWithMocks = dispatcher
174+
val result = dispatcherWithMocks.createPartitioning(testPartitioningSubmitDTO)
175+
176+
result.partitioning shouldBe createdPartitioningWithId.partitioning
177+
result.measures shouldBe empty
178+
result.additionalData shouldBe empty
179+
}
180+
}

server/src/main/scala/za/co/absa/atum/server/api/v2/http/Endpoints.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ object Endpoints extends BaseEndpoints {
9191
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
9292
}
9393

94-
//** 1 **//
9594
val getPartitioningCheckpointEndpoint
9695
: PublicEndpoint[(Long, UUID, Boolean), ErrorResponse, SingleSuccessResponse[CheckpointV2DTO], Any] = {
9796
apiV2.get
@@ -102,7 +101,6 @@ object Endpoints extends BaseEndpoints {
102101
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
103102
}
104103

105-
//** 2 **//
106104
val getPartitioningCheckpointsEndpoint
107105
: PublicEndpoint[(Long, Int, Long, Option[String], Boolean), ErrorResponse, PaginatedResponse[
108106
CheckpointV2DTO
@@ -119,7 +117,6 @@ object Endpoints extends BaseEndpoints {
119117
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
120118
}
121119

122-
//** 3 **//
123120
val getFlowCheckpointsEndpoint
124121
: PublicEndpoint[(Long, Int, Long, Option[String], Boolean), ErrorResponse, PaginatedResponse[
125122
CheckpointWithPartitioningDTO

0 commit comments

Comments
 (0)