View Javadoc

1   package org.mortbay.cometd;
2   
3   import java.io.IOException;
4   import java.io.Reader;
5   import java.util.HashSet;
6   import java.util.List;
7   import java.util.Map;
8   
9   import org.cometd.Bayeux;
10  import org.cometd.Message;
11  import org.mortbay.util.ArrayQueue;
12  import org.mortbay.util.StringMap;
13  import org.mortbay.util.ajax.JSON;
14  
15  
16  public class MessagePool 
17  {
18      final private ArrayQueue<MessageImpl> _messagePool;
19      final private ArrayQueue<JSON.ReaderSource> _readerPool;
20  
21      /* ------------------------------------------------------------ */
22      public MessagePool()
23      {
24          this(50);
25      }
26  
27      /* ------------------------------------------------------------ */
28      public MessagePool(int capacity)
29      {
30          _messagePool=new ArrayQueue<MessageImpl>(capacity);
31          _readerPool=new ArrayQueue<JSON.ReaderSource>(capacity);
32      }
33      
34      /* ------------------------------------------------------------ */
35      /**
36       * @return the {@link JSON} instance used to convert data and ext fields
37       */
38      public JSON getJSON()
39      {
40          return _json;
41      }
42  
43      /* ------------------------------------------------------------ */
44      /**
45       * @param json the {@link JSON} instance used to convert data and ext fields
46       */
47      public void setJSON(JSON json)
48      {
49          _json=json;
50      }
51  
52      /* ------------------------------------------------------------ */
53      /**
54       * @return the {@link JSON} instance used to convert bayeux messages
55       */
56      public JSON getMsgJSON()
57      {
58          return _msgJSON;
59      }
60  
61      /* ------------------------------------------------------------ */
62      /**
63       * @param msgJSON the {@link JSON} instance used to convert bayeux messages
64       */
65      public void setMsgJSON(JSON msgJSON)
66      {
67          _msgJSON=msgJSON;
68      }
69  
70      /* ------------------------------------------------------------ */
71      /**
72       * @return the {@link JSON} instance used to convert batches of bayeux messages
73       */
74      public JSON getBatchJSON()
75      {
76          return _batchJSON;
77      }
78  
79      /* ------------------------------------------------------------ */
80      /**
81       * @param batchJSON the {@link JSON} instance used to convert batches of bayeux messages
82       */
83      public void setBatchJSON(JSON batchJSON)
84      {
85          _batchJSON=batchJSON;
86      }
87  
88  
89      /* ------------------------------------------------------------ */
90      public MessageImpl newMessage()
91      {
92          MessageImpl message=_messagePool.poll();
93          if (message==null)
94              message=new MessageImpl(this);
95          message.incRef();
96          return message;
97      }
98  
99      /* ------------------------------------------------------------ */
100     public MessageImpl newMessage(Message associated)
101     {
102         MessageImpl message=_messagePool.poll();
103         if (message==null)
104             message=new MessageImpl(this);
105         message.incRef();
106         if (associated!=null)
107             message.setAssociated(associated);
108         return message;
109     }
110 
111     /* ------------------------------------------------------------ */
112     public void recycleMessage(MessageImpl message)
113     {
114         message.clear();
115         _messagePool.offer(message);
116     }
117 
118     /* ------------------------------------------------------------ */
119     public Message[] parse(Reader reader) throws IOException
120     {
121         JSON.ReaderSource source =_readerPool.poll();
122         if (source==null)
123             source=new JSON.ReaderSource(reader);
124         else
125             source.setReader(reader);
126         
127         Object batch=_batchJSON.parse(source);
128         _readerPool.offer(source);
129 
130         if (batch==null)
131             return new Message[0]; 
132         if (batch.getClass().isArray())
133             return (Message[])batch;
134         return new Message[]{(Message)batch};
135     }
136 
137     /* ------------------------------------------------------------ */
138     public Message[] parse(String s) throws IOException
139     {
140         Object batch=_batchJSON.parse(new JSON.StringSource(s));
141         if (batch==null)
142             return new Message[0]; 
143         if (batch.getClass().isArray())
144             return (Message[])batch;
145         return new Message[]{(Message)batch};
146     }
147 
148     /* ------------------------------------------------------------ */
149     public void parseTo(String fodder, List<Message> messages)
150     {
151         Object batch=_batchJSON.parse(new JSON.StringSource(fodder));
152         if (batch==null)
153             return;
154         if (batch.getClass().isArray())
155         {
156             Message[] msgs=(Message[])batch;
157             for (int m=0;m<msgs.length;m++)
158                 messages.add(msgs[m]);
159         }
160         else
161             messages.add((Message)batch);
162     }
163 
164     /* ------------------------------------------------------------ */
165     /* ------------------------------------------------------------ */
166     private StringMap _fieldStrings = new StringMap();
167     private StringMap _valueStrings = new StringMap();
168     {
169         _fieldStrings.put(Bayeux.ADVICE_FIELD,Bayeux.ADVICE_FIELD);
170         _fieldStrings.put(Bayeux.CHANNEL_FIELD,Bayeux.CHANNEL_FIELD);
171         _fieldStrings.put(Bayeux.CLIENT_FIELD,Bayeux.CLIENT_FIELD);
172         _fieldStrings.put("connectionType","connectionType");
173         _fieldStrings.put(Bayeux.DATA_FIELD,Bayeux.DATA_FIELD);
174         _fieldStrings.put(Bayeux.ERROR_FIELD,Bayeux.ERROR_FIELD);
175         _fieldStrings.put(Bayeux.EXT_FIELD,Bayeux.EXT_FIELD);
176         _fieldStrings.put(Bayeux.ID_FIELD,Bayeux.ID_FIELD);
177         _fieldStrings.put(Bayeux.SUBSCRIPTION_FIELD,Bayeux.SUBSCRIPTION_FIELD);
178         _fieldStrings.put(Bayeux.SUCCESSFUL_FIELD,Bayeux.SUCCESSFUL_FIELD);
179         _fieldStrings.put(Bayeux.TIMESTAMP_FIELD,Bayeux.TIMESTAMP_FIELD);
180         _fieldStrings.put(Bayeux.TRANSPORT_FIELD,Bayeux.TRANSPORT_FIELD);
181         
182         _valueStrings.put(Bayeux.META_CLIENT,Bayeux.META_CLIENT);
183         _valueStrings.put(Bayeux.META_CONNECT,Bayeux.META_CONNECT);
184         _valueStrings.put(Bayeux.META_DISCONNECT,Bayeux.META_DISCONNECT);
185         _valueStrings.put(Bayeux.META_HANDSHAKE,Bayeux.META_HANDSHAKE);
186         _valueStrings.put(Bayeux.META_SUBSCRIBE,Bayeux.META_SUBSCRIBE);
187         _valueStrings.put(Bayeux.META_UNSUBSCRIBE,Bayeux.META_UNSUBSCRIBE);
188     }
189     
190 
191     /* ------------------------------------------------------------ */
192     /* ------------------------------------------------------------ */
193     private JSON _json = new JSON()
194     {
195         @Override
196         protected String toString(char[] buffer, int offset, int length)
197         {
198             Map.Entry entry = _valueStrings.getEntry(buffer,offset,length);
199             if (entry!=null)
200                 return (String)entry.getValue();
201             String s= new String(buffer,offset,length);
202             return s;
203         }
204     };
205 
206     /* ------------------------------------------------------------ */
207     /* ------------------------------------------------------------ */
208     private JSON _msgJSON = new JSON()
209     {
210         @Override
211         protected Map newMap()
212         {
213             return newMessage();
214         }
215 
216         @Override
217         protected String toString(char[] buffer, int offset, int length)
218         {
219             Map.Entry entry = _fieldStrings.getEntry(buffer,offset,length);
220             if (entry!=null)
221                 return (String)entry.getValue();
222             String s= new String(buffer,offset,length);
223             return s;
224         }
225 
226         @Override
227         protected JSON contextFor(String field)
228         {
229             return _json;
230         }
231     };
232 
233     /* ------------------------------------------------------------ */
234     /* ------------------------------------------------------------ */
235     private JSON _batchJSON = new JSON()
236     {
237         @Override
238         protected Map newMap()
239         {
240             return newMessage();
241         }
242 
243         @Override
244         protected Object[] newArray(int size)
245         {
246             return new Message[size]; // todo recycle
247         }
248 
249         @Override
250         protected JSON contextFor(String field)
251         {
252             return _json;
253         }
254 
255         @Override
256         protected JSON contextForArray()
257         {
258             return _msgJSON;
259         }
260     };
261 
262 }