Skip to content

Commit 81817f6

Browse files
committed
fix: maxPerJobConcurrency was not respected because status of pending chunks was not updated in redis/local queue
Signed-off-by: dmitrii.bocharov <[email protected]>
1 parent df527ba commit 81817f6

File tree

6 files changed

+78
-1
lines changed

6 files changed

+78
-1
lines changed

backend/app/src/test/kotlin/io/tolgee/batch/AbstractBatchJobsGeneralTest.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,4 +343,19 @@ abstract class AbstractBatchJobsGeneralTest :
343343
.id.assert
344344
.isNotEqualTo(anotherJobId)
345345
}
346+
347+
@Test
348+
fun `mt job respects maxPerJobConcurrency`() {
349+
val mtJob = util.runMtJob(100)
350+
351+
// by now mt jobs have maxPerJobConcurrency = 1 and this won't probably change,
352+
// because if parallelized we will start hitting rate limits on OpenAI and even get over it.
353+
// At the beginning, we only check that the organization has availableCredits > 0 and
354+
// only when we have the result from OpenAI, we are able to calculate how many credits to charge the organization.
355+
val maxConcurrency = 1
356+
util.assertAllowedMaxPerJobConcurrency(mtJob, maxConcurrency)
357+
util.assertMaxPerJobConcurrencyIsLessThanOrEqualTo(maxConcurrency)
358+
util.waitForJobSuccess(mtJob)
359+
util.assertJobUnlocked()
360+
}
346361
}

backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.tolgee.batch.processors.DeleteKeysChunkProcessor
77
import io.tolgee.batch.processors.PreTranslationByTmChunkProcessor
88
import io.tolgee.batch.request.AutomationBjRequest
99
import io.tolgee.batch.request.DeleteKeysRequest
10+
import io.tolgee.batch.request.MachineTranslationRequest
1011
import io.tolgee.batch.request.PreTranslationByTmRequest
1112
import io.tolgee.batch.state.BatchJobStateProvider
1213
import io.tolgee.component.CurrentDateProvider
@@ -43,6 +44,7 @@ import org.springframework.context.ApplicationContext
4344
import org.springframework.transaction.PlatformTransactionManager
4445
import java.time.Duration
4546
import java.util.Date
47+
import kotlin.apply
4648
import kotlin.coroutines.CoroutineContext
4749

4850
class BatchJobTestUtil(
@@ -447,6 +449,49 @@ class BatchJobTestUtil(
447449
}
448450
}
449451

452+
fun runMtJob(
453+
keyCount: Int,
454+
author: UserAccount = testData.user,
455+
): BatchJob {
456+
return executeInNewTransaction(transactionManager) {
457+
batchJobService.startJob(
458+
request =
459+
MachineTranslationRequest().apply {
460+
keyIds = (1L..keyCount).map { it }
461+
targetLanguageIds =
462+
listOf(
463+
testData.projectBuilder
464+
.getLanguageByTag("cs")!!
465+
.self.id,
466+
)
467+
},
468+
project = testData.projectBuilder.self,
469+
author = author,
470+
type = BatchJobType.MACHINE_TRANSLATE,
471+
isHidden = false,
472+
)
473+
}
474+
}
475+
476+
fun assertAllowedMaxPerJobConcurrency(
477+
job: BatchJob,
478+
maxConcurrency: Int,
479+
) {
480+
batchJobService
481+
.getJobDto(job.id)
482+
.maxPerJobConcurrency.assert
483+
.isEqualTo(maxConcurrency)
484+
}
485+
486+
fun assertMaxPerJobConcurrencyIsLessThanOrEqualTo(maxConcurrency: Int) {
487+
waitFor(pollTime = 100) {
488+
batchJobConcurrentLauncher.runningJobs.assert
489+
.size()
490+
.isLessThanOrEqualTo(maxConcurrency)
491+
batchJobChunkExecutionQueue.size == 0
492+
}
493+
}
494+
450495
fun getSingleJob(): BatchJob = entityManager.createQuery("""from BatchJob""", BatchJob::class.java).singleResult
451496

452497
private val batchJobProjectLockingManager: BatchJobProjectLockingManager

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,12 +263,16 @@ class BatchJobConcurrentLauncher(
263263

264264
private fun ExecutionQueueItem.trySetRunningState(): Boolean {
265265
return progressManager.trySetExecutionRunning(this.chunkExecutionId, this.jobId) {
266+
val maxPerJobConcurrency = batchJobService.getJobDto(this.jobId).maxPerJobConcurrency
267+
if (maxPerJobConcurrency == -1) {
268+
return@trySetExecutionRunning true
269+
}
266270
val count =
267271
it.values.count { executionState -> executionState.status == BatchJobChunkExecutionStatus.RUNNING }
268272
if (count == 0) {
269273
return@trySetExecutionRunning true
270274
}
271-
batchJobService.getJobDto(this.jobId).maxPerJobConcurrency > count
275+
maxPerJobConcurrency > count
272276
}
273277
}
274278
}

backend/data/src/main/kotlin/io/tolgee/batch/ProgressManager.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class ProgressManager(
4242
return batchJobStateProvider.updateState(batchJobId) {
4343
if (canRunFn(it)) {
4444
if (it[executionId] != null) {
45+
it[executionId]?.status = BatchJobChunkExecutionStatus.RUNNING
4546
return@updateState true
4647
}
4748
it[executionId] =

backend/data/src/main/kotlin/io/tolgee/batch/processors/MachineTranslationChunkProcessor.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ class MachineTranslationChunkProcessor(
4747
}
4848
}
4949

50+
/**
51+
* For MT jobs, it is set to 1, because it may happen that we start hitting rate limits on OpenAI,
52+
* so we may not want one job to consume them in parallel. Moreover, at the beginning, we only check, that
53+
* the organization has `availableCredits > 0` and only when we have the result from OpenAI,we are able to
54+
* calculate how many credits to charge the organization. However, if it were to be parallelized too much,
55+
* it would happen that you could get well over the limit
56+
*/
5057
override fun getMaxPerJobConcurrency(): Int {
5158
return 1
5259
}

backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJob.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ class BatchJob :
6363

6464
val chunkedTarget get() = chunkTarget(chunkSize, target)
6565

66+
/**
67+
* The maximum number of coroutines among all tolgee instances (e.g. k8s pods). The default value is -1,
68+
* which means that the concurrency is not limited.
69+
* (check `BatchJobConcurrentLauncher#ExecutionQueueItem.trySetRunningState()` for more info).
70+
*/
6671
var maxPerJobConcurrency: Int = -1
6772

6873
@Enumerated(STRING)

0 commit comments

Comments
 (0)