Lock介绍

有梦想的搬砖人 / 123 /

ChatGPT 可用网址,仅供交流学习使用,如对您有所帮助,请收藏并推荐给需要的朋友。
https://ckai.xyz

Lock介绍

Lock是juc(java.util.concurrent)包下面的一个接口类,是作者Doug Lea定义的api规范,主要接口有

api 说明
void lock() 获取锁。 如果锁不可用,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到获取锁为止。
void lockInterruptibly() throws InterruptedException 除非当前线程被中断,否则获取锁。 如果锁可用,则获取锁并立即返回。 如果锁不可用,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一: 锁被当前线程获取; 或者 其他线程中断当前线程,支持中断获取锁。 如果当前线程: 在进入此方法时设置其中断状态; 或者 获取锁时中断,支持中断获取锁, 然后抛出InterruptedException并清除当前线程的中断状态。
boolean tryLock() 仅当调用时锁空闲时才获取锁。 如果锁可用,则获取锁并立即返回 true 值。 如果锁不可用,则此方法将立即返回 false 值。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 如果在给定的等待时间内锁是空闲的并且当前线程没有被中断,则获取锁。 如果锁可用,此方法立即返回 true 值。 如果锁不可用,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一: 锁被当前线程获取; 或者 其他线程中断当前线程,支持中断获取锁; 或者 指定的等待时间已过 如果获取了锁,则返回 true 值。 如果当前线程: 在进入此方法时设置其中断状态; 或者 获取锁时中断,支持中断获取锁, 然后抛出InterruptedException并清除当前线程的中断状态。 如果指定的等待时间已过,则返回值 false。 如果时间小于或等于零,则该方法根本不会等待。
void unlock() 释放锁
Condition newCondition() 返回绑定到此 Lock 实例的新 Condition 实例。 在等待条件之前,当前线程必须持有锁。 对 Condition.await() 的调用将在等待之前自动释放锁,并在等待返回之前重新获取锁。

Condition介绍

Condition也是juc包下的一个接口类,需要在线程持有Lock的状态下操作该接口下的方法,主要接口有

api 说明
void await() throws InterruptedException 导致当前线程等待,直到收到信号或中断。 与此条件关联的锁被自动释放,当前线程出于线程调度目的而被禁用,并处于休眠状态,直到发生以下四种情况之一: 其他线程调用该Condition的signal方法,并且当前线程恰好被选为要唤醒的线程; 或者 其他一些线程为此条件调用 signalAll 方法; 或者 其他线程中断当前线程,支持线程挂起中断; 或者 发生“虚假唤醒”。 在所有情况下,在此方法返回之前,当前线程必须重新获取与此条件关联的锁。 当线程返回时,保证持有该锁。 如果当前线程: 在进入此方法时设置其中断状态; 或者 等待时被中断,支持线程挂起中断, 然后抛出InterruptedException并清除当前线程的中断状态。 在第一种情况下,没有指定是否在释放锁之前进行中断测试。
void lockInterruptibly() throws InterruptedException 除非当前线程被中断,否则获取锁。 如果锁可用,则获取锁并立即返回。 如果锁不可用,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一: 锁被当前线程获取; 或者 其他线程中断当前线程,支持中断获取锁。 如果当前线程: 在进入此方法时设置其中断状态; 或者 获取锁时中断,支持中断获取锁, 然后抛出InterruptedException并清除当前线程的中断状态。
void awaitUninterruptibly() 导致当前线程等待,直到收到信号。 与此条件关联的锁被自动释放,当前线程出于线程调度目的而被禁用,并处于休眠状态,直到发生以下三种情况之一: 其他线程调用该Condition的signal方法,并且当前线程恰好被选为要唤醒的线程; 或者 其他一些线程为此条件调用 signalAll 方法; 或者 发生“虚假唤醒”。 在所有情况下,在此方法返回之前,当前线程必须重新获取与此条件关联的锁。 当线程返回时,保证持有该锁。 如果当前线程在进入该方法时设置了中断状态,或者在等待时被中断,它将继续等待,直到发出信号。 当它最终从此方法返回时,其中断状态仍将被设置。
long awaitNanos(long nanosTimeout) throws InterruptedException 使当前线程等待,直到收到信号或中断,或者指定的等待时间过去。 与此条件关联的锁被自动释放,当前线程出于线程调度目的而被禁用,并处于休眠状态,直到发生以下五种情况之一: 其他线程调用该Condition的signal方法,并且当前线程恰好被选为要唤醒的线程; 或者 其他一些线程为此条件调用 signalAll 方法; 或者 其他线程中断当前线程,支持线程挂起中断; 或者 规定的等待时间已过; 或者 发生“虚假唤醒”。 在所有情况下,在此方法返回之前,当前线程必须重新获取与此条件关联的锁。 当线程返回时,保证持有该锁。 如果当前线程: 在进入此方法时设置其中断状态; 或者 等待时被中断,支持线程挂起中断。
boolean await(long time, TimeUnit unit) throws InterruptedException 使当前线程等待,直到收到信号或中断,或者指定的等待时间过去。
boolean awaitUntil(Date deadline) throws InterruptedException 导致当前线程等待,直到收到信号或中断,或者指定的截止时间过去。 与此条件关联的锁被自动释放,当前线程出于线程调度目的而被禁用,并处于休眠状态,直到发生以下五种情况之一: 其他线程调用该Condition的signal方法,并且当前线程恰好被选为要唤醒的线程; 或者 其他一些线程为此条件调用 signalAll 方法; 或者 其他线程中断当前线程,支持线程挂起中断; 或者 规定的期限已过; 或者 发生“虚假唤醒”。 在所有情况下,在此方法返回之前,当前线程必须重新获取与此条件关联的锁。 当线程返回时,保证持有该锁。 如果当前线程: 在进入此方法时设置其中断状态; 或者 等待时被中断,支持线程挂起中断。
void signal() 唤醒一个等待线程。 如果有任何线程在此条件下等待,则选择一个线程来唤醒。 该线程必须在从等待返回之前重新获取锁。
void signalAll() 唤醒所有等待线程。 如果任何线程正在等待这种情况,那么它们都会被唤醒。 每个线程必须重新获取锁才能从等待返回。

作用

Lock通过lock,trylock和unlock接口操作,可以保证在多线程环境下的代码块的同步执行,保证执行结果的正确性.通过Condition的await和signal,signallAll操作,保证同步代码块中的条件得到满足才能被执行完成.提示代码如下:


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockAndConditionDemo {
    private List<String> containers = new ArrayList<>();
    private int capacity = 10;

    private Lock lock = new ReentrantLock();
    private Condition emptyCondition = lock.newCondition();
    private Condition fullCondition = lock.newCondition();

    public void add(String product) {
        lock.lock();
        try {
            while (containers.size() == capacity) {
                fullCondition.await();
            }
            containers.add(product);
            emptyCondition.signalAll();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    public String take() {
        lock.lock();
        String product = null;
        try {
            while (containers.size() == 0) {
                emptyCondition.await();
            }
            product = containers.remove(0);
            fullCondition.signalAll();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();

        }
        return product;
    }
}

ReentrantLock介绍

ReentrantLock也是juc包下的一个具体类,它实现了Lock的接口.通常也用它来作为Lock接口的具体实现类,内部通过静态内部类Sync继承AQS(AbstractQueuedSynchronizer)父类来实现Lock的lock方法.
AQS内部是一个同步等待队列,它通过state(状态计数位)和CAS(compareAndSetState)乐观锁的方式实现同步,底层是通过unsafe的本地方法unsafe.compareAndSwapInt来保证操作的原子性的.队列是通过属性head,tail和静态内部类Node的属性prev,next来实现的双向队列,通过enq方法将新包装的Thread的Node节点添加到末尾.
简要分析下ReentrantLock.lock()的源代码:

//方法一:
public void lock() {
    //通过sync内部类lock方法,默认为非公平
    sync.lock();
}
//方法二:
final void lock() {
    //通过cas乐观锁方式设置锁对象属性state的值,默认值为0
    if (compareAndSetState(0, 1))
        //成功设置为1进入此方法
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //失败进入此方法
        acquire(1);
}

//方法三:
protected final boolean compareAndSetState(int expect, int update) {
    // 通过unsafe保证操作原子性
    //stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));state的属性的地址值
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

方法四:
//就是一个set设置属性的方法,该类AbstractOwnableSynchronizer是AQS的父类
//为真到方法四执行完成lock方法就完成了
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

方法五:
public final void acquire(int arg) {
    //tryAcquire方法再次尝试获取state的值,获取成功就退出,不成功执行acquireQueued方法
    //addWaiter方法
    //acquireQueued方法
    //selfInterrupt方法
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

方法六:
//调用nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

方法七:
final boolean nonfairTryAcquire(int acquires) {
    //获取当前线程
    final Thread current = Thread.currentThread();
    //获取AQS当前属性state值
    int c = getState();
    //如果c变为0了,说明第一次CAS操作的state值已经被获取锁的线程修改为0了
    if (c == 0) {
        //再次调用方法三
        if (compareAndSetState(0, acquires)) {
            //成功则再次调用方法四,同样lock方法也执行完成了,true之后就不会执行&&后面的方法了
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果不为0,判断当前线程是否和锁对象保存的thread属性一致,如果相等说明是同一个线程,之前的锁还没有释放的情况,可以重入
    else if (current == getExclusiveOwnerThread()) {
        //当前state值+1,如果超过了int的最大值,则到达了重入的边界值抛出异常中断该线程后续执行
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //设置state属性值后lock方法也执行完了
        setState(nextc);
        return true;
    }
    return false;
}

方法八:
//方法参数为static final Node EXCLUSIVE = null;
private Node addWaiter(Node mode) {
    //创建AQS的静态内部类的node实例       
    // Node(Thread thread, Node mode) {     
            // Used by addWaiter
    //        this.nextWaiter = mode;
    //        this.thread = thread;
    //    }
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        //新加入的节点的上一个节点为该lock对象的AQS的对尾
        node.prev = pred;
        //通过CAS的方式保证AQS的对尾节点设置为新节点,成功则将原对尾节点的下一个节点为新加入的节点返回该节点,然后开始执行acquireQueued方法,不成功进入enq方法
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 将当前节点添加到AQS队列的末尾并设置尾节点尾当前节点
    enq(node);
    return node;
}

方法九:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //获取新创建节点的前一个节点对象
            final Node p = node.predecessor();
            //如果上一个节点为头节点,则说明前面已经没有等待node了,则再次执行方法七
            if (p == head && tryAcquire(arg)) {
                //成功则设置该node为头节点,断开原先头节点指向该节点的引用链接
                //不执行cancelAcquire方法
                //不执行selfInterrupt方法,同时lock方法也执行完成了
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //方法七没有设置成功,则执行阻塞该线程的方法
            //成功则执行阻塞和检查线程是否被中断的方法
            //线程是被中断的则打断标识位设置为true,继续循环执行,否则打断标志为false继续执行
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            //非正常退出循环会执行获取失败会执行取消获取节点方法
            cancelAcquire(node);
    }
}

方法十:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取上一个节点的等待状态
    int ws = pred.waitStatus;
    //如果为-1则为true退出,执行暂停和检查打断方法parkAndCheckInterrupt()
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    //如果大于0,则找到上一个节点之前的节点的等待状态值大于0的前节点
    //并将该节点的下一个节点引用设置为该节点,退出继续循环执行上个方法
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //如果ws值为0,则通过cas设置等待状态的值为-1,退出继续循环执行上个方法
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

方法十一:
private final boolean parkAndCheckInterrupt() {
    //线程阻塞方法
    LockSupport.park(this);
    //获取线程是否被打断的状态返回,返回之后线程状态会被重置
    return Thread.interrupted();
}

方法十二:
public static void park(Object blocker) {
    //获取当前线程
    Thread t = Thread.currentThread();
    //通过unsafe类将线程parkBlock设置为该lock对象
    setBlocker(t, blocker);
    //UNSAFE.park是阻塞线程的方法
    UNSAFE.park(false, 0L);
    //重新将线程parkBlock设置为null
    setBlocker(t, null);
}

方法十三:
private static void setBlocker(Thread t, Object arg) {
    // Even though volatile, hotspot doesn't need a write barrier here.
    //对应parkBlockerOffset放置lock对象
    UNSAFE.putObject(t, parkBlockerOffset, arg);
}

方法十四:
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    //当前节点为空就退出
    if (node == null)
        return;

    //异常退出将节点线程引用置空
    node.thread = null;

    // Skip cancelled predecessors
    //取到该节点的上个节点的等待状态小于等于0
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    //当前节点的等待状态设置为1(取消状态)
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    将当前节点是否和尾节点相等,相等则将尾节点设置为当前节点的上个节点,上个节点的下一个节点设置为空
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        前节点不等于头节点并且前节点等待状态不为-1或者将前节点的等待状态设置为-1成功并且前节点的线程属性不为空,当前节点的下一个节点不为空且等待状态小于等于0,CAS设置前节点的下一个节点为当前节点的下一个节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //否则唤醒一个节点的线程
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

方法十五:
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //获取获取当前节点的等待状态
    int ws = node.waitStatus;
    //小于0则进行CAS状态设置为0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    //当前节点的下一个节点如果为空,或者下一个等待状态大于0,s设置为null,
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从尾节点开始便利到当前节点,取最靠近当前节点的且等待状态小于等于0的赋值给s
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //如果s不为空,则唤醒该节点的线程方法
    if (s != null)
        LockSupport.unpark(s.thread);
}

方法十六:
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        //尾节点是否为空,为空则通过CAS设置头节点,并将头节点设置给尾节点,再次循环
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //不为空则将当前节点的上一个节点引用设置为尾节点,通过cas方式将尾节点设置为当前节点,原尾节点的下一个节点设置为当前节点,并返回上一个节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

总结

ReentrantLock通过CAS保证原子操作,通过AQS将阻塞线程放置在队列中等待,从头取节点线程,放置进来的线程节点在对尾.


作者
有梦想的搬砖人
许可协议
CC BY 4.0
发布于
2023-09-24
修改于
2024-07-27
Bonnie image
尚未登录