View Javadoc

1   // ========================================================================
2   // Copyright 2006-2007 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // Licensed under the Apache License, Version 2.0 (the "License");
5   // you may not use this file except in compliance with the License.
6   // You may obtain a copy of the License at
7   // http://www.apache.org/licenses/LICENSE-2.0
8   // Unless required by applicable law or agreed to in writing, software
9   // distributed under the License is distributed on an "AS IS" BASIS,
10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11  // See the License for the specific language governing permissions and
12  // limitations under the License.
13  // ========================================================================
14  
15  package org.mortbay.jetty.client;
16  
17  import java.io.IOException;
18  import java.io.InputStream;
19  import java.io.InterruptedIOException;
20  
21  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
22  
23  import org.mortbay.io.Buffer;
24  import org.mortbay.io.Buffers;
25  import org.mortbay.io.ByteArrayBuffer;
26  import org.mortbay.io.Connection;
27  import org.mortbay.io.EndPoint;
28  import org.mortbay.io.nio.SelectChannelEndPoint;
29  import org.mortbay.jetty.HttpGenerator;
30  import org.mortbay.jetty.HttpHeaderValues;
31  import org.mortbay.jetty.HttpHeaders;
32  import org.mortbay.jetty.HttpParser;
33  import org.mortbay.jetty.HttpSchemes;
34  import org.mortbay.jetty.HttpVersions;
35  import org.mortbay.jetty.client.security.Authorization;
36  import org.mortbay.jetty.security.SslHttpChannelEndPoint;
37  import org.mortbay.log.Log;
38  import org.mortbay.thread.Timeout;
39  
40  /**
41   * 
42   * @author Greg Wilkins
43   * @author Guillaume Nodet
44   */
45  public class HttpConnection implements Connection
46  {
47      HttpDestination _destination;
48      EndPoint _endp;
49      HttpGenerator _generator;
50      HttpParser _parser;
51      boolean _http11 = true;
52      Buffer _connectionHeader;
53      Buffer _requestContentChunk;
54      long _last;
55      boolean _requestComplete;
56      public String _message;
57      public Throwable _throwable;
58  
59      /* The current exchange waiting for a response */
60      HttpExchange _exchange;
61      HttpExchange _pipeline;
62  
63      public void dump() throws IOException
64      {
65          System.err.println("endp=" + _endp + " " + _endp.isBufferingInput() + " " + _endp.isBufferingOutput());
66          System.err.println("generator=" + _generator);
67          System.err.println("parser=" + _parser.getState() + " " + _parser.isMoreInBuffer());
68          System.err.println("exchange=" + _exchange);
69          if (_endp instanceof SslHttpChannelEndPoint)
70              ((SslHttpChannelEndPoint)_endp).dump();
71      }
72  
73      Timeout.Task _timeout = new Timeout.Task()
74      {
75          public void expire()
76          {
77              HttpExchange ex = null;
78              try
79              {
80                  synchronized (HttpConnection.this)
81                  {
82                      ex = _exchange;
83                      _exchange = null;
84                      if (ex != null)
85                          _destination.returnConnection(HttpConnection.this,true);
86                  }
87              }
88              catch (Exception e)
89              {
90                  Log.debug(e);
91              }
92              finally
93              {
94                  try
95                  {
96                      _endp.close();
97                  }
98                  catch (IOException e)
99                  {
100                     Log.ignore(e);
101                 }
102 
103                 if (ex.getStatus() < HttpExchange.STATUS_COMPLETED)
104                 {
105                     ex.setStatus(HttpExchange.STATUS_EXPIRED);
106                 }
107             }
108         }
109 
110     };
111 
112     /* ------------------------------------------------------------ */
113     HttpConnection(Buffers buffers, EndPoint endp, int hbs, int cbs)
114     {
115         _endp = endp;
116         _generator = new HttpGenerator(buffers,endp,hbs,cbs);
117         _parser = new HttpParser(buffers,endp,new Handler(),hbs,cbs);
118     }
119 
120     /* ------------------------------------------------------------ */
121     public HttpDestination getDestination()
122     {
123         return _destination;
124     }
125 
126     /* ------------------------------------------------------------ */
127     public void setDestination(HttpDestination destination)
128     {
129         _destination = destination;
130     }
131 
132     /* ------------------------------------------------------------ */
133     public boolean send(HttpExchange ex) throws IOException
134     {
135         // _message =
136         // Thread.currentThread().getName()+": Generator instance="+_generator
137         // .hashCode()+" state= "+_generator.getState()+" _exchange="+_exchange;
138         _throwable = new Throwable();
139         synchronized (this)
140         {
141             if (_exchange != null)
142             {
143                 if (_pipeline != null)
144                     throw new IllegalStateException(this + " PIPELINED!!!  _exchange=" + _exchange);
145                 _pipeline = ex;
146                 return true;
147             }
148 
149             if (!_endp.isOpen())
150                 return false;
151 
152             ex.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
153             _exchange = ex;
154 
155             if (_endp.isBlocking())
156                 this.notify();
157             else
158             {
159                 SelectChannelEndPoint scep = (SelectChannelEndPoint)_endp;
160                 scep.scheduleWrite();
161             }
162 
163             if (!_endp.isBlocking())
164                 _destination.getHttpClient().schedule(_timeout);
165 
166             return true;
167         }
168     }
169 
170     /* ------------------------------------------------------------ */
171     public void handle() throws IOException
172     {
173         int no_progress = 0;
174         long flushed = 0;
175 
176         boolean failed = false;
177         while (_endp.isBufferingInput() || _endp.isOpen())
178         {
179             synchronized (this)
180             {
181                 while (_exchange == null)
182                 {
183                     if (_endp.isBlocking())
184                     {
185                         try
186                         {
187                             this.wait();
188                         }
189                         catch (InterruptedException e)
190                         {
191                             throw new InterruptedIOException();
192                         }
193                     }
194                     else
195                     {
196                         // Hopefully just space?
197                         _parser.fill();
198                         _parser.skipCRLF();
199                         if (_parser.isMoreInBuffer())
200                         {
201                             Log.warn("unexpected data");
202                             _endp.close();
203                         }
204 
205                         return;
206                     }
207                 }
208             }
209             if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
210             {
211                 no_progress = 0;
212                 commitRequest();
213             }
214 
215             try
216             {
217                 long io = 0;
218                 _endp.flush();
219 
220                 if (_generator.isComplete())
221                 {
222                     if (!_requestComplete)
223                     {
224                         _requestComplete = true;
225                         _exchange.getEventListener().onRequestComplete();
226                     }
227                 }
228                 else
229                 {
230                     // Write as much of the request as possible
231                     synchronized (this)
232                     {
233                         if (_exchange == null)
234                             continue;
235                         flushed = _generator.flush();
236                         io += flushed;
237                     }
238 
239                     if (!_generator.isComplete())
240                     {
241                         InputStream in = _exchange.getRequestContentSource();
242                         if (in != null)
243                         {
244                             if (_requestContentChunk == null || _requestContentChunk.length() == 0)
245                             {
246                                 _requestContentChunk = _exchange.getRequestContentChunk();
247                                 if (_requestContentChunk != null)
248                                     _generator.addContent(_requestContentChunk,false);
249                                 else
250                                     _generator.complete();
251                                 io += _generator.flush();
252                             }
253                         }
254                         else
255                             _generator.complete();
256                     }
257                 }
258                 
259 
260                 // If we are not ended then parse available
261                 if (!_parser.isComplete() && _generator.isCommitted())
262                 {
263                     long filled = _parser.parseAvailable();
264                     io += filled;
265                 }
266 
267                 if (io > 0)
268                     no_progress = 0;
269                 else if (no_progress++ >= 2 && !_endp.isBlocking())
270                 {
271                     // SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
272                     if (_endp instanceof SslHttpChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
273                     {
274                         if (_generator.flush()>0)
275                             continue;
276                     }
277                     return;
278                 }
279             }
280             catch (IOException e)
281             {
282                 synchronized (this)
283                 {
284                     if (_exchange != null)
285                     {
286                         _exchange.getEventListener().onException(e);
287                         _exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
288                     }
289                 }
290                 failed = true;
291                 throw e;
292             }
293             finally
294             {
295                 boolean complete = false;
296                 boolean close = failed; // always close the connection on error
297                 if (!failed)
298                 {
299                     // are we complete?
300                     if (_generator.isComplete())
301                     {
302                         if (!_requestComplete)
303                         {
304                             _requestComplete = true;
305                             _exchange.getEventListener().onRequestComplete();
306                         }
307 
308                         // we need to return the HttpConnection to a state that
309                         // it can be reused or closed out
310                         if (_parser.isComplete())
311                         {
312                             _destination.getHttpClient().cancel(_timeout);
313                             complete = true;
314                         }
315                     }
316                 }
317 
318                 if (complete || failed)
319                 {
320                     synchronized (this)
321                     {
322                         if (!close)
323                             close = shouldClose();
324                             
325                         reset(true);
326                         no_progress = 0;
327                         flushed = -1;
328                         if (_exchange != null)
329                         {
330                             _exchange = null;
331 
332                             if (_pipeline == null)
333                             {
334                                 _destination.returnConnection(this,close);
335                                 if (close)
336                                     return;
337                             }
338                             else
339                             {
340                                 if (close)
341                                 {
342                                     _destination.returnConnection(this,close);
343                                     _destination.send(_pipeline);
344                                     _pipeline = null;
345                                     return;
346                                 }
347 
348                                 HttpExchange ex = _pipeline;
349                                 _pipeline = null;
350 
351                                 send(ex);
352                             }
353                         }
354                     }
355                 }
356             }
357         }
358     }
359 
360     /* ------------------------------------------------------------ */
361     public boolean isIdle()
362     {
363         synchronized (this)
364         {
365             return _exchange == null;
366         }
367     }
368 
369     /* ------------------------------------------------------------ */
370     public EndPoint getEndPoint()
371     {
372         return _endp;
373     }
374 
375     /* ------------------------------------------------------------ */
376     private void commitRequest() throws IOException
377     {
378         synchronized (this)
379         {
380             if (_exchange.getStatus() != HttpExchange.STATUS_WAITING_FOR_COMMIT)
381                 throw new IllegalStateException();
382 
383             _exchange.setStatus(HttpExchange.STATUS_SENDING_REQUEST);
384             _generator.setVersion(_exchange._version);
385 
386             String uri = _exchange._uri;
387             if (_destination.isProxied() && uri.startsWith("/"))
388             {
389                 // TODO suppress port 80 or 443
390                 uri = (_destination.isSecure()?HttpSchemes.HTTPS:HttpSchemes.HTTP) + "://" + _destination.getAddress().getHost() + ":"
391                         + _destination.getAddress().getPort() + uri;
392                 Authorization auth = _destination.getProxyAuthentication();
393                 if (auth != null)
394                     auth.setCredentials(_exchange);
395             }
396 
397             _generator.setRequest(_exchange._method,uri);
398 
399             if (_exchange._version >= HttpVersions.HTTP_1_1_ORDINAL)
400             {
401                 if (!_exchange._requestFields.containsKey(HttpHeaders.HOST_BUFFER))
402                     _exchange._requestFields.add(HttpHeaders.HOST_BUFFER,_destination.getHostHeader());
403             }
404 
405             if (_exchange._requestContent != null)
406             {
407                 _exchange._requestFields.putLongField(HttpHeaders.CONTENT_LENGTH,_exchange._requestContent.length());
408                 _generator.completeHeader(_exchange._requestFields,false);
409                 _generator.addContent(_exchange._requestContent,true);
410             }
411             else if (_exchange._requestContentSource != null)
412             {
413                 _generator.completeHeader(_exchange._requestFields,false);
414                 int available = _exchange._requestContentSource.available();
415                 if (available > 0)
416                 {
417                     // TODO deal with any known content length
418 
419                     // TODO reuse this buffer!
420                     byte[] buf = new byte[available];
421                     int length = _exchange._requestContentSource.read(buf);
422                     _generator.addContent(new ByteArrayBuffer(buf,0,length),false);
423                 }
424             }
425             else
426             {
427                 _exchange._requestFields.remove(HttpHeaders.CONTENT_LENGTH); // TODO
428                 // :
429                 // should
430                 // not
431                 // be
432                 // needed
433                 _generator.completeHeader(_exchange._requestFields,true);
434             }
435 
436             _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
437         }
438     }
439 
440     /* ------------------------------------------------------------ */
441     protected void reset(boolean returnBuffers) throws IOException
442     {
443         _requestComplete = false;
444         _connectionHeader = null;
445         _parser.reset(returnBuffers);
446         _generator.reset(returnBuffers);
447         _http11 = true;
448     }
449 
450     /* ------------------------------------------------------------ */
451     private boolean shouldClose()
452     {
453         if (_connectionHeader!=null)
454         {
455             if (HttpHeaderValues.CLOSE_BUFFER.equals(_connectionHeader))
456                 return true;
457             if (HttpHeaderValues.KEEP_ALIVE_BUFFER.equals(_connectionHeader))
458                 return false;
459         }
460         return !_http11;
461     }
462 
463     /* ------------------------------------------------------------ */
464     private class Handler extends HttpParser.EventHandler
465     {
466         @Override
467         public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
468         {
469             // System.out.println( method.toString() + "///" + url.toString() +
470             // "///" + version.toString() );
471             // TODO validate this is acceptable, the <!DOCTYPE goop was coming
472             // out here
473             // throw new IllegalStateException();
474         }
475 
476         @Override
477         public void startResponse(Buffer version, int status, Buffer reason) throws IOException
478         {
479             _http11 = HttpVersions.HTTP_1_1_BUFFER.equals(version);
480             _exchange.getEventListener().onResponseStatus(version,status,reason);
481             _exchange.setStatus(HttpExchange.STATUS_PARSING_HEADERS);
482         }
483 
484         @Override
485         public void parsedHeader(Buffer name, Buffer value) throws IOException
486         {
487             if (HttpHeaders.CACHE.getOrdinal(name) == HttpHeaders.CONNECTION_ORDINAL)
488             {
489                 _connectionHeader = HttpHeaderValues.CACHE.lookup(value);
490             }
491             _exchange.getEventListener().onResponseHeader(name,value);
492         }
493 
494         @Override
495         public void headerComplete() throws IOException
496         {
497             _exchange.setStatus(HttpExchange.STATUS_PARSING_CONTENT);
498         }
499 
500         @Override
501         public void content(Buffer ref) throws IOException
502         {
503             _exchange.getEventListener().onResponseContent(ref);
504         }
505 
506         @Override
507         public void messageComplete(long contextLength) throws IOException
508         {
509             _exchange.setStatus(HttpExchange.STATUS_COMPLETED);
510         }
511     }
512 
513     /* ------------------------------------------------------------ */
514     public String toString()
515     {
516         return "HttpConnection@" + hashCode() + "//" + _destination.getAddress().getHost() + ":" + _destination.getAddress().getPort();
517     }
518 
519     /* ------------------------------------------------------------ */
520     public String toDetailString()
521     {
522         return toString() + " ex=" + _exchange + " " + _timeout.getAge();
523     }
524 
525     /* ------------------------------------------------------------ */
526     /**
527      * @return the last
528      */
529     public long getLast()
530     {
531         return _last;
532     }
533 
534     /* ------------------------------------------------------------ */
535     /**
536      * @param last
537      *            the last to set
538      */
539     public void setLast(long last)
540     {
541         _last = last;
542     }
543 
544     /* ------------------------------------------------------------ */
545     public void close() throws IOException
546     {
547         _endp.close();
548     }
549 
550 }