diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5850fca --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +target/ +redis-3.2.6/ diff --git a/pom.xml b/pom.xml index c84f0f5..1fcc757 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,13 @@ ${jedis-version} + + + javax.annotation + javax.annotation-api + 1.3.2 + + ch.qos.logback @@ -89,6 +96,14 @@ test + + + org.mockito + mockito-core + 3.12.4 + test + + diff --git a/redis-3.2.6.tar.gz b/redis-3.2.6.tar.gz new file mode 100644 index 0000000..b7ff762 Binary files /dev/null and b/redis-3.2.6.tar.gz differ diff --git a/src/main/java/com/baidu/fsg/dlock/DistributedReadWriteLock.java b/src/main/java/com/baidu/fsg/dlock/DistributedReadWriteLock.java new file mode 100644 index 0000000..6351db3 --- /dev/null +++ b/src/main/java/com/baidu/fsg/dlock/DistributedReadWriteLock.java @@ -0,0 +1,444 @@ +/* + * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.baidu.fsg.dlock; + +import com.baidu.fsg.dlock.domain.DLockConfig; +import com.baidu.fsg.dlock.processor.DLockProcessor; +import com.baidu.fsg.dlock.utils.NetUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReadWriteLock; + +/** + * A distributed, reentrant read-write lock implementation based on Redis. + * This implementation is adapted from the original {@link DistributedReentrantLock} + * and the JDK's {@code ReentrantReadWriteLock}. + * + * @author Jules (AI Assistant) + * @author yutianbao + */ +public class DistributedReadWriteLock implements ReadWriteLock { + + private final ReadLock readLock; + private final WriteLock writeLock; + + final DLockConfig lockConfig; + final DLockProcessor lockProcessor; + final Sync sync; + + // Lua scripts loaded from resources + private static final String SCRIPT_ACQUIRE_READ = loadScript("acquire_read.lua"); + private static final String SCRIPT_RELEASE_READ = loadScript("release_read.lua"); + private static final String SCRIPT_ACQUIRE_WRITE = loadScript("acquire_write.lua"); + private static final String SCRIPT_RELEASE_WRITE = loadScript("release_write.lua"); + + public DistributedReadWriteLock(DLockConfig lockConfig, DLockProcessor lockProcessor) { + this.lockConfig = lockConfig; + this.lockProcessor = lockProcessor; + this.sync = new NonfairSync(); + this.readLock = new ReadLock(this); + this.writeLock = new WriteLock(this); + } + + private static String loadScript(String scriptName) { + try (InputStream is = DistributedReadWriteLock.class.getResourceAsStream("lua/" + scriptName)) { + if (is == null) throw new IOException("Script not found: " + scriptName); + byte[] bytes = new byte[is.available()]; + is.read(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException("Failed to load Lua script: " + scriptName, e); + } + } + + @Override + public Lock readLock() { + return readLock; + } + + @Override + public Lock writeLock() { + return writeLock; + } + + static final class Node { + final AtomicReference prev = new AtomicReference<>(); + final AtomicReference next = new AtomicReference<>(); + final Thread thread; + final boolean isShared; // true for read, false for write + Node(Thread thread, boolean isShared) { + this.thread = thread; + this.isShared = isShared; + } + } + + abstract class Sync { + private final AtomicReference head = new AtomicReference<>(); + private final AtomicReference tail = new AtomicReference<>(); + private final AtomicReference retryLockRef = new AtomicReference<>(); + private final AtomicReference expandLockRef = new AtomicReference<>(); + + private Thread exclusiveOwnerThread; + private int writeHoldCount; + + private final ThreadLocal readHoldCount = new ThreadLocal<>(); + private volatile int readLockCount; + + protected final boolean tryAcquireWrite() { + Thread current = Thread.currentThread(); + if (current == this.exclusiveOwnerThread) { + writeHoldCount++; + return true; + } + + if (this.exclusiveOwnerThread != null || this.readLockCount > 0) { + return false; + } + + Object res = lockProcessor.eval(SCRIPT_ACQUIRE_WRITE, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Arrays.asList(generateLocker(), String.valueOf(lockConfig.getMillisLease()))); + + if (res instanceof Number && ((Number) res).longValue() == 1) { + this.exclusiveOwnerThread = current; + this.writeHoldCount = 1; + startThreads(); + return true; + } + return false; + } + + protected final boolean tryReleaseWrite() { + if (Thread.currentThread() != this.exclusiveOwnerThread) { + throw new IllegalMonitorStateException(); + } + writeHoldCount--; + if (writeHoldCount > 0) { + return false; // Still held + } + + Object res = lockProcessor.eval(SCRIPT_RELEASE_WRITE, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Collections.singletonList(generateLocker())); + + if (res != null) { + this.exclusiveOwnerThread = null; + shutdownThreads(); + unparkSuccessors(); + return true; // Released + } + return false; + } + + protected final int tryAcquireRead() { + Thread current = Thread.currentThread(); + if (this.exclusiveOwnerThread != null && this.exclusiveOwnerThread != current) { + return -1; // Held by writer + } + if (this.exclusiveOwnerThread == current) { + Integer rhc = readHoldCount.get(); + readHoldCount.set(rhc == null ? 1 : rhc + 1); + readLockCount++; + return 1; + } + Integer rhc = readHoldCount.get(); + if (rhc != null && rhc > 0) { + readHoldCount.set(rhc + 1); + readLockCount++; + return 1; + } + + Object res = lockProcessor.eval(SCRIPT_ACQUIRE_READ, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Arrays.asList(generateLocker(), String.valueOf(lockConfig.getMillisLease()))); + + if (res instanceof Number && ((Number) res).longValue() > 0) { + readHoldCount.set(1); + readLockCount++; + startThreads(); + return 1; + } + return -1; + } + + protected final boolean tryReleaseRead() { + Thread current = Thread.currentThread(); + Integer rhc = readHoldCount.get(); + if (rhc == null || rhc == 0) { + throw new IllegalMonitorStateException(); + } + + rhc--; + readHoldCount.set(rhc); + readLockCount--; + + if (rhc > 0) { + return false; + } + if (exclusiveOwnerThread == current) { + return false; + } + + Object res = lockProcessor.eval(SCRIPT_RELEASE_READ, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Collections.singletonList(generateLocker())); + + if (res instanceof Number && ((Number) res).longValue() == 0) { + shutdownThreads(); + unparkSuccessors(); + } + return true; + } + + final void acquireQueued(final Node node) { + for (;;) { + final Node p = node.prev.get(); + if (p == head.get()) { + if (node.isShared) { + if (tryAcquireRead() >= 0) { + setHeadAndPropagate(node); + p.next.set(null); + break; + } + } else { + if (tryAcquireWrite()) { + head.set(node); + p.next.set(null); + break; + } + } + } + if (shouldParkAfterFailedAcquire(p, node)) + LockSupport.park(this); + } + } + + private boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + startRetryThread(); + return true; + } + + private void setHeadAndPropagate(Node node) { + head.set(node); + unparkSuccessors(); + } + + private void unparkSuccessors() { + Node h = head.get(); + if (h != null) { + Node s = h.next.get(); + // If the next waiter is a reader, unpark it. That thread, upon acquiring the lock, + // will then unpark the next reader, creating a cascading wake-up. + // If the next is a writer, just unpark it. + if (s != null && s.thread != null) { + LockSupport.unpark(s.thread); + } + } + } + + private Node addWaiter(boolean isShared) { + Node node = new Node(Thread.currentThread(), isShared); + Node pred = tail.get(); + if (pred != null) { + node.prev.set(pred); + if (tail.compareAndSet(pred, node)) { + pred.next.set(node); + return node; + } + } + enq(node); + return node; + } + + private Node enq(final Node node) { + for (;;) { + Node t = tail.get(); + if (t == null) { + Node h = new Node(null, false); + h.next.set(node); + node.prev.set(h); + if (head.compareAndSet(null, h)) { + tail.set(node); + return h; + } + } else { + node.prev.set(t); + if (tail.compareAndSet(t, node)) { + t.next.set(node); + return t; + } + } + } + } + + abstract class LockThread extends Thread { + final Object sync = new Object(); + final int delay; + final int retryInterval; + final AtomicInteger startState = new AtomicInteger(0); + private boolean shouldShutdown = false; + + LockThread(String name, int delay, int retryInterval) { + setDaemon(true); + this.delay = delay; + this.retryInterval = retryInterval; + setName(name + "-" + getId()); + } + + @Override + public void run() { + while (!shouldShutdown) { + synchronized (sync) { + try { + execute(); + sync.wait(retryInterval); + } catch (InterruptedException e) { + shouldShutdown = true; + } + } + } + beforeShutdown(); + } + + abstract void execute() throws InterruptedException; + void beforeShutdown() {} + } + + private class ExpandLockLeaseThread extends LockThread { + ExpandLockLeaseThread(int delay, int retryInterval) { + super("RW-ExpandLockLeaseThread", delay, retryInterval); + } + @Override + void execute() { + if (exclusiveOwnerThread != null) { + lockProcessor.eval(SCRIPT_ACQUIRE_WRITE, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Arrays.asList(generateLocker(), String.valueOf(lockConfig.getMillisLease()))); + } else if (readLockCount > 0) { + lockProcessor.eval(SCRIPT_ACQUIRE_READ, + Collections.singletonList(lockConfig.getLockUniqueKey()), + Arrays.asList(generateLocker(), String.valueOf(lockConfig.getMillisLease()))); + } + } + @Override + void beforeShutdown() { + expandLockRef.compareAndSet(this, null); + } + } + + private class RetryLockThread extends LockThread { + RetryLockThread(int delay, int retryInterval) { + super("RW-RetryLockThread", delay, retryInterval); + } + @Override + void execute() { + if (lockProcessor.isLockFree(lockConfig.getLockUniqueKey())) { + unparkSuccessors(); + } + } + @Override + void beforeShutdown() { + retryLockRef.compareAndSet(this, null); + } + } + + private void startThreads() { + startExpandLockLeaseThread(); + shutdownRetryThread(); + } + + private void shutdownThreads() { + shutdownExpandThread(); + } + + private void startExpandLockLeaseThread() { + ExpandLockLeaseThread t = expandLockRef.get(); + if (t == null || t.getState() == Thread.State.TERMINATED) { + int retryInterval = (int) (lockConfig.getMillisLease() * 0.75); + expandLockRef.compareAndSet(t, new ExpandLockLeaseThread(1, retryInterval)); + t = expandLockRef.get(); + if (t.startState.compareAndSet(0, 1)) { + t.start(); + } + } + } + + private void shutdownExpandThread() { + ExpandLockLeaseThread t = expandLockRef.get(); + if (t != null && t.isAlive()) { + t.interrupt(); + } + } + + private void startRetryThread() { + RetryLockThread t = retryLockRef.get(); + if (t == null || t.getState() == Thread.State.TERMINATED) { + retryLockRef.compareAndSet(t, new RetryLockThread((int) (lockConfig.getMillisLease() / 10), (int) (lockConfig.getMillisLease() / 6))); + t = retryLockRef.get(); + if (t.startState.compareAndSet(0, 1)) { + t.start(); + } + } + } + + private void shutdownRetryThread() { + RetryLockThread t = retryLockRef.get(); + if (t != null && t.isAlive()) { + t.interrupt(); + } + } + + private String generateLocker() { + return NetUtils.getLocalAddress() + "-" + Thread.currentThread().getId(); + } + } + + final class NonfairSync extends Sync { + // Add non-fair specific logic here if any in the future + } + + public class ReadLock implements Lock { + private final Sync sync; + public ReadLock(DistributedReadWriteLock lock) { this.sync = lock.sync; } + @Override public void lock() { sync.acquireQueued(sync.addWaiter(true)); } + @Override public boolean tryLock() { return sync.tryAcquireRead() >= 0; } + @Override public void unlock() { sync.tryReleaseRead(); } + @Override public Condition newCondition() { throw new UnsupportedOperationException(); } + @Override public void lockInterruptibly() { throw new UnsupportedOperationException(); } + @Override public boolean tryLock(long time, TimeUnit unit) { throw new UnsupportedOperationException(); } + } + + public class WriteLock implements Lock { + private final Sync sync; + public WriteLock(DistributedReadWriteLock lock) { this.sync = lock.sync; } + @Override public void lock() { sync.acquireQueued(sync.addWaiter(false)); } + @Override public boolean tryLock() { return sync.tryAcquireWrite(); } + @Override public void unlock() { sync.tryReleaseWrite(); } + @Override public Condition newCondition() { throw new UnsupportedOperationException(); } + @Override public void lockInterruptibly() { throw new UnsupportedOperationException(); } + @Override public boolean tryLock(long time, TimeUnit unit) { throw new UnsupportedOperationException(); } + } +} diff --git a/src/main/java/com/baidu/fsg/dlock/processor/DLockProcessor.java b/src/main/java/com/baidu/fsg/dlock/processor/DLockProcessor.java index 208b7ce..5afc74e 100644 --- a/src/main/java/com/baidu/fsg/dlock/processor/DLockProcessor.java +++ b/src/main/java/com/baidu/fsg/dlock/processor/DLockProcessor.java @@ -1,96 +1,107 @@ -/* - * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.baidu.fsg.dlock.processor; - -import com.baidu.fsg.dlock.domain.DLockConfig; -import com.baidu.fsg.dlock.domain.DLockEntity; - -/** - * The distributed lock processor interface for retrieving and updating lock status - * to persistent system(such as Redis/DB). - * - * @author chenguoqing - */ -public interface DLockProcessor { - - /** - * Retrieve the {@link DLockEntity} by the unique key - * - * @param uniqueKey - * @return - */ - DLockEntity load(String uniqueKey); - - /** - * The method implements the "lock" syntax
- *
  • DB
  • - * The implementations should update the (lockStatus,locker,lockTime) with - * DB record lock under the condition (lockStatus=0)

    - * - *

  • Redis
  • - * The implementations should set unique key, value(locker), and expire time - * - * @param newLock - * @param lockConfig - * @throw OptimisticLockingFailureException - */ - void updateForLock(DLockEntity newLock, DLockConfig lockConfig); - - /** - * The method implements the "lock" syntax with existing expire lock.
    - *
  • DB
  • - * The implementations should update - * (lockStatus,locker,lockTime) with DB line lock under the condition (lockStatus=1 && locker==expireLock.locker)

    - * - *

  • Redis
  • - * The implementation is unsupported because of the Redis expire mechanism. - * - * @param expireLock - * @param dbLock - * @param lockConfig - */ - void updateForLockWithExpire(DLockEntity expireLock, DLockEntity dbLock, DLockConfig lockConfig); - - /** - * Expand the lock expire time. It should be protected with DB line lock, it only modify the lockTime field. - * - * @param newLeaseLock - * @param lockConfig - */ - void expandLockExpire(DLockEntity newLeaseLock, DLockConfig lockConfig); - - /** - * The method implements the "unlock" syntax.
    - * - *
  • DB
  • - * The implementations should should reset the lock status to INITIAL, and clear locker, - * lockTime fields with optimistic lock condition(lockStatus,locker). The operation should be protected with - * DB line lock.

    - * - *
  • Redis
  • - * The implementation should remove key with the right value(locker). - */ - void updateForUnlock(DLockEntity currentLock, DLockConfig lockConfig); - - /** - * Whether the lock is free(released or expired) - * - * @param uniqueKey key - * @return true if lock is released - */ - boolean isLockFree(String uniqueKey); - -} +/* + * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.baidu.fsg.dlock.processor; + +import com.baidu.fsg.dlock.domain.DLockConfig; +import com.baidu.fsg.dlock.domain.DLockEntity; + +import java.util.List; + +/** + * The distributed lock processor interface for retrieving and updating lock status + * to persistent system(such as Redis/DB). + * + * @author chenguoqing + */ +public interface DLockProcessor { + + /** + * Retrieve the {@link DLockEntity} by the unique key + * + * @param uniqueKey + * @return + */ + DLockEntity load(String uniqueKey); + + /** + * The method implements the "lock" syntax
    + *
  • DB
  • + * The implementations should update the (lockStatus,locker,lockTime) with + * DB record lock under the condition (lockStatus=0)

    + * + *

  • Redis
  • + * The implementations should set unique key, value(locker), and expire time + * + * @param newLock + * @param lockConfig + * @throw OptimisticLockingFailureException + */ + void updateForLock(DLockEntity newLock, DLockConfig lockConfig); + + /** + * The method implements the "lock" syntax with existing expire lock.
    + *
  • DB
  • + * The implementations should update + * (lockStatus,locker,lockTime) with DB line lock under the condition (lockStatus=1 && locker==expireLock.locker)

    + * + *

  • Redis
  • + * The implementation is unsupported because of the Redis expire mechanism. + * + * @param expireLock + * @param dbLock + * @param lockConfig + */ + void updateForLockWithExpire(DLockEntity expireLock, DLockEntity dbLock, DLockConfig lockConfig); + + /** + * Expand the lock expire time. It should be protected with DB line lock, it only modify the lockTime field. + * + * @param newLeaseLock + * @param lockConfig + */ + void expandLockExpire(DLockEntity newLeaseLock, DLockConfig lockConfig); + + /** + * The method implements the "unlock" syntax.
    + * + *
  • DB
  • + * The implementations should should reset the lock status to INITIAL, and clear locker, + * lockTime fields with optimistic lock condition(lockStatus,locker). The operation should be protected with + * DB line lock.

    + * + *
  • Redis
  • + * The implementation should remove key with the right value(locker). + */ + void updateForUnlock(DLockEntity currentLock, DLockConfig lockConfig); + + /** + * Whether the lock is free(released or expired) + * + * @param uniqueKey key + * @return true if lock is released + */ + boolean isLockFree(String uniqueKey); + + /** + * Evaluate lua script. + * + * @param script the script content + * @param keys the keys + * @param args the args + * @return the result of the script + */ + Object eval(String script, List keys, List args); +} diff --git a/src/main/java/com/baidu/fsg/dlock/processor/impl/RedisLockProcessor.java b/src/main/java/com/baidu/fsg/dlock/processor/impl/RedisLockProcessor.java index 88dd0b9..52cdda5 100644 --- a/src/main/java/com/baidu/fsg/dlock/processor/impl/RedisLockProcessor.java +++ b/src/main/java/com/baidu/fsg/dlock/processor/impl/RedisLockProcessor.java @@ -1,203 +1,214 @@ -/* - * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.baidu.fsg.dlock.processor.impl; - -import java.util.Arrays; - -import javax.annotation.Resource; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import com.baidu.fsg.dlock.domain.DLockConfig; -import com.baidu.fsg.dlock.domain.DLockEntity; -import com.baidu.fsg.dlock.domain.DLockStatus; -import com.baidu.fsg.dlock.exception.OptimisticLockingException; -import com.baidu.fsg.dlock.exception.RedisProcessException; -import com.baidu.fsg.dlock.jedis.JedisClient; -import com.baidu.fsg.dlock.processor.DLockProcessor; - -/** - * The implement of {@link DLockProcessor}. Command set(with NX & PX) & Lua script is used for atomic operations. - * Redis version must be greater than 2.6.12

    - * - * DataModel:
    - * Key: LockUniqueKey, Value: Locker(IP + ThreadID), Expire: lease duration(ms). - * - * @author yutianbao - */ -@Service -public class RedisLockProcessor implements DLockProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockProcessor.class); - - /** - * Redis command & result code constant - */ - private static final String SET_ARG_NOT_EXIST = "NX"; - private static final String SET_ARG_EXPIRE = "PX"; - private static final String RES_OK = "OK"; - - @Resource - private JedisClient jedisClient; - - /** - * Load by unique key. For redis implement, you can find locker & status from the result entity. - * - * @param uniqueKey key - * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} - */ - @Override - public DLockEntity load(String uniqueKey) throws RedisProcessException { - // GET command - String locker; - try { - locker = jedisClient.get(uniqueKey); - } catch (Exception e) { - LOGGER.warn("Exception occurred by GET command for key:" + uniqueKey, e); - throw new RedisProcessException("Exception occurred by GET command for key:" + uniqueKey, e); - } - - if (locker == null) { - return null; - } - - // build entity - DLockEntity lockEntity = new DLockEntity(); - lockEntity.setLocker(locker); - lockEntity.setLockStatus(DLockStatus.PROCESSING); - - return lockEntity; - } - - /** - * Update for lock using redis SET(NX, PX) command. - * - * @param newLock with locker in it - * @param lockConfig - * @throws RedisProcessException Redis command execute exception - * @throws OptimisticLockingException the lock is hold by the other request. - */ - @Override - public void updateForLock(DLockEntity newLock, DLockConfig lockConfig) - throws RedisProcessException, OptimisticLockingException { - // SET(NX, PX) command - String lockRes; - try { - lockRes = jedisClient.set(lockConfig.getLockUniqueKey(), newLock.getLocker(), SET_ARG_NOT_EXIST, - SET_ARG_EXPIRE, lockConfig.getMillisLease()); - - } catch (Exception e) { - LOGGER.warn("Exception occurred by SET(NX, PX) command for key:" + lockConfig.getLockUniqueKey(), e); - throw new RedisProcessException( - "Exception occurred by SET(NX, PX) command for key:" + lockConfig.getLockUniqueKey(), e); - } - - if (!RES_OK.equals(lockRes)) { - LOGGER.warn("Fail to get lock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), newLock.getLocker()); - throw new OptimisticLockingException( - "Fail to get lock for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + newLock.getLocker()); - } - } - - /** - * The redis expire mechanism guaranteed the expired key is removed automatic. - * It is not necessary to check condition(status=1 && expire=true) - */ - @Override - public void updateForLockWithExpire(DLockEntity expireLock, DLockEntity dbLock, DLockConfig lockConfig) { - throw new UnsupportedOperationException("updateForLockWithExpire is not supported"); - } - - /** - * Extend lease for lock with lua script. - * - * @param leaseLock with locker in it - * @param lockConfig - * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} - * @throws OptimisticLockingException if the lock is released or be hold by another one. - */ - @Override - public void expandLockExpire(DLockEntity leaseLock, DLockConfig lockConfig) - throws RedisProcessException, OptimisticLockingException { - // Expire if key is existed and equal with the specified value(locker). - String leaseScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then " - + " return redis.call('pexpire', KEYS[1], ARGV[2]); " - + "else" - + " return nil; " - + "end; "; - - Object leaseRes; - try { - leaseRes = jedisClient.eval(leaseScript, Arrays.asList(lockConfig.getLockUniqueKey()), - Arrays.asList(leaseLock.getLocker(), lockConfig.getMillisLease() + "")); - } catch (Exception e) { - LOGGER.warn("Exception occurred by ExpandLease lua script for key:" + lockConfig.getLockUniqueKey(), e); - throw new RedisProcessException( - "Exception occurred by ExpandLease lua script for key:" + lockConfig.getLockUniqueKey(), e); - } - - // null means lua return nil (the lock is released or be hold by the other request) - if (leaseRes == null) { - LOGGER.warn("Fail to lease for key:{} ,locker={}", lockConfig.getLockUniqueKey(), leaseLock.getLocker()); - throw new OptimisticLockingException( - "Fail to lease for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + leaseLock.getLocker()); - } - } - - /** - * Release lock using lua script. - * - * @param currentLock with locker in it - * @param lockConfig - * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} - * @throws OptimisticLockingException if the lock is released or be hold by another one. - */ - @Override - public void updateForUnlock(DLockEntity currentLock, DLockConfig lockConfig) - throws RedisProcessException, OptimisticLockingException { - // Delete if key is existed and equal with the specified value(locker). - String unlockScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then " - + " return redis.call('del', KEYS[1]); " - + "else " - + " return nil; " - + "end;"; - - Object unlockRes; - try { - unlockRes = jedisClient.eval(unlockScript, Arrays.asList(lockConfig.getLockUniqueKey()), - Arrays.asList(currentLock.getLocker())); - } catch (Exception e) { - LOGGER.warn("Exception occurred by Unlock lua script for key:" + lockConfig.getLockUniqueKey(), e); - throw new RedisProcessException( - "Exception occurred by Unlock lua script for key:" + lockConfig.getLockUniqueKey(), e); - } - - // null means lua return nil (the lock is released or be hold by the other request) - if (unlockRes == null) { - LOGGER.warn("Fail to unlock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), currentLock.getLocker()); - throw new OptimisticLockingException("Fail to unlock for key:" + lockConfig.getLockUniqueKey() - + ",locker=" + currentLock.getLocker()); - } - } - - @Override - public boolean isLockFree(String uniqueKey) { - DLockEntity locked = this.load(uniqueKey); - return locked == null; - } - -} +/* + * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.baidu.fsg.dlock.processor.impl; + +import java.util.Arrays; +import java.util.List; + +import javax.annotation.Resource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.baidu.fsg.dlock.domain.DLockConfig; +import com.baidu.fsg.dlock.domain.DLockEntity; +import com.baidu.fsg.dlock.domain.DLockStatus; +import com.baidu.fsg.dlock.exception.OptimisticLockingException; +import com.baidu.fsg.dlock.exception.RedisProcessException; +import com.baidu.fsg.dlock.jedis.JedisClient; +import com.baidu.fsg.dlock.processor.DLockProcessor; + +/** + * The implement of {@link DLockProcessor}. Command set(with NX & PX) & Lua script is used for atomic operations. + * Redis version must be greater than 2.6.12

    + * + * DataModel:
    + * Key: LockUniqueKey, Value: Locker(IP + ThreadID), Expire: lease duration(ms). + * + * @author yutianbao + */ +@Service +public class RedisLockProcessor implements DLockProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisLockProcessor.class); + + /** + * Redis command & result code constant + */ + private static final String SET_ARG_NOT_EXIST = "NX"; + private static final String SET_ARG_EXPIRE = "PX"; + private static final String RES_OK = "OK"; + + @Resource + private JedisClient jedisClient; + + /** + * Load by unique key. For redis implement, you can find locker & status from the result entity. + * + * @param uniqueKey key + * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} + */ + @Override + public DLockEntity load(String uniqueKey) throws RedisProcessException { + // GET command + String locker; + try { + locker = jedisClient.get(uniqueKey); + } catch (Exception e) { + LOGGER.warn("Exception occurred by GET command for key:" + uniqueKey, e); + throw new RedisProcessException("Exception occurred by GET command for key:" + uniqueKey, e); + } + + if (locker == null) { + return null; + } + + // build entity + DLockEntity lockEntity = new DLockEntity(); + lockEntity.setLocker(locker); + lockEntity.setLockStatus(DLockStatus.PROCESSING); + + return lockEntity; + } + + /** + * Update for lock using redis SET(NX, PX) command. + * + * @param newLock with locker in it + * @param lockConfig + * @throws RedisProcessException Redis command execute exception + * @throws OptimisticLockingException the lock is hold by the other request. + */ + @Override + public void updateForLock(DLockEntity newLock, DLockConfig lockConfig) + throws RedisProcessException, OptimisticLockingException { + // SET(NX, PX) command + String lockRes; + try { + lockRes = jedisClient.set(lockConfig.getLockUniqueKey(), newLock.getLocker(), SET_ARG_NOT_EXIST, + SET_ARG_EXPIRE, lockConfig.getMillisLease()); + + } catch (Exception e) { + LOGGER.warn("Exception occurred by SET(NX, PX) command for key:" + lockConfig.getLockUniqueKey(), e); + throw new RedisProcessException( + "Exception occurred by SET(NX, PX) command for key:" + lockConfig.getLockUniqueKey(), e); + } + + if (!RES_OK.equals(lockRes)) { + LOGGER.warn("Fail to get lock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), newLock.getLocker()); + throw new OptimisticLockingException( + "Fail to get lock for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + newLock.getLocker()); + } + } + + /** + * The redis expire mechanism guaranteed the expired key is removed automatic. + * It is not necessary to check condition(status=1 && expire=true) + */ + @Override + public void updateForLockWithExpire(DLockEntity expireLock, DLockEntity dbLock, DLockConfig lockConfig) { + throw new UnsupportedOperationException("updateForLockWithExpire is not supported"); + } + + /** + * Extend lease for lock with lua script. + * + * @param leaseLock with locker in it + * @param lockConfig + * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} + * @throws OptimisticLockingException if the lock is released or be hold by another one. + */ + @Override + public void expandLockExpire(DLockEntity leaseLock, DLockConfig lockConfig) + throws RedisProcessException, OptimisticLockingException { + // Expire if key is existed and equal with the specified value(locker). + String leaseScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then " + + " return redis.call('pexpire', KEYS[1], ARGV[2]); " + + "else" + + " return nil; " + + "end; "; + + Object leaseRes; + try { + leaseRes = jedisClient.eval(leaseScript, Arrays.asList(lockConfig.getLockUniqueKey()), + Arrays.asList(leaseLock.getLocker(), lockConfig.getMillisLease() + "")); + } catch (Exception e) { + LOGGER.warn("Exception occurred by ExpandLease lua script for key:" + lockConfig.getLockUniqueKey(), e); + throw new RedisProcessException( + "Exception occurred by ExpandLease lua script for key:" + lockConfig.getLockUniqueKey(), e); + } + + // null means lua return nil (the lock is released or be hold by the other request) + if (leaseRes == null) { + LOGGER.warn("Fail to lease for key:{} ,locker={}", lockConfig.getLockUniqueKey(), leaseLock.getLocker()); + throw new OptimisticLockingException( + "Fail to lease for key:" + lockConfig.getLockUniqueKey() + " ,locker=" + leaseLock.getLocker()); + } + } + + /** + * Release lock using lua script. + * + * @param currentLock with locker in it + * @param lockConfig + * @throws RedisProcessException if catch any exception from {@link redis.clients.jedis.Jedis} + * @throws OptimisticLockingException if the lock is released or be hold by another one. + */ + @Override + public void updateForUnlock(DLockEntity currentLock, DLockConfig lockConfig) + throws RedisProcessException, OptimisticLockingException { + // Delete if key is existed and equal with the specified value(locker). + String unlockScript = "if (redis.call('get', KEYS[1]) == ARGV[1]) then " + + " return redis.call('del', KEYS[1]); " + + "else " + + " return nil; " + + "end;"; + + Object unlockRes; + try { + unlockRes = jedisClient.eval(unlockScript, Arrays.asList(lockConfig.getLockUniqueKey()), + Arrays.asList(currentLock.getLocker())); + } catch (Exception e) { + LOGGER.warn("Exception occurred by Unlock lua script for key:" + lockConfig.getLockUniqueKey(), e); + throw new RedisProcessException( + "Exception occurred by Unlock lua script for key:" + lockConfig.getLockUniqueKey(), e); + } + + // null means lua return nil (the lock is released or be hold by the other request) + if (unlockRes == null) { + LOGGER.warn("Fail to unlock for key:{} ,locker={}", lockConfig.getLockUniqueKey(), currentLock.getLocker()); + throw new OptimisticLockingException("Fail to unlock for key:" + lockConfig.getLockUniqueKey() + + ",locker=" + currentLock.getLocker()); + } + } + + @Override + public boolean isLockFree(String uniqueKey) { + DLockEntity locked = this.load(uniqueKey); + return locked == null; + } + + @Override + public Object eval(String script, List keys, List args) { + try { + return jedisClient.eval(script, keys, args); + } catch (Exception e) { + LOGGER.warn("Exception occurred by eval lua script for keys:" + keys, e); + throw new RedisProcessException( + "Exception occurred by eval lua script for keys:" + keys, e); + } + } +} diff --git a/src/main/resources/com/baidu/fsg/dlock/lua/acquire_read.lua b/src/main/resources/com/baidu/fsg/dlock/lua/acquire_read.lua new file mode 100644 index 0000000..f796fac --- /dev/null +++ b/src/main/resources/com/baidu/fsg/dlock/lua/acquire_read.lua @@ -0,0 +1,27 @@ +-- Acquire Read Lock +-- KEYS[1] lock key +-- ARGV[1] client id +-- ARGV[2] lease time in milliseconds +-- return reentry count on success, 0 on failure + +local lockKey = KEYS[1] +local clientId = ARGV[1] +local leaseTime = ARGV[2] + +local writer = redis.call('hget', lockKey, 'writer') +if writer then + return 0 -- Write lock is held, cannot acquire read lock +end + +-- Acquire or re-enter read lock +local reentry = redis.call('hincrby', lockKey, clientId, 1) + +-- If this is the first read lock, the key might not have an expiry +if reentry == 1 and redis.call('hlen', lockKey) == 1 then + redis.call('pexpire', lockKey, leaseTime) +else + -- Refresh lease for all lock holders + redis.call('pexpire', lockKey, leaseTime) +end + +return reentry diff --git a/src/main/resources/com/baidu/fsg/dlock/lua/acquire_write.lua b/src/main/resources/com/baidu/fsg/dlock/lua/acquire_write.lua new file mode 100644 index 0000000..886c1a9 --- /dev/null +++ b/src/main/resources/com/baidu/fsg/dlock/lua/acquire_write.lua @@ -0,0 +1,29 @@ +-- Acquire Write Lock +-- KEYS[1] lock key +-- ARGV[1] client id +-- ARGV[2] lease time in milliseconds +-- return 1 for new lock, >1 for reentrant lock, 0 for failure + +local lockKey = KEYS[1] +local clientId = ARGV[1] +local leaseTime = ARGV[2] + +local writer = redis.call('hget', lockKey, 'writer') + +-- Handle reentrant write lock +if writer and writer == clientId then + local reentry = redis.call('hincrby', lockKey, 'reentry', 1) + redis.call('pexpire', lockKey, leaseTime) + return reentry +end + +-- Fail if lock is held by another writer or any readers +if redis.call('hlen', lockKey) > 0 then + return 0 +end + +-- Acquire new lock +redis.call('hset', lockKey, 'writer', clientId) +redis.call('hset', lockKey, 'reentry', 1) +redis.call('pexpire', lockKey, leaseTime) +return 1 diff --git a/src/main/resources/com/baidu/fsg/dlock/lua/release_read.lua b/src/main/resources/com/baidu/fsg/dlock/lua/release_read.lua new file mode 100644 index 0000000..78cf3d3 --- /dev/null +++ b/src/main/resources/com/baidu/fsg/dlock/lua/release_read.lua @@ -0,0 +1,25 @@ +-- Release Read Lock +-- KEYS[1] lock key +-- ARGV[1] client id +-- return number of remaining locks, nil if not a reader + +local lockKey = KEYS[1] +local clientId = ARGV[1] + +if redis.call('hexists', lockKey, clientId) == 0 then + return nil +end + +local reentry = tonumber(redis.call('hincrby', lockKey, clientId, -1)) + +if reentry > 0 then + return redis.call('hlen', lockKey) +else + redis.call('hdel', lockKey, clientId) + local remaining = redis.call('hlen', lockKey) + if remaining == 0 then + redis.call('del', lockKey) + -- In a more advanced version, we could publish a message here to unpark writers + end + return remaining +end diff --git a/src/main/resources/com/baidu/fsg/dlock/lua/release_write.lua b/src/main/resources/com/baidu/fsg/dlock/lua/release_write.lua new file mode 100644 index 0000000..2c02200 --- /dev/null +++ b/src/main/resources/com/baidu/fsg/dlock/lua/release_write.lua @@ -0,0 +1,24 @@ +-- Release Write Lock +-- KEYS[1] lock key +-- ARGV[1] client id +-- return reentry count if still held, 0 if released, nil if not owner + +local lockKey = KEYS[1] +local clientId = ARGV[1] + +local writer = redis.call('hget', lockKey, 'writer') +if not writer or writer ~= clientId then + return nil +end + +local reentry = tonumber(redis.call('hincrby', lockKey, 'reentry', -1)) + +if reentry > 0 then + -- Still held + return reentry +else + -- Released + redis.call('del', lockKey) + -- In a more advanced version, we could publish a message here to unpark waiters + return 0 +end diff --git a/src/test/java/com/baidu/fsg/dlock/DistributedReadWriteLockTest.java b/src/test/java/com/baidu/fsg/dlock/DistributedReadWriteLockTest.java new file mode 100644 index 0000000..710c115 --- /dev/null +++ b/src/test/java/com/baidu/fsg/dlock/DistributedReadWriteLockTest.java @@ -0,0 +1,175 @@ +/* + * Copyright (c) 2017 Baidu, Inc. All Rights Reserve. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.baidu.fsg.dlock; + +import com.baidu.fsg.dlock.domain.DLockConfig; +import com.baidu.fsg.dlock.processor.DLockProcessor; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.Lock; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DistributedReadWriteLockTest { + + @Mock + private DLockProcessor lockProcessor; + + private ReadWriteLock readWriteLock; + + @Before + public void setup() { + DLockConfig lockConfig = new DLockConfig("RW_LOCK", "TEST", 5000, TimeUnit.MILLISECONDS); + readWriteLock = new DistributedReadWriteLock(lockConfig, lockProcessor); + } + + @Test + public void testSimpleWriteLock() { + Lock writeLock = readWriteLock.writeLock(); + // Simulate successful lock and unlock + when(lockProcessor.eval(anyString(), anyList(), anyList())) + .thenReturn(1L) // acquire + .thenReturn(0L); // release + + writeLock.lock(); + writeLock.unlock(); + // Test completes if no deadlock or exception occurs + } + + @Test + public void testSimpleReadLock() { + Lock readLock = readWriteLock.readLock(); + // Simulate successful lock and unlock + when(lockProcessor.eval(anyString(), anyList(), anyList())) + .thenReturn(1L) // acquire + .thenReturn(0L); // release + + readLock.lock(); + readLock.unlock(); + // Test completes if no deadlock or exception occurs + } + + @Test + public void testWriteLockBlocksReadLock() throws InterruptedException { + Lock writeLock = readWriteLock.writeLock(); + Lock readLock = readWriteLock.readLock(); + AtomicBoolean readLockAcquired = new AtomicBoolean(false); + + // Writer acquires the lock + when(lockProcessor.eval(anyString(), anyList(), anyList())).thenReturn(1L); + writeLock.lock(); + + // Now, when reader tries, it should fail (return 0) + // And when writer releases, it succeeds (return 0) + // And when reader tries again, it succeeds (return 1) + when(lockProcessor.eval(anyString(), anyList(), anyList())) + .thenReturn(0L) // read lock fails + .thenReturn(0L) // write lock releases + .thenReturn(1L); // read lock acquires + + Thread t = new Thread(() -> { + readLock.lock(); // This will block until write lock is released + readLockAcquired.set(true); + readLock.unlock(); + }); + t.start(); + + Thread.sleep(200); // Give the thread time to block + Assert.assertFalse("Read lock should not be acquired while write lock is held", readLockAcquired.get()); + + writeLock.unlock(); + t.join(1000); // Wait for the thread to finish + Assert.assertTrue("Read lock should be acquired after write lock is released", readLockAcquired.get()); + } + + @Test + public void testReadLockBlocksWriteLock() throws InterruptedException { + Lock writeLock = readWriteLock.writeLock(); + Lock readLock = readWriteLock.readLock(); + AtomicBoolean writeLockAcquired = new AtomicBoolean(false); + + // Reader acquires the lock + when(lockProcessor.eval(anyString(), anyList(), anyList())).thenReturn(1L); + readLock.lock(); + + // Now, when writer tries, it should fail (return 0) + when(lockProcessor.eval(anyString(), anyList(), anyList())).thenReturn(0L); + // When reader releases, it succeeds (return 0) + // And when writer tries again, it succeeds (return 1) + // Also mock isLockFree for the retry thread + when(lockProcessor.isLockFree(anyString())).thenReturn(false, true); + + Thread t = new Thread(() -> { + writeLock.lock(); // This will block + writeLockAcquired.set(true); + writeLock.unlock(); + }); + t.start(); + + Thread.sleep(200); + Assert.assertFalse("Write lock should not be acquired while read lock is held", writeLockAcquired.get()); + + readLock.unlock(); + + // After unlock, the writer thread should be unparked and acquire the lock + when(lockProcessor.eval(anyString(), anyList(), anyList())).thenReturn(1L); + + t.join(1000); + Assert.assertTrue("Write lock should be acquired after read lock is released", writeLockAcquired.get()); + } + + @Test + public void testMultipleReadersAllowed() throws InterruptedException { + Lock readLock1 = readWriteLock.readLock(); + Lock readLock2 = readWriteLock.readLock(); + + // Simulate both read locks being acquired successfully + when(lockProcessor.eval(anyString(), anyList(), anyList())).thenReturn(1L, 2L); + + final CountDownLatch latch = new CountDownLatch(2); + + Thread t1 = new Thread(() -> { + readLock1.lock(); + latch.countDown(); + readLock1.unlock(); + }); + + Thread t2 = new Thread(() -> { + readLock2.lock(); + latch.countDown(); + readLock2.unlock(); + }); + + t1.start(); + t2.start(); + + boolean completed = latch.await(2, TimeUnit.SECONDS); + Assert.assertTrue("Both threads should acquire read locks concurrently", completed); + } +}