View Javadoc

1   // ========================================================================
2   // Copyright 2006 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at 
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.cometd;
16  
17  import java.util.Arrays;
18  import java.util.Collection;
19  import java.util.List;
20  import java.util.concurrent.ConcurrentHashMap;
21  import java.util.concurrent.ConcurrentMap;
22  
23  import org.cometd.Channel;
24  import org.cometd.ChannelListener;
25  import org.cometd.Client;
26  import org.cometd.DataFilter;
27  import org.cometd.Message;
28  import org.cometd.SubscriptionListener;
29  import org.mortbay.log.Log;
30  import org.mortbay.util.LazyList;
31  
32  
33  /* ------------------------------------------------------------ */
34  /** A Bayuex Channel
35   * 
36   * @author gregw
37   *
38   */
39  public class ChannelImpl implements Channel
40  {
41      protected AbstractBayeux _bayeux;
42      private ClientImpl[] _subscribers=new ClientImpl[0]; // copy on write
43      private DataFilter[] _dataFilters=new DataFilter[0]; // copy on write
44      private SubscriptionListener[] _subscriptionListeners=new SubscriptionListener[0]; // copy on write
45      private ChannelId _id;
46      private ConcurrentMap<String,ChannelImpl> _children = new ConcurrentHashMap<String, ChannelImpl>();
47      private ChannelImpl _wild;
48      private ChannelImpl _wildWild;
49      private boolean _persistent;
50      private int _split;
51  
52      /* ------------------------------------------------------------ */
53      ChannelImpl(String id,AbstractBayeux bayeux)
54      {
55          _id=new ChannelId(id);
56          _bayeux=bayeux;
57      }
58  
59      /* ------------------------------------------------------------ */
60      public void addChild(ChannelImpl channel)
61      {
62          ChannelId child=channel.getChannelId();
63          if (!_id.isParentOf(child))
64          {
65              throw new IllegalArgumentException(_id+" not parent of "+child);
66          }
67          
68          String next = child.getSegment(_id.depth());
69  
70          if ((child.depth()-_id.depth())==1)
71          {
72              // add the channel to this channels
73              ChannelImpl old = _children.putIfAbsent(next,channel);
74  
75              if (old!=null)
76                  throw new IllegalArgumentException("Already Exists");
77  
78              if (ChannelId.WILD.equals(next))
79                  _wild=channel;
80              else if (ChannelId.WILDWILD.equals(next))
81                  _wildWild=channel;
82                  
83          }
84          else
85          {
86              ChannelImpl branch=_children.get(next);
87                  branch=(ChannelImpl)_bayeux.getChannel((_id.depth()==0?"/":(_id.toString()+"/"))+next,true);
88              
89              branch.addChild(channel);
90          }
91  
92          _bayeux.addChannel(channel);
93      }
94      
95      /* ------------------------------------------------------------ */
96      /**
97       * @param filter
98       */
99      public void addDataFilter(DataFilter filter)
100     {
101         synchronized(this)
102         {
103             _dataFilters=(DataFilter[])LazyList.addToArray(_dataFilters,filter,null);
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
108     /* ------------------------------------------------------------ */
109     /**
110      * @return
111      */
112     public ChannelId getChannelId()
113     {
114         return _id;
115     }
116     
117     /* ------------------------------------------------------------ */
118     public ChannelImpl getChild(ChannelId id)
119     {
120         String next=id.getSegment(_id.depth());
121         if (next==null)
122             return null;
123         
124         ChannelImpl channel = _children.get(next);
125         
126         if (channel==null || channel.getChannelId().depth()==id.depth())
127         {
128             return channel;
129         }
130         return channel.getChild(id);
131     }
132 
133     /* ------------------------------------------------------------ */
134      public void getChannels(List<Channel> list)
135      {
136          list.add(this);
137          for (ChannelImpl channel: _children.values())
138              channel.getChannels(list);
139      }
140 
141      /* ------------------------------------------------------------ */
142      public int getChannelCount()
143      {
144          int count = 1;
145          
146          for(ChannelImpl channel: _children.values())
147              count += channel.getChannelCount();
148          
149          return count;
150      }
151      
152     /* ------------------------------------------------------------ */
153     /**
154      * @return
155      */
156     public String getId()
157     {
158         return _id.toString();
159     }
160 
161     
162     /* ------------------------------------------------------------ */
163     public boolean isPersistent()
164     {
165         return _persistent;
166     }
167 
168     /* ------------------------------------------------------------ */
169     public void publish(Client fromClient, Object data, String msgId)
170     {
171         _bayeux.doPublish(getChannelId(),fromClient,data,msgId);   
172     }
173     
174     /* ------------------------------------------------------------ */
175     public boolean remove()
176     {
177         return _bayeux.removeChannel(this);
178     }
179     
180     /* ------------------------------------------------------------ */
181     public boolean doRemove(ChannelImpl channel)
182     {
183         ChannelId channelId = channel.getChannelId();
184         String key = channelId.getSegment(channelId.depth()-1);
185         if (_children.containsKey(key))
186         {
187             ChannelImpl child = _children.get(key);
188             
189             synchronized (this)
190             {
191                 synchronized (child)
192                 {
193                     if (!child.isPersistent() && child.getSubscriberCount()==0 && child.getChannelCount()==1)
194                     {
195                         _children.remove(key);
196                         return true;
197                     }
198                     else
199                         return false;
200                 }
201                 
202             }
203         }
204         else
205         {
206             for (ChannelImpl child : _children.values())
207             {
208                 if (child.doRemove(channel))
209                     return true;
210             }
211         }
212         return false;
213     }
214     
215     /* ------------------------------------------------------------ */
216     /* ------------------------------------------------------------ */
217     /**
218      * @param filter
219      */
220     public DataFilter removeDataFilter(DataFilter filter)
221     {
222         synchronized(this)
223         {
224             _dataFilters=(DataFilter[])LazyList.removeFromArray(_dataFilters,filter);
225             return filter;
226         }
227     }
228 
229     /* ------------------------------------------------------------ */
230     public void setPersistent(boolean persistent)
231     {
232         _persistent=persistent;
233     }
234 
235     /* ------------------------------------------------------------ */
236     /**
237      * @param client
238      */
239     public void subscribe(Client client)
240     {
241         if (!(client instanceof ClientImpl))
242             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
243         
244         synchronized (this)
245         {
246             for (ClientImpl c : _subscribers)
247             {
248                 if (client.equals(c))
249                     return;
250             }
251             _subscribers=(ClientImpl[])LazyList.addToArray(_subscribers,client,null);
252             
253             for (SubscriptionListener l : _subscriptionListeners)
254                 l.subscribed(client, this);
255         }
256         
257         ((ClientImpl)client).addSubscription(this);
258     }
259 
260     /* ------------------------------------------------------------ */
261     @Override
262     public String toString()
263     {
264         return _id.toString();
265     }
266 
267     /* ------------------------------------------------------------ */
268     /**
269      * @param client
270      */
271     public void unsubscribe(Client client)
272     {
273         if (!(client instanceof ClientImpl))
274             throw new IllegalArgumentException("Client instance not obtained from Bayeux.newClient()");
275         ((ClientImpl)client).removeSubscription(this);
276         synchronized(this)
277         {
278             _subscribers=(ClientImpl[])LazyList.removeFromArray(_subscribers,client);
279             
280             for (SubscriptionListener l : _subscriptionListeners)
281                 l.unsubscribed(client,this);
282             
283             if (!_persistent && _subscribers.length==0 && _children.size()==0)
284                 remove();
285         }
286     }
287 
288     /* ------------------------------------------------------------ */
289     protected void doDelivery(ChannelId to, Client from, Message msg)
290     {
291         int tail = to.depth()-_id.depth();
292         
293         Object data = msg.getData();
294         Object old = data;
295         
296         DataFilter[] filters=null;
297         
298         try
299         {
300             switch(tail)
301             {
302                 case 0:      
303                 {
304                     synchronized(this)
305                     {
306                         filters=_dataFilters;
307                     }
308                     for (DataFilter filter: filters)
309                         data=filter.filter(from,this,data);
310                 }
311                 break;
312 
313                 case 1:
314                     if (_wild!=null)  
315                     {
316                         synchronized(_wild)
317                         {
318                             filters=_wild._dataFilters;
319                         }
320                         for (DataFilter filter: filters)
321                             data=filter.filter(from,this,data);
322                     }
323 
324                 default:
325                     if (_wildWild!=null)  
326                     {
327                         synchronized(_wildWild)
328                         {
329                             filters=_wildWild._dataFilters;
330                         }
331                         for (DataFilter filter: filters)
332                         {
333                             data=filter.filter(from,this,data);
334                         }
335                     }
336             }
337         }
338         catch (IllegalStateException e)
339         {
340             Log.debug(e);
341             return;
342         }
343         if (data!=old)
344             msg.put(AbstractBayeux.DATA_FIELD,data);
345         
346         ClientImpl[] subscribers;
347 
348         switch(tail)
349         {
350             case 0:
351                 synchronized (this)
352                 {
353                     subscribers=_subscribers;
354                     _split++;
355                 }
356                 if (subscribers.length>0)
357                 {
358                     // fair delivery 
359                     int split=_split%_subscribers.length;
360                     for (int i=split;i<subscribers.length;i++)
361                         subscribers[i].doDelivery(from,msg);
362                     for (int i=0;i<split;i++)
363                         subscribers[i].doDelivery(from,msg);
364                 }                
365                 break;
366 
367             case 1:
368                 if (_wild!=null)
369                 {
370                     synchronized (_wild)
371                     {
372                         subscribers=_wild._subscribers;
373                     }
374                     for (ClientImpl client: subscribers)
375                     {
376                         client.doDelivery(from,msg);
377                     }
378                 }
379 
380             default:
381             {
382                 if (_wildWild!=null)
383                 {
384                     synchronized (_wildWild)
385                     {
386                         subscribers=_wildWild._subscribers;
387                     }
388                     for (ClientImpl client: subscribers)
389                     {
390                         client.doDelivery(from,msg);
391                     }
392                 }
393                 String next = to.getSegment(_id.depth());
394                 ChannelImpl channel = _children.get(next);
395                 if (channel!=null)
396                     channel.doDelivery(to,from,msg);
397             }
398         }
399     }
400 
401     /* ------------------------------------------------------------ */
402     public Collection<Client> getSubscribers()
403     {
404         synchronized(this)
405         {
406             return Arrays.asList((Client[])_subscribers);
407         }
408     }
409 
410     /* ------------------------------------------------------------ */
411     public int getSubscriberCount()
412     {
413         synchronized(this)
414         {
415             return _subscribers.length;
416         }
417     }
418 
419 
420     /* ------------------------------------------------------------ */
421     /* (non-Javadoc)
422      * @see dojox.cometd.Channel#getFilters()
423      */
424     public Collection<DataFilter> getDataFilters()
425     {
426         synchronized(this)
427         {
428             return Arrays.asList(_dataFilters);
429         }
430     }
431 
432     /* ------------------------------------------------------------ */
433     public void addListener(ChannelListener listener)
434     {
435         synchronized(this)
436         {
437             if (listener instanceof SubscriptionListener)
438                 _subscriptionListeners=(SubscriptionListener[])LazyList.addToArray(_subscriptionListeners,listener,null);
439         }
440     }
441     
442 }