View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.group;
17  
18  import static java.util.concurrent.TimeUnit.*;
19  
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.LinkedHashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  import org.jboss.netty.util.internal.IoWorkerRunnable;
35  
36  /**
37   * The default {@link ChannelGroupFuture} implementation.
38   *
39   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
40   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
41   *
42   * @version $Rev: 2191 $, $Date: 2010-02-19 18:18:10 +0900 (Fri, 19 Feb 2010) $
43   */
44  public class DefaultChannelGroupFuture implements ChannelGroupFuture {
45  
46      private static final InternalLogger logger =
47          InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
48  
49      private final ChannelGroup group;
50      final Map<Integer, ChannelFuture> futures;
51      private ChannelGroupFutureListener firstListener;
52      private List<ChannelGroupFutureListener> otherListeners;
53      private boolean done;
54      int successCount;
55      int failureCount;
56      private int waiters;
57  
58      private final ChannelFutureListener childListener = new ChannelFutureListener() {
59          public void operationComplete(ChannelFuture future) throws Exception {
60              boolean success = future.isSuccess();
61              boolean callSetDone = false;
62              synchronized (DefaultChannelGroupFuture.this) {
63                  if (success) {
64                      successCount ++;
65                  } else {
66                      failureCount ++;
67                  }
68  
69                  callSetDone = successCount + failureCount == futures.size();
70                  assert successCount + failureCount <= futures.size();
71              }
72  
73              if (callSetDone) {
74                  setDone();
75              }
76          }
77      };
78  
79      /**
80       * Creates a new instance.
81       */
82      public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
83          if (group == null) {
84              throw new NullPointerException("group");
85          }
86          if (futures == null) {
87              throw new NullPointerException("futures");
88          }
89  
90          this.group = group;
91  
92          Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
93          for (ChannelFuture f: futures) {
94              futureMap.put(f.getChannel().getId(), f);
95          }
96  
97          this.futures = Collections.unmodifiableMap(futureMap);
98  
99          for (ChannelFuture f: this.futures.values()) {
100             f.addListener(childListener);
101         }
102 
103         // Done on arrival?
104         if (this.futures.isEmpty()) {
105             setDone();
106         }
107     }
108 
109     DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
110         this.group = group;
111         this.futures = Collections.unmodifiableMap(futures);
112         for (ChannelFuture f: this.futures.values()) {
113             f.addListener(childListener);
114         }
115 
116         // Done on arrival?
117         if (this.futures.isEmpty()) {
118             setDone();
119         }
120     }
121 
122     public ChannelGroup getGroup() {
123         return group;
124     }
125 
126     public ChannelFuture find(Integer channelId) {
127         return futures.get(channelId);
128     }
129 
130     public ChannelFuture find(Channel channel) {
131         return futures.get(channel.getId());
132     }
133 
134     public Iterator<ChannelFuture> iterator() {
135         return futures.values().iterator();
136     }
137 
138     public synchronized boolean isDone() {
139         return done;
140     }
141 
142     public synchronized boolean isCompleteSuccess() {
143         return successCount == futures.size();
144     }
145 
146     public synchronized boolean isPartialSuccess() {
147         return !futures.isEmpty() && successCount != 0;
148     }
149 
150     public synchronized boolean isPartialFailure() {
151         return !futures.isEmpty() && failureCount != 0;
152     }
153 
154     public synchronized boolean isCompleteFailure() {
155         return failureCount == futures.size();
156     }
157 
158     public void addListener(ChannelGroupFutureListener listener) {
159         if (listener == null) {
160             throw new NullPointerException("listener");
161         }
162 
163         boolean notifyNow = false;
164         synchronized (this) {
165             if (done) {
166                 notifyNow = true;
167             } else {
168                 if (firstListener == null) {
169                     firstListener = listener;
170                 } else {
171                     if (otherListeners == null) {
172                         otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
173                     }
174                     otherListeners.add(listener);
175                 }
176             }
177         }
178 
179         if (notifyNow) {
180             notifyListener(listener);
181         }
182     }
183 
184     public void removeListener(ChannelGroupFutureListener listener) {
185         if (listener == null) {
186             throw new NullPointerException("listener");
187         }
188 
189         synchronized (this) {
190             if (!done) {
191                 if (listener == firstListener) {
192                     if (otherListeners != null && !otherListeners.isEmpty()) {
193                         firstListener = otherListeners.remove(0);
194                     } else {
195                         firstListener = null;
196                     }
197                 } else if (otherListeners != null) {
198                     otherListeners.remove(listener);
199                 }
200             }
201         }
202     }
203 
204     public ChannelGroupFuture await() throws InterruptedException {
205         if (Thread.interrupted()) {
206             throw new InterruptedException();
207         }
208 
209         synchronized (this) {
210             while (!done) {
211                 checkDeadLock();
212                 waiters++;
213                 try {
214                     this.wait();
215                 } finally {
216                     waiters--;
217                 }
218             }
219         }
220         return this;
221     }
222 
223     public boolean await(long timeout, TimeUnit unit)
224             throws InterruptedException {
225         return await0(unit.toNanos(timeout), true);
226     }
227 
228     public boolean await(long timeoutMillis) throws InterruptedException {
229         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
230     }
231 
232     public ChannelGroupFuture awaitUninterruptibly() {
233         boolean interrupted = false;
234         synchronized (this) {
235             while (!done) {
236                 checkDeadLock();
237                 waiters++;
238                 try {
239                     this.wait();
240                 } catch (InterruptedException e) {
241                     interrupted = true;
242                 } finally {
243                     waiters--;
244                 }
245             }
246         }
247 
248         if (interrupted) {
249             Thread.currentThread().interrupt();
250         }
251 
252         return this;
253     }
254 
255     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
256         try {
257             return await0(unit.toNanos(timeout), false);
258         } catch (InterruptedException e) {
259             throw new InternalError();
260         }
261     }
262 
263     public boolean awaitUninterruptibly(long timeoutMillis) {
264         try {
265             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
266         } catch (InterruptedException e) {
267             throw new InternalError();
268         }
269     }
270 
271     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
272         if (interruptable && Thread.interrupted()) {
273             throw new InterruptedException();
274         }
275 
276         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
277         long waitTime = timeoutNanos;
278         boolean interrupted = false;
279 
280         try {
281             synchronized (this) {
282                 if (done) {
283                     return done;
284                 } else if (waitTime <= 0) {
285                     return done;
286                 }
287 
288                 checkDeadLock();
289                 waiters++;
290                 try {
291                     for (;;) {
292                         try {
293                             this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
294                         } catch (InterruptedException e) {
295                             if (interruptable) {
296                                 throw e;
297                             } else {
298                                 interrupted = true;
299                             }
300                         }
301 
302                         if (done) {
303                             return true;
304                         } else {
305                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
306                             if (waitTime <= 0) {
307                                 return done;
308                             }
309                         }
310                     }
311                 } finally {
312                     waiters--;
313                 }
314             }
315         } finally {
316             if (interrupted) {
317                 Thread.currentThread().interrupt();
318             }
319         }
320     }
321 
322     private void checkDeadLock() {
323         if (IoWorkerRunnable.IN_IO_THREAD.get()) {
324             throw new IllegalStateException(
325                     "await*() in I/O thread causes a dead lock or " +
326                     "sudden performance drop. Use addListener() instead or " +
327                     "call await*() from a different thread.");
328         }
329     }
330 
331     boolean setDone() {
332         synchronized (this) {
333             // Allow only once.
334             if (done) {
335                 return false;
336             }
337 
338             done = true;
339             if (waiters > 0) {
340                 notifyAll();
341             }
342         }
343 
344         notifyListeners();
345         return true;
346     }
347 
348     private void notifyListeners() {
349         // This method doesn't need synchronization because:
350         // 1) This method is always called after synchronized (this) block.
351         //    Hence any listener list modification happens-before this method.
352         // 2) This method is called only when 'done' is true.  Once 'done'
353         //    becomes true, the listener list is never modified - see add/removeListener()
354         if (firstListener != null) {
355             notifyListener(firstListener);
356             firstListener = null;
357 
358             if (otherListeners != null) {
359                 for (ChannelGroupFutureListener l: otherListeners) {
360                     notifyListener(l);
361                 }
362                 otherListeners = null;
363             }
364         }
365     }
366 
367     private void notifyListener(ChannelGroupFutureListener l) {
368         try {
369             l.operationComplete(this);
370         } catch (Throwable t) {
371             logger.warn(
372                     "An exception was thrown by " +
373                     ChannelFutureListener.class.getSimpleName() + ".", t);
374         }
375     }
376 }