CountDownLatch详解ITeye - 凯发娱乐

CountDownLatch详解ITeye

2019年04月01日11时08分46秒 | 作者: 运杰 | 标签: 线程,获取,假如 | 浏览: 1989

* p b Sample usage: /b Here is a pair of classes in which a group * of worker threads use two countdown latches: 这是一个用两个闭锁完结作业线程使命的实例 * [list] * li The first is a start signal that prevents any worker from proceeding * until the driver is ready for them to proceed; * li The second is a completion signal that allows the driver to wait * until all workers have completed. * [/list] 开端闭锁,用于阻挠一切线程开端作业,直到线程准备好;第二个闭锁用于等候一切的 作业线程完结使命 * pre * class Driver { // ... * void main() throws InterruptedException { * CountDownLatch startSignal = new CountDownLatch(1); * CountDownLatch doneSignal = new CountDownLatch(N); * for (int i = 0; i ++i) // create and start threads * new Thread(new Worker(startSignal, doneSignal)).start(); * doSomethingElse(); // dont let run yet * startSignal.countDown(); // let all threads proceed * doSomethingElse(); * doneSignal.await(); // wait for all to finish * class Worker implements Runnable { * private final CountDownLatch startSignal; * private final CountDownLatch doneSignal; * Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { * this.startSignal = startSignal; * this.doneSignal = doneSignal; * public void run() { * try { * startSignal.await(); * doWork(); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * void doWork() { ... } * /pre * p Another typical usage would be to divide a problem into N parts, * describe each part with a Runnable that executes that portion and * counts down on the latch, and queue all the Runnables to an * Executor. When all sub-parts are complete, the coordinating thread * will be able to pass through await. (When threads must repeatedly * count down in this way, instead use a {@link CyclicBarrier}.) 另一个典型的使用场景,是将一个问题分红N部分,每个部分用一个线程去履行, 履行完后,countdown,用线程池履行线程行列。当一切的分部分使命履行完, 和谐线程能够pass await。若果想重复countdown,能够用CyclicBarrier * pre * class Driver2 { // ... * void main() throws InterruptedException { * CountDownLatch doneSignal = new CountDownLatch(N); * Executor e = ... * for (int i = 0; i ++i) // create and start threads * e.execute(new WorkerRunnable(doneSignal, i)); * doneSignal.await(); // wait for all to finish * class WorkerRunnable implements Runnable { * private final CountDownLatch doneSignal; * private final int i; * WorkerRunnable(CountDownLatch doneSignal, int i) { * this.doneSignal = doneSignal; * this.i = i; * public void run() { * try { * doWork(i); * doneSignal.countDown(); * } catch (InterruptedException ex) {} // return; * void doWork() { ... } * /pre *这个不翻译了:翻译往后,没有原始的滋味。 * p Memory consistency effects: Until the count reaches * zero, actions in a thread prior to calling * {@code countDown()} * [url=package-summary.html#MemoryVisibility] i happen-before /i [/url] * actions following a successful return from a corresponding * {@code await()} in another thread. * @since 1.5 * @author Doug Lea public class CountDownLatch { * Synchronization control For CountDownLatch. * Uses AQS state to represent count. //根据AQS的内部同步器Sync private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //结构同步器,设置状况为count Sync(int count) { setState(count); //获取锁状况 int getCount() { return getState(); //测验以公正的办法,获取锁,当锁状况为0,则回来1,否则为-1 protected int tryAcquireShared(int acquires) { return (getState() 0) ? 1 : -1; //测验开释同享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { //自旋测验开释同享锁 int c = getState(); if (c 0) //假如锁状况为0,则开释失利 return false; int nextc = c-1; //以CAS办法,修正锁状况,减1 if (compareAndSetState(c, nextc)) return nextc 0; //内部锁 private final Sync sync; * Constructs a {@code CountDownLatch} initialized with the given count. * @param count the number of times {@link #countDown} must be invoked * before threads can pass through {@link #await} * @throws IllegalArgumentException if {@code count} is negative //结构CountDownLatch public CountDownLatch(int count) { if (count 0) throw new IllegalArgumentException("count 0"); this.sync = new Sync(count); * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}. *堵塞当时线程,直到锁count为零,或许线程被中止。 * p If the current count is zero then this method returns immediately. *count为0,则办法以及回来 * p If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of two things happen: 假如count大于零,当时线程,自旋获取锁,直到获取锁,或线程中止 * [list] * li The count reaches zero due to invocations of the * {@link #countDown} method; or * li Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * [/list] * p If the current thread: * [list] * li has its interrupted status set on entry to this method; or * li is {@linkplain Thread#interrupt interrupted} while waiting, * [/list] * then {@link InterruptedException} is thrown and the current threads * interrupted status is cleared. *当线程等候时,被中止;当抛出反常时,中止位将被铲除。 * @throws InterruptedException if the current thread is interrupted * while waiting public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is {@linkplain Thread#interrupt interrupted}, * or the specified waiting time elapses. * 假如count大于零,当时线程,自旋获取锁,直到获取锁,或线程中止,或时刻超时 * p If the current count is zero then this method returns immediately * with the value {@code true}. * p If the current count is greater than zero then the current * thread becomes disabled for thread scheduling purposes and lies * dormant until one of three things happen: * [list] * li The count reaches zero due to invocations of the * {@link #countDown} method; or * li Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * li The specified waiting time elapses. * [/list] * p If the count reaches zero then the method returns with the * value {@code true}. * p If the current thread: * [list] * li has its interrupted status set on entry to this method; or * li is {@linkplain Thread#interrupt interrupted} while waiting, * [/list] * then {@link InterruptedException} is thrown and the current threads * interrupted status is cleared. * p If the specified waiting time elapses then the value {@code false} * is returned. If the time is less than or equal to zero, the method * will not wait at all. * @param timeout the maximum time to wait * @param unit the time unit of the {@code timeout} argument * @return {@code true} if the count reached zero and {@code false} * if the waiting time elapsed before the count reached zero * @throws InterruptedException if the current thread is interrupted * while waiting public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. *开释同享锁 * p If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * p If the current count equals zero then nothing happens. public void countDown() { sync.releaseShared(1); * Returns the current count.回来当时锁状况 * p This method is typically used for debugging and testing purposes. * @return the current count public long getCount() { return sync.getCount(); }
下面我么来独自看一下,await和countDown,先看await
 public void await() throws InterruptedException {
 sync.acquireSharedInterruptibly(1);
 }


//AQS
**
 * Acquires in shared mode, aborting if interrupted. Implemented
 * by first checking interrupt status, then invoking at least once
 * {@link #tryAcquireShared}, returning on success. Otherwise the
 * thread is queued, possibly repeatedly blocking and unblocking,
 * invoking {@link #tryAcquireShared} until success or the thread
 * is interrupted.
 获取同享形式锁,假如中止,则aborting,首要查看中止状况,然后自旋,
 测验获取同享锁,直到成功。假如线程因为未获取锁,进入行列,或许需求
 重复blocking and unblocking,测验获取同享锁,直到成功,或线程中止。
 * @param arg the acquire argument
 * This value is conveyed to {@link #tryAcquireShared} but is
 * otherwise uninterpreted and can represent anything
 * you like.
 * @throws InterruptedException if the current thread is interrupted
 public final void acquireSharedInterruptibly(int arg)
 throws InterruptedException {
 if (Thread.interrupted())
 //假如线程中止,则抛出中止反常
 throw new InterruptedException();
 测验获取锁,假如失利doAcquireSharedInterruptibly
 if (tryAcquireShared(arg) 0)
 doAcquireSharedInterruptibly(arg);
 //待父类扩展
 protected int tryAcquireShared(int arg) {
 throw new UnsupportedOperationException();

来看CountDownLatch-内部同步器SYNC的tryAcquireShared完成
//测验以公正的办法,获取锁,当锁状况为0,则回来1,获取成功,否则为-1,失利
 
 protected int tryAcquireShared(int acquires) {
 return (getState()  0) ? 1 : -1;
 }

再看第二步
doAcquireSharedInterruptibly(arg);

//AQS
 
/**
 * Acquires in shared interruptible mode.
 * @param arg the acquire argument
 //以同享可中止办法,获取锁
 private void doAcquireSharedInterruptibly(int arg)
 throws InterruptedException {
 //增加同享节点到同步等候行列
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
 //自旋,测验获取锁,成功则回来
 for (;;) {
 final Node p = node.predecessor();
 /*假如节点的前驱是头节点,当时节点为第一个有用节点,
 则测验获取锁,假如获取成功*/
 if (p  head) {
 int r = tryAcquireShared(arg);
 if (r = 0) {
 /*设置当时节点为头结点,假如需求唤醒后继节点线程,则unpark
 后继节点线程,假如状况为0,则是指状况为PROPAGATE,告诉后继节点
 锁已开释。*/
 setHeadAndPropagate(node, r);
 p.next = null; // help GC
 failed = false;
 return;
 /*假如前驱不是头结点,则判别测验获取失利,是否应该park,
 假如是,则park,查看是否应该中止,当时线程,假如是,则中止
 当时线程。*/
 if (shouldParkAfterFailedAcquire(p, node) 
 parkAndCheckInterrupt())
 throw new InterruptedException();
 } finally {
 if (failed)
 //获取锁进程,失利则,移除线程节点
 cancelAcquire(node);
 }

咱们再来看一下
setHeadAndPropagate(node, r);

这一句是什么意思?
 
/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate 0 or
 * PROPAGATE status was set.
 *设置行列的头结点,查看后继节点是否在等候同享锁,成功回去则回来1,
 所以这儿propagate1
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 private void setHeadAndPropagate(Node node, int propagate) {
 Node h = head; // Record old head for check below
 //当节点获取锁,成功则设置为头结点
 setHead(node);
 * Try to signal next queued node if:
 * Propagation was indicated by caller,
 * or was recorded (as h.waitStatus) by a previous operation
 * (note: this uses sign-check of waitStatus because
 * PROPAGATE status may transition to SIGNAL.)
 唤醒后继节点
 * and
 * The next node is waiting in shared mode,
 * or we dont know, because it appears null
 * The conservatism in both of these checks may cause
 * unnecessary wake-ups, but only when there are multiple
 * racing acquires/releases, so most need signals now or soon
 * anyway.
 if (propagate 0 || h  null || h.waitStatus 0) {
 Node s = node.next;
 if (s  null || s.isShared())
 doReleaseShared();                          
			
版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表凯发娱乐立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章