|
33 | 33 | import org.apache.hadoop.hive.metastore.api.LockState; |
34 | 34 | import org.apache.hadoop.hive.metastore.api.LockType; |
35 | 35 | import org.apache.thrift.TException; |
| 36 | +import org.slf4j.Logger; |
| 37 | +import org.slf4j.LoggerFactory; |
36 | 38 |
|
37 | 39 | import java.net.InetAddress; |
38 | 40 | import java.net.UnknownHostException; |
39 | | -import java.time.Duration; |
40 | 41 | import java.util.Collections; |
41 | 42 | import java.util.concurrent.Callable; |
42 | 43 |
|
|
46 | 47 | /** Hive {@link CatalogLock}. */ |
47 | 48 | public class HiveCatalogLock implements CatalogLock { |
48 | 49 |
|
| 50 | + private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogLock.class); |
| 51 | + |
49 | 52 | static final String LOCK_IDENTIFIER = "hive"; |
50 | 53 |
|
51 | 54 | private final ClientPool<IMetaStoreClient, TException> clients; |
@@ -77,43 +80,60 @@ private long lock(String database, String table) |
77 | 80 | new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database); |
78 | 81 | lockComponent.setTablename(table); |
79 | 82 | lockComponent.unsetOperationType(); |
| 83 | + |
| 84 | + long startMs = System.currentTimeMillis(); |
80 | 85 | final LockRequest lockRequest = |
81 | 86 | new LockRequest( |
82 | 87 | Collections.singletonList(lockComponent), |
83 | 88 | System.getProperty("user.name"), |
84 | 89 | InetAddress.getLocalHost().getHostName()); |
85 | 90 | LockResponse lockResponse = clients.run(client -> client.lock(lockRequest)); |
| 91 | + long lockId = lockResponse.getLockid(); |
86 | 92 |
|
87 | 93 | long nextSleep = 50; |
88 | | - long startRetry = System.currentTimeMillis(); |
89 | | - while (lockResponse.getState() == LockState.WAITING) { |
90 | | - nextSleep *= 2; |
91 | | - if (nextSleep > checkMaxSleep) { |
92 | | - nextSleep = checkMaxSleep; |
93 | | - } |
94 | | - Thread.sleep(nextSleep); |
95 | 94 |
|
96 | | - final LockResponse tempLockResponse = lockResponse; |
97 | | - lockResponse = clients.run(client -> client.checkLock(tempLockResponse.getLockid())); |
98 | | - if (System.currentTimeMillis() - startRetry > acquireTimeout) { |
99 | | - break; |
| 95 | + try { |
| 96 | + while (lockResponse.getState() == LockState.WAITING) { |
| 97 | + long elapsed = System.currentTimeMillis() - startMs; |
| 98 | + if (elapsed >= acquireTimeout) { |
| 99 | + break; |
| 100 | + } |
| 101 | + |
| 102 | + nextSleep = Math.min(nextSleep * 2, checkMaxSleep); |
| 103 | + Thread.sleep(nextSleep); |
| 104 | + |
| 105 | + lockResponse = clients.run(client -> client.checkLock(lockId)); |
| 106 | + } |
| 107 | + } finally { |
| 108 | + if (lockResponse.getState() != LockState.ACQUIRED) { |
| 109 | + // unlock if not acquired |
| 110 | + unlock(lockId); |
100 | 111 | } |
101 | 112 | } |
102 | | - long retryDuration = System.currentTimeMillis() - startRetry; |
103 | 113 |
|
104 | | - if (lockResponse.getState() != LockState.ACQUIRED) { |
105 | | - if (lockResponse.getState() == LockState.WAITING) { |
106 | | - final LockResponse tempLockResponse = lockResponse; |
107 | | - clients.execute(client -> client.unlock(tempLockResponse.getLockid())); |
108 | | - } |
109 | | - throw new RuntimeException( |
110 | | - "Acquire lock failed with time: " + Duration.ofMillis(retryDuration)); |
| 114 | + LockState lockState = lockResponse.getState(); |
| 115 | + long duration = System.currentTimeMillis() - startMs; |
| 116 | + String msg = |
| 117 | + String.format( |
| 118 | + "for table %s.%s (lockId=%d) after %dms. Final lock state: %s", |
| 119 | + database, table, lockId, duration, lockState); |
| 120 | + LOG.info("Acquire lock {}", msg); |
| 121 | + if (lockState == LockState.ACQUIRED) { |
| 122 | + return lockId; |
111 | 123 | } |
112 | | - return lockResponse.getLockid(); |
| 124 | + |
| 125 | + throw new RuntimeException("Acquire lock failed " + msg); |
113 | 126 | } |
114 | 127 |
|
115 | | - private void unlock(long lockId) throws TException, InterruptedException { |
116 | | - clients.execute(client -> client.unlock(lockId)); |
| 128 | + private void unlock(long lockId) { |
| 129 | + if (lockId <= 0) { |
| 130 | + return; |
| 131 | + } |
| 132 | + try { |
| 133 | + clients.execute(client -> client.unlock(lockId)); |
| 134 | + } catch (Exception e) { |
| 135 | + LOG.warn("Unlock failed for lockId={}", lockId, e); |
| 136 | + } |
117 | 137 | } |
118 | 138 |
|
119 | 139 | @Override |
|
0 commit comments