Skip to content

Commit 7fa6b3e

Browse files
committed
#714 Release all locks when Pramen job exits abnormally.
1 parent 1a7f048 commit 7fa6b3e

File tree

8 files changed

+94
-7
lines changed

8 files changed

+94
-7
lines changed

pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ trait TokenLock extends AutoCloseable {
4141
def tryAcquire(): Boolean
4242

4343
def release(): Unit
44+
45+
def token: String
4446
}

pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.lock
1818

1919
import za.co.absa.pramen.api.lock.TokenLock
2020

21-
class TokenLockAllow extends TokenLock {
21+
class TokenLockAllow(override val token: String) extends TokenLock {
2222
override def tryAcquire(): Boolean = true
2323

2424
override def release(): Unit = {}

pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import scala.util.control.NonFatal
3232
*
3333
* @param token the unique identifier for the lock (across multiple JVM processes and Spark jobs).
3434
*/
35-
abstract class TokenLockBase(token: String) extends TokenLock {
35+
abstract class TokenLockBase(override val token: String) extends TokenLock {
3636
import TokenLockBase._
3737

3838
private val log = LoggerFactory.getLogger(this.getClass)
@@ -65,7 +65,7 @@ abstract class TokenLockBase(token: String) extends TokenLock {
6565
* Note: Unlike standard lock implementations, this returns false even when the current instance already owns the lock.
6666
*/
6767
override def tryAcquire(): Boolean = synchronized {
68-
if (lockAcquired) {
68+
val isAcquired = if (lockAcquired) {
6969
false
7070
} else {
7171
if (tryAcquireGuardLock(lockAcquireRetries, 0)) {
@@ -79,6 +79,12 @@ abstract class TokenLockBase(token: String) extends TokenLock {
7979
false
8080
}
8181
}
82+
83+
if (isAcquired) {
84+
TokenLockRegistry.registerLock(this)
85+
}
86+
87+
isAcquired
8288
}
8389

8490
override def release(): Unit = {
@@ -96,6 +102,7 @@ abstract class TokenLockBase(token: String) extends TokenLock {
96102
watcherThreadOpt = None
97103
releaseGuardLock()
98104
JvmUtils.safeRemoveShutdownHook(shutdownHook)
105+
TokenLockRegistry.unregisterLock(this)
99106
log.info(s"Lock released: '$escapedToken'.")
100107
}
101108
}
@@ -104,7 +111,7 @@ abstract class TokenLockBase(token: String) extends TokenLock {
104111
release()
105112
}
106113

107-
protected def isAcquired: Boolean = synchronized {
114+
private[core] def isAcquired: Boolean = synchronized {
108115
lockAcquired
109116
}
110117

pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@ import za.co.absa.pramen.api.lock.{TokenLock, TokenLockFactory}
2020

2121
class TokenLockFactoryAllow extends TokenLockFactory {
2222
override def getLock(token: String): TokenLock = {
23-
new TokenLockAllow
23+
new TokenLockAllow(token)
2424
}
2525
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2022 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.pramen.core.lock
18+
19+
import org.slf4j.LoggerFactory
20+
import za.co.absa.pramen.api.lock.TokenLock
21+
22+
import java.util.concurrent.locks.ReentrantLock
23+
import scala.collection.mutable.ListBuffer
24+
import scala.util.control.NonFatal
25+
26+
object TokenLockRegistry {
27+
private val log = LoggerFactory.getLogger(this.getClass)
28+
private var currentLocks = new ListBuffer[TokenLock]
29+
private val registryLock = new ReentrantLock()
30+
31+
private[core] def registerLock(lock: TokenLock): Unit = {
32+
registryLock.lock()
33+
try {
34+
currentLocks += lock
35+
} finally {
36+
registryLock.unlock()
37+
}
38+
}
39+
40+
private[core] def unregisterLock(lock: TokenLock): Unit = {
41+
registryLock.lock()
42+
try {
43+
currentLocks -= lock
44+
} finally {
45+
registryLock.unlock()
46+
}
47+
}
48+
49+
private[core] def releaseAllLocks(): Unit = {
50+
registryLock.lock()
51+
try {
52+
// Making a copy because the `l.release()` can call `unregisterLock()` modifying
53+
// the mutable list buffer while iterating.
54+
val currentListCopy = currentLocks.toList
55+
currentListCopy.foreach { l =>
56+
try {
57+
l.release()
58+
} catch {
59+
case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}")
60+
}
61+
}
62+
currentLocks.clear()
63+
} finally {
64+
registryLock.unlock()
65+
}
66+
}
67+
68+
}

pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import za.co.absa.pramen.core.app.config.RuntimeConfig.{DRY_RUN, EMAIL_IF_NO_CHA
2626
import za.co.absa.pramen.core.app.config.{HookConfig, RuntimeConfig}
2727
import za.co.absa.pramen.core.config.Keys.{GOOD_THROUGHPUT_RPS, WARN_THROUGHPUT_RPS}
2828
import za.co.absa.pramen.core.exceptions.OsSignalException
29+
import za.co.absa.pramen.core.lock.TokenLockRegistry
2930
import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
3031
import za.co.absa.pramen.core.notify.PipelineNotificationTargetFactory
3132
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
@@ -209,6 +210,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
209210
runCustomShutdownHook()
210211
removeSignalHandlers()
211212
sendNotificationEmail()
213+
TokenLockRegistry.releaseAllLocks()
212214
}
213215

214216
private lazy val shutdownHook = new Thread() {

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.lock
1818

1919
import za.co.absa.pramen.api.lock.TokenLock
2020

21-
class TokenLockMock extends TokenLock {
21+
class TokenLockMock(override val token: String = "mock") extends TokenLock {
2222
var acquired = false
2323

2424
override def tryAcquire(): Boolean = this.synchronized {

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.scalatest.wordspec.AnyWordSpec
2121
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
2222
import za.co.absa.pramen.api.lock.TokenLock
2323
import za.co.absa.pramen.core.fixtures.RelationalDbFixture
24-
import za.co.absa.pramen.core.lock.TokenLockJdbc
24+
import za.co.absa.pramen.core.lock.{TokenLockBase, TokenLockJdbc, TokenLockRegistry}
2525
import za.co.absa.pramen.core.rdb.{PramenDb, RdbJdbc}
2626
import za.co.absa.pramen.core.reader.model.JdbcConfig
2727
import za.co.absa.pramen.core.utils.UsingUtils
@@ -98,6 +98,14 @@ class TokenLockJdbcSuite extends AnyWordSpec with RelationalDbFixture with Befor
9898
lock1.release()
9999
}
100100
}
101+
102+
"lock registry releases all locks" in {
103+
val lock1 = getLock("token1")
104+
assert(lock1.tryAcquire())
105+
106+
TokenLockRegistry.releaseAllLocks()
107+
assert(!lock1.asInstanceOf[TokenLockBase].isAcquired)
108+
}
101109
}
102110

103111
private def getLock(token: String): TokenLock = {

0 commit comments

Comments
 (0)