View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import java.util.HashMap;
19  import java.util.LinkedHashMap;
20  import java.util.Map;
21  import java.util.NoSuchElementException;
22  
23  import org.jboss.netty.logging.InternalLogger;
24  import org.jboss.netty.logging.InternalLoggerFactory;
25  
26  /**
27   * The default {@link ChannelPipeline} implementation.  It is recommended
28   * to use {@link Channels#pipeline()} to create a new {@link ChannelPipeline}
29   * instance rather than calling the constructor directly.
30   *
31   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
32   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
33   *
34   * @version $Rev: 2119 $, $Date: 2010-02-01 20:46:09 +0900 (Mon, 01 Feb 2010) $
35   *
36   */
37  public class DefaultChannelPipeline implements ChannelPipeline {
38  
39      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
40      static final ChannelSink discardingSink = new DiscardingChannelSink();
41  
42      private volatile Channel channel;
43      private volatile ChannelSink sink;
44      private volatile DefaultChannelHandlerContext head;
45      private volatile DefaultChannelHandlerContext tail;
46      private final Map<String, DefaultChannelHandlerContext> name2ctx =
47          new HashMap<String, DefaultChannelHandlerContext>(4);
48  
49      /**
50       * Creates a new empty pipeline.
51       */
52      public DefaultChannelPipeline() {
53          super();
54      }
55  
56      public Channel getChannel() {
57          return channel;
58      }
59  
60      public ChannelSink getSink() {
61          ChannelSink sink = this.sink;
62          if (sink == null) {
63              return discardingSink;
64          }
65          return sink;
66      }
67  
68      public void attach(Channel channel, ChannelSink sink) {
69          if (channel == null) {
70              throw new NullPointerException("channel");
71          }
72          if (sink == null) {
73              throw new NullPointerException("sink");
74          }
75          if (this.channel != null || this.sink != null) {
76              throw new IllegalStateException("attached already");
77          }
78          this.channel = channel;
79          this.sink = sink;
80      }
81  
82      public boolean isAttached() {
83          return sink != null;
84      }
85  
86      public synchronized void addFirst(String name, ChannelHandler handler) {
87          if (name2ctx.isEmpty()) {
88              init(name, handler);
89          } else {
90              checkDuplicateName(name);
91              DefaultChannelHandlerContext oldHead = head;
92              DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
93  
94              callBeforeAdd(newHead);
95  
96              oldHead.prev = newHead;
97              head = newHead;
98              name2ctx.put(name, newHead);
99  
100             callAfterAdd(newHead);
101         }
102     }
103 
104     public synchronized void addLast(String name, ChannelHandler handler) {
105         if (name2ctx.isEmpty()) {
106             init(name, handler);
107         } else {
108             checkDuplicateName(name);
109             DefaultChannelHandlerContext oldTail = tail;
110             DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
111 
112             callBeforeAdd(newTail);
113 
114             oldTail.next = newTail;
115             tail = newTail;
116             name2ctx.put(name, newTail);
117 
118             callAfterAdd(newTail);
119         }
120     }
121 
122     public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
123         DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
124         if (ctx == head) {
125             addFirst(name, handler);
126         } else {
127             checkDuplicateName(name);
128             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
129 
130             callBeforeAdd(newCtx);
131 
132             ctx.prev.next = newCtx;
133             ctx.prev = newCtx;
134             name2ctx.put(name, newCtx);
135 
136             callAfterAdd(newCtx);
137         }
138     }
139 
140     public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
141         DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
142         if (ctx == tail) {
143             addLast(name, handler);
144         } else {
145             checkDuplicateName(name);
146             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
147 
148             callBeforeAdd(newCtx);
149 
150             ctx.next.prev = newCtx;
151             ctx.next = newCtx;
152             name2ctx.put(name, newCtx);
153 
154             callAfterAdd(newCtx);
155         }
156     }
157 
158     public synchronized void remove(ChannelHandler handler) {
159         remove(getContextOrDie(handler));
160     }
161 
162     public synchronized ChannelHandler remove(String name) {
163         return remove(getContextOrDie(name)).getHandler();
164     }
165 
166     @SuppressWarnings("unchecked")
167     public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
168         return (T) remove(getContextOrDie(handlerType)).getHandler();
169     }
170 
171     private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
172         if (head == tail) {
173             head = tail = null;
174             name2ctx.clear();
175         } else if (ctx == head) {
176             removeFirst();
177         } else if (ctx == tail) {
178             removeLast();
179         } else {
180             callBeforeRemove(ctx);
181 
182             DefaultChannelHandlerContext prev = ctx.prev;
183             DefaultChannelHandlerContext next = ctx.next;
184             prev.next = next;
185             next.prev = prev;
186             name2ctx.remove(ctx.getName());
187 
188             callAfterRemove(ctx);
189         }
190         return ctx;
191     }
192 
193     public synchronized ChannelHandler removeFirst() {
194         if (name2ctx.isEmpty()) {
195             throw new NoSuchElementException();
196         }
197 
198         DefaultChannelHandlerContext oldHead = head;
199         if (oldHead == null) {
200             throw new NoSuchElementException();
201         }
202 
203         callBeforeRemove(oldHead);
204 
205         if (oldHead.next == null) {
206             head = tail = null;
207             name2ctx.clear();
208         } else {
209             oldHead.next.prev = null;
210             head = oldHead.next;
211             name2ctx.remove(oldHead.getName());
212         }
213 
214         callAfterRemove(oldHead);
215 
216         return oldHead.getHandler();
217     }
218 
219     public synchronized ChannelHandler removeLast() {
220         if (name2ctx.isEmpty()) {
221             throw new NoSuchElementException();
222         }
223 
224         DefaultChannelHandlerContext oldTail = tail;
225         if (oldTail == null) {
226             throw new NoSuchElementException();
227         }
228 
229         callBeforeRemove(oldTail);
230 
231         if (oldTail.prev == null) {
232             head = tail = null;
233             name2ctx.clear();
234         } else {
235             oldTail.prev.next = null;
236             tail = oldTail.prev;
237             name2ctx.remove(oldTail.getName());
238         }
239 
240         callBeforeRemove(oldTail);
241 
242         return oldTail.getHandler();
243     }
244 
245     public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
246         replace(getContextOrDie(oldHandler), newName, newHandler);
247     }
248 
249     public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
250         return replace(getContextOrDie(oldName), newName, newHandler);
251     }
252 
253     @SuppressWarnings("unchecked")
254     public synchronized <T extends ChannelHandler> T replace(
255             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
256         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
257     }
258 
259     private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
260         if (ctx == head) {
261             removeFirst();
262             addFirst(newName, newHandler);
263         } else if (ctx == tail) {
264             removeLast();
265             addLast(newName, newHandler);
266         } else {
267             boolean sameName = ctx.getName().equals(newName);
268             if (!sameName) {
269                 checkDuplicateName(newName);
270             }
271 
272             DefaultChannelHandlerContext prev = ctx.prev;
273             DefaultChannelHandlerContext next = ctx.next;
274             DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
275 
276             callBeforeRemove(ctx);
277             callBeforeAdd(newCtx);
278 
279             prev.next = newCtx;
280             next.prev = newCtx;
281 
282             if (!sameName) {
283                 name2ctx.remove(ctx.getName());
284                 name2ctx.put(newName, newCtx);
285             }
286 
287             ChannelHandlerLifeCycleException removeException = null;
288             ChannelHandlerLifeCycleException addException = null;
289             boolean removed = false;
290             try {
291                 callAfterRemove(ctx);
292                 removed = true;
293             } catch (ChannelHandlerLifeCycleException e) {
294                 removeException = e;
295             }
296 
297             boolean added = false;
298             try {
299                 callAfterAdd(newCtx);
300                 added = true;
301             } catch (ChannelHandlerLifeCycleException e) {
302                 addException = e;
303             }
304 
305             if (!removed && !added) {
306                 logger.warn(removeException.getMessage(), removeException);
307                 logger.warn(addException.getMessage(), addException);
308                 throw new ChannelHandlerLifeCycleException(
309                         "Both " + ctx.getHandler().getClass().getName() +
310                         ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
311                         ".afterAdd() failed; see logs.");
312             } else if (!removed) {
313                 throw removeException;
314             } else if (!added) {
315                 throw addException;
316             }
317         }
318 
319         return ctx.getHandler();
320     }
321 
322     private void callBeforeAdd(ChannelHandlerContext ctx) {
323         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
324             return;
325         }
326 
327         LifeCycleAwareChannelHandler h =
328             (LifeCycleAwareChannelHandler) ctx.getHandler();
329 
330         try {
331             h.beforeAdd(ctx);
332         } catch (Throwable t) {
333             throw new ChannelHandlerLifeCycleException(
334                     h.getClass().getName() +
335                     ".beforeAdd() has thrown an exception; not adding.", t);
336         }
337     }
338 
339     private void callAfterAdd(ChannelHandlerContext ctx) {
340         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
341             return;
342         }
343 
344         LifeCycleAwareChannelHandler h =
345             (LifeCycleAwareChannelHandler) ctx.getHandler();
346 
347         try {
348             h.afterAdd(ctx);
349         } catch (Throwable t) {
350             boolean removed = false;
351             try {
352                 remove((DefaultChannelHandlerContext) ctx);
353                 removed = true;
354             } catch (Throwable t2) {
355                 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
356             }
357 
358             if (removed) {
359                 throw new ChannelHandlerLifeCycleException(
360                         h.getClass().getName() +
361                         ".afterAdd() has thrown an exception; removed.", t);
362             } else {
363                 throw new ChannelHandlerLifeCycleException(
364                         h.getClass().getName() +
365                         ".afterAdd() has thrown an exception; also failed to remove.", t);
366             }
367         }
368     }
369 
370     private void callBeforeRemove(ChannelHandlerContext ctx) {
371         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
372             return;
373         }
374 
375         LifeCycleAwareChannelHandler h =
376             (LifeCycleAwareChannelHandler) ctx.getHandler();
377 
378         try {
379             h.beforeRemove(ctx);
380         } catch (Throwable t) {
381             throw new ChannelHandlerLifeCycleException(
382                     h.getClass().getName() +
383                     ".beforeRemove() has thrown an exception; not removing.", t);
384         }
385     }
386 
387     private void callAfterRemove(ChannelHandlerContext ctx) {
388         if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
389             return;
390         }
391 
392         LifeCycleAwareChannelHandler h =
393             (LifeCycleAwareChannelHandler) ctx.getHandler();
394 
395         try {
396             h.afterRemove(ctx);
397         } catch (Throwable t) {
398             throw new ChannelHandlerLifeCycleException(
399                     h.getClass().getName() +
400                     ".afterRemove() has thrown an exception.", t);
401         }
402     }
403 
404     public synchronized ChannelHandler getFirst() {
405         DefaultChannelHandlerContext head = this.head;
406         if (head == null) {
407             return null;
408         }
409         return head.getHandler();
410     }
411 
412     public synchronized ChannelHandler getLast() {
413         DefaultChannelHandlerContext tail = this.tail;
414         if (tail == null) {
415             return null;
416         }
417         return tail.getHandler();
418     }
419 
420     public synchronized ChannelHandler get(String name) {
421         DefaultChannelHandlerContext ctx = name2ctx.get(name);
422         if (ctx == null) {
423             return null;
424         } else {
425             return ctx.getHandler();
426         }
427     }
428 
429     @SuppressWarnings("unchecked")
430     public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
431         ChannelHandlerContext ctx = getContext(handlerType);
432         if (ctx == null) {
433             return null;
434         } else {
435             return (T) ctx.getHandler();
436         }
437     }
438 
439     public synchronized ChannelHandlerContext getContext(String name) {
440         if (name == null) {
441             throw new NullPointerException("name");
442         }
443         return name2ctx.get(name);
444     }
445 
446     public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
447         if (handler == null) {
448             throw new NullPointerException("handler");
449         }
450         if (name2ctx.isEmpty()) {
451             return null;
452         }
453         DefaultChannelHandlerContext ctx = head;
454         for (;;) {
455             if (ctx.getHandler() == handler) {
456                 return ctx;
457             }
458 
459             ctx = ctx.next;
460             if (ctx == null) {
461                 break;
462             }
463         }
464         return null;
465     }
466 
467     public synchronized ChannelHandlerContext getContext(
468             Class<? extends ChannelHandler> handlerType) {
469         if (handlerType == null) {
470             throw new NullPointerException("handlerType");
471         }
472 
473         if (name2ctx.isEmpty()) {
474             return null;
475         }
476         DefaultChannelHandlerContext ctx = head;
477         for (;;) {
478             if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
479                 return ctx;
480             }
481 
482             ctx = ctx.next;
483             if (ctx == null) {
484                 break;
485             }
486         }
487         return null;
488     }
489 
490     public Map<String, ChannelHandler> toMap() {
491         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
492         if (name2ctx.isEmpty()) {
493             return map;
494         }
495 
496         DefaultChannelHandlerContext ctx = head;
497         for (;;) {
498             map.put(ctx.getName(), ctx.getHandler());
499             ctx = ctx.next;
500             if (ctx == null) {
501                 break;
502             }
503         }
504         return map;
505     }
506 
507     /**
508      * Returns the {@link String} representation of this pipeline.
509      */
510     @Override
511     public String toString() {
512         StringBuilder buf = new StringBuilder();
513         buf.append(getClass().getSimpleName());
514         buf.append('{');
515         DefaultChannelHandlerContext ctx = head;
516         for (;;) {
517             buf.append('(');
518             buf.append(ctx.getName());
519             buf.append(" = ");
520             buf.append(ctx.getHandler().getClass().getName());
521             buf.append(')');
522             ctx = ctx.next;
523             if (ctx == null) {
524                 break;
525             }
526             buf.append(", ");
527         }
528         buf.append('}');
529         return buf.toString();
530     }
531 
532     public void sendUpstream(ChannelEvent e) {
533         DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
534         if (head == null) {
535             logger.warn(
536                     "The pipeline contains no upstream handlers; discarding: " + e);
537             return;
538         }
539 
540         sendUpstream(head, e);
541     }
542 
543     void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
544         try {
545             ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
546         } catch (Throwable t) {
547             notifyHandlerException(e, t);
548         }
549     }
550 
551     public void sendDownstream(ChannelEvent e) {
552         DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
553         if (tail == null) {
554             try {
555                 getSink().eventSunk(this, e);
556                 return;
557             } catch (Throwable t) {
558                 notifyHandlerException(e, t);
559                 return;
560             }
561         }
562 
563         sendDownstream(tail, e);
564     }
565 
566     void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
567         try {
568             ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
569         } catch (Throwable t) {
570             notifyHandlerException(e, t);
571         }
572     }
573 
574     DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
575         if (ctx == null) {
576             return null;
577         }
578 
579         DefaultChannelHandlerContext realCtx = ctx;
580         while (!realCtx.canHandleUpstream()) {
581             realCtx = realCtx.next;
582             if (realCtx == null) {
583                 return null;
584             }
585         }
586 
587         return realCtx;
588     }
589 
590     DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
591         if (ctx == null) {
592             return null;
593         }
594 
595         DefaultChannelHandlerContext realCtx = ctx;
596         while (!realCtx.canHandleDownstream()) {
597             realCtx = realCtx.prev;
598             if (realCtx == null) {
599                 return null;
600             }
601         }
602 
603         return realCtx;
604     }
605 
606     protected void notifyHandlerException(ChannelEvent e, Throwable t) {
607         if (e instanceof ExceptionEvent) {
608             logger.warn(
609                     "An exception was thrown by a user handler " +
610                     "while handling an exception event (" + e + ")", t);
611             return;
612         }
613 
614         ChannelPipelineException pe;
615         if (t instanceof ChannelPipelineException) {
616             pe = (ChannelPipelineException) t;
617         } else {
618             pe = new ChannelPipelineException(t);
619         }
620 
621         try {
622             sink.exceptionCaught(this, e, pe);
623         } catch (Exception e1) {
624             logger.warn("An exception was thrown by an exception handler.", e1);
625         }
626     }
627 
628     private void init(String name, ChannelHandler handler) {
629         DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
630         callBeforeAdd(ctx);
631         head = tail = ctx;
632         name2ctx.clear();
633         name2ctx.put(name, ctx);
634         callAfterAdd(ctx);
635     }
636 
637     private void checkDuplicateName(String name) {
638         if (name2ctx.containsKey(name)) {
639             throw new IllegalArgumentException("Duplicate handler name.");
640         }
641     }
642 
643     private DefaultChannelHandlerContext getContextOrDie(String name) {
644         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
645         if (ctx == null) {
646             throw new NoSuchElementException(name);
647         } else {
648             return ctx;
649         }
650     }
651 
652     private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
653         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
654         if (ctx == null) {
655             throw new NoSuchElementException(handler.getClass().getName());
656         } else {
657             return ctx;
658         }
659     }
660 
661     private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
662         DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
663         if (ctx == null) {
664             throw new NoSuchElementException(handlerType.getName());
665         } else {
666             return ctx;
667         }
668     }
669 
670     private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
671         volatile DefaultChannelHandlerContext next;
672         volatile DefaultChannelHandlerContext prev;
673         private final String name;
674         private final ChannelHandler handler;
675         private final boolean canHandleUpstream;
676         private final boolean canHandleDownstream;
677         private volatile Object attachment;
678 
679         DefaultChannelHandlerContext(
680                 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
681                 String name, ChannelHandler handler) {
682 
683             if (name == null) {
684                 throw new NullPointerException("name");
685             }
686             if (handler == null) {
687                 throw new NullPointerException("handler");
688             }
689             canHandleUpstream = handler instanceof ChannelUpstreamHandler;
690             canHandleDownstream = handler instanceof ChannelDownstreamHandler;
691 
692 
693             if (!canHandleUpstream && !canHandleDownstream) {
694                 throw new IllegalArgumentException(
695                         "handler must be either " +
696                         ChannelUpstreamHandler.class.getName() + " or " +
697                         ChannelDownstreamHandler.class.getName() + '.');
698             }
699 
700             this.prev = prev;
701             this.next = next;
702             this.name = name;
703             this.handler = handler;
704         }
705 
706         public Channel getChannel() {
707             return getPipeline().getChannel();
708         }
709 
710         public ChannelPipeline getPipeline() {
711             return DefaultChannelPipeline.this;
712         }
713 
714         public boolean canHandleDownstream() {
715             return canHandleDownstream;
716         }
717 
718         public boolean canHandleUpstream() {
719             return canHandleUpstream;
720         }
721 
722         public ChannelHandler getHandler() {
723             return handler;
724         }
725 
726         public String getName() {
727             return name;
728         }
729 
730         public Object getAttachment() {
731             return attachment;
732         }
733 
734         public void setAttachment(Object attachment) {
735             this.attachment = attachment;
736         }
737 
738         public void sendDownstream(ChannelEvent e) {
739             DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
740             if (prev == null) {
741                 try {
742                     getSink().eventSunk(DefaultChannelPipeline.this, e);
743                 } catch (Throwable t) {
744                     notifyHandlerException(e, t);
745                 }
746             } else {
747                 DefaultChannelPipeline.this.sendDownstream(prev, e);
748             }
749         }
750 
751         public void sendUpstream(ChannelEvent e) {
752             DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
753             if (next != null) {
754                 DefaultChannelPipeline.this.sendUpstream(next, e);
755             }
756         }
757     }
758 
759     private static final class DiscardingChannelSink implements ChannelSink {
760         DiscardingChannelSink() {
761             super();
762         }
763 
764         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
765             logger.warn("Not attached yet; discarding: " + e);
766         }
767 
768         public void exceptionCaught(ChannelPipeline pipeline,
769                 ChannelEvent e, ChannelPipelineException cause) throws Exception {
770             throw cause;
771         }
772     }
773 }