1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.mortbay.jetty.client;
15
16
17 import java.io.IOException;
18 import java.lang.reflect.Constructor;
19 import java.util.ArrayList;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.concurrent.ArrayBlockingQueue;
23
24 import javax.servlet.http.Cookie;
25
26 import org.mortbay.io.Buffer;
27 import org.mortbay.io.ByteArrayBuffer;
28 import org.mortbay.jetty.HttpHeaders;
29 import org.mortbay.jetty.client.security.Authorization;
30 import org.mortbay.jetty.client.security.SecurityListener;
31 import org.mortbay.jetty.servlet.PathMap;
32 import org.mortbay.log.Log;
33
34
35
36
37
38 public class HttpDestination
39 {
40 private ByteArrayBuffer _hostHeader;
41 private final Address _address;
42 private final LinkedList<HttpConnection> _connections = new LinkedList<HttpConnection>();
43 private final ArrayList<HttpConnection> _idle = new ArrayList<HttpConnection>();
44 private final HttpClient _client;
45 private final boolean _ssl;
46 private int _maxConnections;
47 private int _pendingConnections=0;
48 private ArrayBlockingQueue<Object> _newQueue = new ArrayBlockingQueue<Object>(10,true);
49 private int _newConnection=0;
50 private Address _proxy;
51 private Authorization _proxyAuthentication;
52 private PathMap _authorizations;
53 private List<Cookie> _cookies;
54
55 public void dump() throws IOException
56 {
57 synchronized (this)
58 {
59 System.err.println(this);
60 System.err.println("connections="+_connections.size());
61 System.err.println("idle="+_idle.size());
62 System.err.println("pending="+_pendingConnections);
63 for (HttpConnection c : _connections)
64 {
65 if (!c.isIdle())
66 c.dump();
67 }
68 }
69 }
70
71
72 private LinkedList<HttpExchange> _queue=new LinkedList<HttpExchange>();
73
74
75 HttpDestination(HttpClient pool, Address address, boolean ssl, int maxConnections)
76 {
77 _client=pool;
78 _address=address;
79 _ssl=ssl;
80 _maxConnections=maxConnections;
81 String addressString = address.getHost();
82 if (address.getPort() != (_ssl ? 443 : 80)) addressString += ":" + address.getPort();
83 _hostHeader = new ByteArrayBuffer(addressString);
84 }
85
86
87 public Address getAddress()
88 {
89 return _address;
90 }
91
92
93 public Buffer getHostHeader()
94 {
95 return _hostHeader;
96 }
97
98
99 public HttpClient getHttpClient()
100 {
101 return _client;
102 }
103
104
105 public boolean isSecure()
106 {
107 return _ssl;
108 }
109
110
111 public void addAuthorization(String pathSpec,Authorization authorization)
112 {
113 synchronized (this)
114 {
115 if (_authorizations==null)
116 _authorizations=new PathMap();
117 _authorizations.put(pathSpec,authorization);
118 }
119
120
121 }
122
123
124 public void addCookie(Cookie cookie)
125 {
126 synchronized (this)
127 {
128 if (_cookies==null)
129 _cookies=new ArrayList<Cookie>();
130 _cookies.add(cookie);
131 }
132
133
134 }
135
136
137 public HttpConnection getConnection() throws IOException
138 {
139 HttpConnection connection = getIdleConnection();
140
141 while (connection==null)
142 {
143 synchronized(this)
144 {
145 _newConnection++;
146 startNewConnection();
147 }
148
149 try
150 {
151 Object o =_newQueue.take();
152 if (o instanceof HttpConnection)
153 connection=(HttpConnection)o;
154 else
155 throw (IOException)o;
156 }
157 catch (InterruptedException e)
158 {
159 Log.ignore(e);
160 }
161 }
162 return connection;
163 }
164
165
166 public HttpConnection getIdleConnection() throws IOException
167 {
168 synchronized (this)
169 {
170 long now = System.currentTimeMillis();
171 long idleTimeout=_client.getIdleTimeout();
172
173
174 while (_idle.size() > 0)
175 {
176 HttpConnection connection = _idle.remove(_idle.size()-1);
177 long last = connection.getLast();
178 if (connection.getEndPoint().isOpen() && (last==0 || ((now-last)<idleTimeout)) )
179 return connection;
180 else
181 {
182 _connections.remove(connection);
183 connection.getEndPoint().close();
184 }
185 }
186
187 return null;
188 }
189 }
190
191
192 protected void startNewConnection()
193 {
194 try
195 {
196 synchronized (this)
197 {
198 _pendingConnections++;
199 }
200 _client._connector.startConnection(this);
201 }
202 catch(Exception e)
203 {
204 onConnectionFailed(e);
205 }
206 }
207
208
209 public void onConnectionFailed(Throwable throwable)
210 {
211 Throwable connect_failure=null;
212
213 synchronized (this)
214 {
215 _pendingConnections--;
216 if (_newConnection>0)
217 {
218 connect_failure=throwable;
219 _newConnection--;
220 }
221 else if (_queue.size()>0)
222 {
223 HttpExchange ex=_queue.removeFirst();
224 ex.getEventListener().onConnectionFailed(throwable);
225 }
226 }
227
228 if(connect_failure!=null)
229 {
230 try
231 {
232 _newQueue.put(connect_failure);
233 }
234 catch (InterruptedException e)
235 {
236 Log.ignore(e);
237 }
238 }
239 }
240
241
242 public void onException(Throwable throwable)
243 {
244 synchronized (this)
245 {
246 _pendingConnections--;
247 if (_queue.size()>0)
248 {
249 HttpExchange ex=_queue.removeFirst();
250 ex.getEventListener().onException(throwable);
251 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
252 }
253 }
254 }
255
256
257 public void onNewConnection(HttpConnection connection) throws IOException
258 {
259 HttpConnection q_connection=null;
260
261 synchronized (this)
262 {
263 _pendingConnections--;
264 _connections.add(connection);
265
266 if (_newConnection>0)
267 {
268 q_connection=connection;
269 _newConnection--;
270 }
271 else if (_queue.size()==0)
272 {
273 _idle.add(connection);
274 }
275 else
276 {
277 HttpExchange ex=_queue.removeFirst();
278 connection.send(ex);
279 }
280 }
281
282 if (q_connection!=null)
283 {
284 try
285 {
286 _newQueue.put(q_connection);
287 }
288 catch (InterruptedException e)
289 {
290 Log.ignore(e);
291 }
292 }
293 }
294
295
296 public void returnConnection(HttpConnection connection, boolean close) throws IOException
297 {
298 if (close)
299 {
300 try
301 {
302 connection.close();
303 }
304 catch(IOException e)
305 {
306 Log.ignore(e);
307 }
308 }
309
310 if (!_client.isStarted())
311 return;
312
313 if (!close && connection.getEndPoint().isOpen())
314 {
315 synchronized (this)
316 {
317 if (_queue.size()==0)
318 {
319 connection.setLast(System.currentTimeMillis());
320 _idle.add(connection);
321 }
322 else
323 {
324 HttpExchange ex=_queue.removeFirst();
325 connection.send(ex);
326 }
327 this.notifyAll();
328 }
329 }
330 else
331 {
332 synchronized (this)
333 {
334 _connections.remove(connection);
335 if (!_queue.isEmpty())
336 startNewConnection();
337 }
338 }
339 }
340
341
342 public void send(HttpExchange ex) throws IOException
343 {
344 LinkedList<String> listeners = _client.getRegisteredListeners();
345
346 if (listeners != null)
347 {
348
349 for (int i = listeners.size(); i > 0; --i)
350 {
351 String listenerClass = listeners.get(i - 1);
352
353 try
354 {
355 Class listener = Class.forName(listenerClass);
356 Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
357 HttpEventListener elistener = (HttpEventListener) constructor.newInstance(this, ex);
358 ex.setEventListener(elistener);
359 }
360 catch (Exception e)
361 {
362 e.printStackTrace();
363 throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass );
364 }
365 }
366 }
367
368
369 if ( _client.hasRealms() )
370 {
371 ex.setEventListener( new SecurityListener( this, ex ) );
372 }
373
374 doSend(ex);
375 }
376
377
378 public void resend(HttpExchange ex) throws IOException
379 {
380 ex.getEventListener().onRetry();
381 doSend(ex);
382 }
383
384
385 protected void doSend(HttpExchange ex) throws IOException
386 {
387
388
389 if (_cookies!=null)
390 {
391 StringBuilder buf=null;
392 for (Cookie cookie : _cookies)
393 {
394 if (buf==null)
395 buf=new StringBuilder();
396 else
397 buf.append("; ");
398 buf.append(cookie.getName());
399 buf.append("=");
400 buf.append(cookie.getValue());
401 }
402 if (buf!=null)
403 ex.addRequestHeader(HttpHeaders.COOKIE,buf.toString());
404 }
405
406
407 if (_authorizations!=null)
408 {
409 Authorization auth= (Authorization)_authorizations.match(ex.getURI());
410 if (auth !=null)
411 ((Authorization)auth).setCredentials(ex);
412 }
413
414 synchronized(this)
415 {
416
417
418 HttpConnection connection=null;
419 if (_queue.size()>0 || (connection=getIdleConnection())==null || !connection.send(ex))
420 {
421 _queue.add(ex);
422 if (_connections.size()+_pendingConnections <_maxConnections)
423 {
424 startNewConnection();
425 }
426 }
427 }
428 }
429
430
431 public synchronized String toString()
432 {
433 return "HttpDestination@" + hashCode() + "//" + _address.getHost() + ":" + _address.getPort() + "(" + _connections.size() + "," + _idle.size() + "," + _queue.size() + ")";
434 }
435
436
437 public synchronized String toDetailString()
438 {
439 StringBuilder b = new StringBuilder();
440 b.append(toString());
441 b.append('\n');
442 synchronized(this)
443 {
444 for (HttpConnection connection : _connections)
445 {
446 if (connection._exchange!=null)
447 {
448 b.append(connection.toDetailString());
449 if (_idle.contains(connection))
450 b.append(" IDLE");
451 b.append('\n');
452 }
453 }
454 }
455 b.append("--");
456 b.append('\n');
457
458 return b.toString();
459 }
460
461
462 public void setProxy(Address proxy)
463 {
464 _proxy=proxy;
465 }
466
467
468 public Address getProxy()
469 {
470 return _proxy;
471 }
472
473
474 public Authorization getProxyAuthentication()
475 {
476 return _proxyAuthentication;
477 }
478
479
480 public void setProxyAuthentication(Authorization authentication)
481 {
482 _proxyAuthentication = authentication;
483 }
484
485
486 public boolean isProxied()
487 {
488 return _proxy!=null;
489 }
490
491
492 public void close() throws IOException
493 {
494 synchronized (this)
495 {
496 for (HttpConnection connection : _connections)
497 {
498 connection.close();
499 }
500 }
501 }
502
503 }