/*
 *  Phusion Passenger - http://www.modrails.com/
 *  Copyright (c) 2010 Phusion
 *
 *  "Phusion Passenger" is a trademark of Hongli Lai & Ninh Bui.
 *
 *  Permission is hereby granted, free of charge, to any person obtaining a copy
 *  of this software and associated documentation files (the "Software"), to deal
 *  in the Software without restriction, including without limitation the rights
 *  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 *  copies of the Software, and to permit persons to whom the Software is
 *  furnished to do so, subject to the following conditions:
 *
 *  The above copyright notice and this permission notice shall be included in
 *  all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 *  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 *  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 *  OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 *  THE SOFTWARE.
00024 */ 00025 #ifndef _PASSENGER_EVENTED_CLIENT_H_ 00026 #define _PASSENGER_EVENTED_CLIENT_H_ 00027 00028 #include <ev++.h> 00029 #include <string> 00030 #include <sys/types.h> 00031 #include <cstdlib> 00032 #include <cerrno> 00033 #include <cassert> 00034 00035 #include <boost/function.hpp> 00036 #include <oxt/system_calls.hpp> 00037 #include <oxt/thread.hpp> 00038 00039 #include "FileDescriptor.h" 00040 #include "Utils/IOUtils.h" 00041 00042 namespace Passenger { 00043 00044 using namespace std; 00045 using namespace boost; 00046 using namespace oxt; 00047 00048 00049 /** 00050 * A utility class for making I/O handling in non-blocking libev evented servers 00051 * much easier. 00052 * - An EventedClient is associated with a reference counted file descriptor. 00053 * - It contains connection state information (i.e. whether the connection is 00054 * established or closed). Callbacks are provided for watching connection 00055 * state changes (e.g. <tt>onDisconnect</tt>). 00056 * - It provides reference counting features for simpler memory management 00057 * (<tt>ref()</tt> and <tt>unref()</tt>). 00058 * - It installs input and output readiness watchers that are unregistered 00059 * when the EventedClient is destroyed. One can hook into input readiness 00060 * watcher with the <tt>onReadable</tt> callback. 00061 * - Makes zero-copy writes easy. The <tt>write()</tt> method accepts an array 00062 * of buffers. Whenever possible, all of these buffers are written out in 00063 * the given order, using a single system call, without copying them into a 00064 * single temporary buffer. 00065 * - Makes non-blocking writes easy. Normally a write() system call on a 00066 * non-blocking socket can fail with EAGAIN if the socket send buffer is 00067 * full. EventedClient schedules the data to be sent later when the socket is 00068 * writable again. It automatically integrates into the main loop in order 00069 * to do this. 
This allows one to have write operations occur concurrently 00070 * with read operations. 00071 * In case too many scheduled writes are being piled up, EventedClient 00072 * is smart enough to temporarily disable read notifications and wait until 00073 * everything is written out before enabling read notifications again. 00074 * The definition of "too many" is customizable (<tt>setOutboxLimit()</tt>). 00075 * - EventedClient's <tt>disconnect</tt> method respects pending writes. It 00076 * will disconnect after all pending outgoing data have been written out. 00077 * 00078 * <h2>Basic usage</h2> 00079 * Construct an EventedClient with a libev loop and a file descriptor: 00080 * 00081 * @code 00082 * EventedClient *client = new EventedClient(loop, fd); 00083 * @endcode 00084 * 00085 * You are probably interested in read readiness notifications on <tt>fd</tt>. 00086 * However these notifications are disabled by default. You need to set the 00087 * <tt>onReadable</tt> callback (which is called every time the fd is 00088 * readable) and enable read notifications. 00089 * 00090 * @code 00091 * void onReadable(EventedClient *client) { 00092 * // do whatever you want 00093 * } 00094 * 00095 * ... 00096 * client->onReadable = onReadable; 00097 * client->notifyReads(true); 00098 * @endcode 00099 * 00100 * <h2>Error handling</h2> 00101 * EventedClient never raises exceptions, except when your callbacks do. 00102 * It reports errors with the <tt>onSystemError</tt> callback. That said, 00103 * EventedClient is exception-aware and will ensure that its internal 00104 * state stays consistent even when your callbacks throw exceptions. 00105 */ 00106 class EventedClient { 00107 public: 00108 typedef void (*Callback)(EventedClient *client); 00109 typedef void (*SystemErrorCallback)(EventedClient *client, const string &message, int code); 00110 00111 private: 00112 enum { 00113 /** 00114 * This is the initial state for a client. 
It means we're 00115 * connected to the client, ready to receive data and 00116 * there's no pending outgoing data. In this state we will 00117 * only be watching for read events. 00118 */ 00119 EC_CONNECTED, 00120 00121 /** 00122 * This state is entered from EC_CONNECTED when the write() 00123 * method fails to send all data immediately and EventedClient 00124 * schedules some data to be sent later, when the socket becomes 00125 * readable again. In here we will be watching for read 00126 * and write events. 00127 */ 00128 EC_WRITES_PENDING, 00129 00130 /** 00131 * This state is entered from EC_WRITES_PENDING or from EC_CONNECTED 00132 * when the write() method fails to send all data immediately, and 00133 * the amount of data to be scheduled to be sent later is larger 00134 * than the specified outbox limit. In this state, EventedClient 00135 * will not watch for read events and will instead concentrate on 00136 * sending out all pending data before watching read events again. 00137 * When all pending data has been sent out the system will transition 00138 * to EC_CONNECTED. 00139 */ 00140 EC_TOO_MANY_WRITES_PENDING, 00141 00142 /** 00143 * This state is entered from the EC_WRITES_PENDING or the 00144 * EC_TOO_MANY_WRITES_PENDING state when disconnect() is called. 00145 * It means that we want to close the connection as soon as all 00146 * pending outgoing data has been sent. As soon as that happens 00147 * it'll transition to EC_DISCONNECTED. In this state no further 00148 * I/O should be allowed. 00149 */ 00150 EC_DISCONNECTING_WITH_WRITES_PENDING, 00151 00152 /** 00153 * Final state. Client connection has been closed. No 00154 * I/O with the client is possible. 00155 */ 00156 EC_DISCONNECTED 00157 } state; 00158 00159 /** A libev watcher on for watching read events on <tt>fd</tt>. */ 00160 ev::io readWatcher; 00161 /** A libev watcher on for watching write events on <tt>fd</tt>. 
*/ 00162 ev::io writeWatcher; 00163 /** Storage for data that could not be sent out immediately. */ 00164 string outbox; 00165 int refcount; 00166 bool m_notifyReads; 00167 unsigned int outboxLimit; 00168 00169 void _onReadable(ev::io &w, int revents) { 00170 emitEvent(onReadable); 00171 } 00172 00173 void onWritable(ev::io &w, int revents) { 00174 assert(state != EC_DISCONNECTED); 00175 00176 this_thread::disable_interruption di; 00177 this_thread::disable_syscall_interruption dsi; 00178 size_t sent = 0; 00179 bool done = outbox.empty(); 00180 00181 while (!done) { 00182 ssize_t ret = syscalls::write(fd, 00183 outbox.data() + sent, 00184 outbox.size() - sent); 00185 if (ret == -1) { 00186 if (errno != EAGAIN) { 00187 int e = errno; 00188 disconnect(true); 00189 emitSystemErrorEvent("Cannot write data to client", e); 00190 return; 00191 } 00192 done = true; 00193 } else { 00194 sent += ret; 00195 done = sent == outbox.size(); 00196 } 00197 } 00198 if (sent > 0) { 00199 outbox.erase(0, sent); 00200 } 00201 00202 updateWatcherStates(); 00203 if (outbox.empty()) { 00204 emitEvent(onPendingDataFlushed); 00205 } 00206 } 00207 00208 bool outboxTooLarge() { 00209 return outbox.size() > 0 && outbox.size() >= outboxLimit; 00210 } 00211 00212 void updateWatcherStates() { 00213 if (outbox.empty()) { 00214 switch (state) { 00215 case EC_CONNECTED: 00216 watchReadEvents(m_notifyReads); 00217 watchWriteEvents(false); 00218 break; 00219 case EC_WRITES_PENDING: 00220 case EC_TOO_MANY_WRITES_PENDING: 00221 state = EC_CONNECTED; 00222 watchReadEvents(m_notifyReads); 00223 watchWriteEvents(false); 00224 break; 00225 case EC_DISCONNECTING_WITH_WRITES_PENDING: 00226 state = EC_DISCONNECTED; 00227 watchReadEvents(false); 00228 watchWriteEvents(false); 00229 try { 00230 fd.close(); 00231 } catch (const SystemException &e) { 00232 emitSystemErrorEvent(e.brief(), e.code()); 00233 } 00234 emitEvent(onDisconnect); 00235 break; 00236 default: 00237 // Should never be reached. 
00238 abort(); 00239 } 00240 } else { 00241 switch (state) { 00242 case EC_CONNECTED: 00243 if (outboxTooLarge()) { 00244 // If we have way too much stuff in the outbox then 00245 // suspend reading until we've sent out the entire outbox. 00246 state = EC_TOO_MANY_WRITES_PENDING; 00247 watchReadEvents(false); 00248 watchWriteEvents(true); 00249 } else { 00250 state = EC_WRITES_PENDING; 00251 watchReadEvents(m_notifyReads); 00252 watchWriteEvents(true); 00253 } 00254 break; 00255 case EC_WRITES_PENDING: 00256 watchReadEvents(m_notifyReads); 00257 watchWriteEvents(true); 00258 break; 00259 case EC_TOO_MANY_WRITES_PENDING: 00260 case EC_DISCONNECTING_WITH_WRITES_PENDING: 00261 watchReadEvents(false); 00262 watchWriteEvents(true); 00263 break; 00264 default: 00265 // Should never be reached. 00266 abort(); 00267 } 00268 } 00269 } 00270 00271 void watchReadEvents(bool enable = true) { 00272 if (readWatcher.is_active() && !enable) { 00273 readWatcher.stop(); 00274 } else if (!readWatcher.is_active() && enable) { 00275 readWatcher.start(); 00276 } 00277 } 00278 00279 void watchWriteEvents(bool enable = true) { 00280 if (writeWatcher.is_active() && !enable) { 00281 writeWatcher.stop(); 00282 } else if (!writeWatcher.is_active() && enable) { 00283 writeWatcher.start(); 00284 } 00285 } 00286 00287 void emitEvent(Callback callback) { 00288 if (callback != NULL) { 00289 callback(this); 00290 } 00291 } 00292 00293 void emitSystemErrorEvent(const string &message, int code) { 00294 if (onSystemError != NULL) { 00295 onSystemError(this, message, code); 00296 } 00297 } 00298 00299 public: 00300 /** The client's file descriptor. Could be -1: see <tt>ioAllowed()</tt>. */ 00301 FileDescriptor fd; 00302 00303 /** 00304 * Called when the file descriptor becomes readable and read notifications 00305 * are enabled (see <tt>notifyRead()</tt>). When there's too much pending 00306 * outgoing data, readability notifications are temporarily disabled; see 00307 * <tt>write()</tt> for details. 
00308 */ 00309 Callback onReadable; 00310 00311 /** 00312 * Called when the client is disconnected. This happens either immediately 00313 * when <tt>disconnect()</tt> is called, or a short amount of time later. 00314 * See the documentation for that function for details. 00315 * 00316 * Please note that destroying an EventedClient object does *not* cause 00317 * this callback to be called. 00318 */ 00319 Callback onDisconnect; 00320 00321 /** 00322 * Called when <tt>detach()</tt> is called for the first time. 00323 */ 00324 Callback onDetach; 00325 00326 /** 00327 * Called after all pending outgoing data have been written out. 00328 * If <tt>write()</tt> can be completed immediately without scheduling 00329 * data for later, then <tt>write()</tt> will call this callback 00330 * immediately after writing. 00331 */ 00332 Callback onPendingDataFlushed; 00333 00334 /** 00335 * System call errors are reported with this callback. 00336 */ 00337 SystemErrorCallback onSystemError; 00338 00339 /** 00340 * EventedClient doesn't do anything with this. Set it to whatever you want. 00341 */ 00342 void *userData; 00343 00344 /** 00345 * Creates a new EventedClient with the given libev loop and file descriptor. 00346 * The initial reference count is 1. 
00347 */ 00348 EventedClient(struct ev_loop *loop, const FileDescriptor &_fd) 00349 : readWatcher(loop), 00350 writeWatcher(loop), 00351 fd(_fd) 00352 { 00353 state = EC_CONNECTED; 00354 refcount = 1; 00355 m_notifyReads = false; 00356 outboxLimit = 1024 * 32; 00357 onReadable = NULL; 00358 onDisconnect = NULL; 00359 onDetach = NULL; 00360 onPendingDataFlushed = NULL; 00361 onSystemError = NULL; 00362 userData = NULL; 00363 readWatcher.set(fd, ev::READ); 00364 readWatcher.set<EventedClient, &EventedClient::_onReadable>(this); 00365 writeWatcher.set<EventedClient, &EventedClient::onWritable>(this); 00366 writeWatcher.set(fd, ev::WRITE); 00367 } 00368 00369 virtual ~EventedClient() { 00370 // Unregister file descriptor from the event loop poller before 00371 // closing the file descriptor. 00372 watchReadEvents(false); 00373 watchWriteEvents(false); 00374 } 00375 00376 /** 00377 * Increase reference count. 00378 */ 00379 void ref() { 00380 refcount++; 00381 } 00382 00383 /** 00384 * Decrease reference count. Upon reaching 0, this EventedClient object 00385 * will be destroyed. 00386 */ 00387 void unref() { 00388 refcount--; 00389 assert(refcount >= 0); 00390 if (refcount == 0) { 00391 delete this; 00392 } 00393 } 00394 00395 /** 00396 * Returns whether it is allowed to perform any I/O with this client. 00397 * Usually true, and false when the client is either being disconnected 00398 * or has been disconnected. A return value of false indicates that 00399 * <tt>fd</tt> might be -1, but even when it isn't -1 you shouldn't 00400 * access <tt>fd</tt> anymore. 00401 */ 00402 bool ioAllowed() const { 00403 return state != EC_DISCONNECTING_WITH_WRITES_PENDING 00404 && state != EC_DISCONNECTED; 00405 } 00406 00407 /** Used by unit tests. */ 00408 bool readWatcherActive() const { 00409 return readWatcher.is_active(); 00410 } 00411 00412 /** 00413 * Returns the number of bytes that are scheduled to be sent to the 00414 * client at a later time. 
00415 * 00416 * @see write() 00417 */ 00418 size_t pendingWrites() const { 00419 return outbox.size(); 00420 } 00421 00422 /** 00423 * Sets whether you're interested in read events. This will start or 00424 * stop the input readiness watcher appropriately according to the 00425 * current state. 00426 * 00427 * If the client connection is already being closed or has already 00428 * been closed then this method does nothing. 00429 */ 00430 void notifyReads(bool enable) { 00431 if (!ioAllowed()) { 00432 return; 00433 } 00434 00435 this_thread::disable_interruption di; 00436 this_thread::disable_syscall_interruption dsi; 00437 m_notifyReads = enable; 00438 updateWatcherStates(); 00439 } 00440 00441 /** 00442 * Sets a limit on the client outbox. The outbox is where data is stored 00443 * that could not be immediately sent to the client, e.g. because of 00444 * network congestion. Whenver the outbox's size grows past this limit, 00445 * EventedClient will enter a state in which it will stop listening for 00446 * read events and instead concentrate on sending out all pending data. 00447 * 00448 * Setting this to 0 means that the outbox has an unlimited size. Please 00449 * note however that this also means that the outbox's memory could grow 00450 * unbounded if the client is too slow at receiving data. 00451 * 00452 * The default value is some non-zero value. 00453 * 00454 * If the client connection is already being closed or has already 00455 * been closed then this method does nothing. 00456 */ 00457 void setOutboxLimit(unsigned int size) { 00458 if (!ioAllowed()) { 00459 return; 00460 } 00461 00462 this_thread::disable_interruption di; 00463 this_thread::disable_syscall_interruption dsi; 00464 outboxLimit = size; 00465 updateWatcherStates(); 00466 } 00467 00468 void write(const StaticString &data) { 00469 write(&data, 1); 00470 } 00471 00472 /** 00473 * Sends data to this client. 
This method will try to send the data 00474 * immediately (in which no intermediate copies of the data will be made), 00475 * but if the client is not yet ready to receive data (e.g. because of 00476 * network congestion) then the data will be buffered and scheduled for 00477 * sending later. 00478 * 00479 * If an I/O error was encountered then the client connection will be 00480 * closed by calling <tt>disconnect(true)</tt>. This means this method 00481 * could potentially call the <tt>onDisconnect</tt> callback. 00482 * 00483 * If the client connection is already being closed or has already 00484 * been closed then this method does nothing. 00485 * 00486 * The <tt>onPendingDataFlushed</tt> callback will be called after 00487 * this data and whatever existing pending data have been written 00488 * out. That may either be immediately or after a short period of 00489 * of time. 00490 */ 00491 void write(const StaticString data[], unsigned int count) { 00492 if (!ioAllowed()) { 00493 return; 00494 } 00495 00496 ssize_t ret; 00497 this_thread::disable_interruption di; 00498 this_thread::disable_syscall_interruption dsi; 00499 00500 ret = gatheredWrite(fd, data, count, outbox); 00501 if (ret == -1) { 00502 int e = errno; 00503 disconnect(true); 00504 emitSystemErrorEvent("Cannot write data to client", e); 00505 } else { 00506 updateWatcherStates(); 00507 if (outbox.empty()) { 00508 emitEvent(onPendingDataFlushed); 00509 } 00510 } 00511 } 00512 00513 /** 00514 * Disconnects the client. This actually closes the underlying file 00515 * descriptor, even if the FileDescriptor object still has references. 00516 * 00517 * If <em>force</em> is true then the client will be disconnected 00518 * immediately, and any pending outgoing data will be discarded. 00519 * Otherwise the client will be disconnected after all pending 00520 * outgoing data have been sent; in the mean time no new data can be 00521 * received from or sent to the client. 
00522 * 00523 * After the client has actually been disconnected (which may be either 00524 * immediately or after a short period of time), a disconnect event will 00525 * be emitted. 00526 * 00527 * If the client connection has already been closed then this method 00528 * does nothing. If the client connection is being closed (because 00529 * there's pending outgoing data) then the behavior depends on the 00530 * <tt>force</tt> argument: if true then the connection is closed 00531 * immediately and the pending data is discarded, otherwise this 00532 * method does nothing. 00533 * 00534 * The <tt>onDisconnect</tt> callback will be called after the file 00535 * descriptor is closed, which is either immediately or after all 00536 * pending data has been sent out. 00537 */ 00538 void disconnect(bool force = false) { 00539 if (!ioAllowed() && !(state == EC_DISCONNECTING_WITH_WRITES_PENDING && force)) { 00540 return; 00541 } 00542 00543 this_thread::disable_interruption di; 00544 this_thread::disable_syscall_interruption dsi; 00545 00546 if (state == EC_CONNECTED || force) { 00547 state = EC_DISCONNECTED; 00548 watchReadEvents(false); 00549 watchWriteEvents(false); 00550 try { 00551 fd.close(); 00552 } catch (const SystemException &e) { 00553 emitSystemErrorEvent(e.brief(), e.code()); 00554 } 00555 emitEvent(onDisconnect); 00556 } else { 00557 state = EC_DISCONNECTING_WITH_WRITES_PENDING; 00558 watchReadEvents(false); 00559 watchWriteEvents(true); 00560 if (syscalls::shutdown(fd, SHUT_RD) == -1) { 00561 int e = errno; 00562 emitSystemErrorEvent( 00563 "Cannot shutdown reader half of the client socket", 00564 e); 00565 } 00566 } 00567 } 00568 00569 /** 00570 * Detaches the client file descriptor so that this EventedClient no longer 00571 * has any control over it. Any EventedClient I/O watchers on the client file 00572 * descriptor will be stopped and further I/O on the file descriptor via 00573 * EventedClient will become impossible. 
The original client file descriptor 00574 * is returned and <tt>onDetach</tt> is called. Subsequent calls to this 00575 * function will return -1 and will no longer call <tt>onDetach</tt>. 00576 * 00577 * @post !ioAllowed() 00578 * @post fd == -1 00579 */ 00580 FileDescriptor detach() { 00581 if (state == EC_DISCONNECTED) { 00582 return fd; 00583 } else { 00584 FileDescriptor oldFd = fd; 00585 state = EC_DISCONNECTED; 00586 watchReadEvents(false); 00587 watchWriteEvents(false); 00588 fd = -1; 00589 emitEvent(onDetach); 00590 return oldFd; 00591 } 00592 } 00593 }; 00594 00595 00596 } // namespace Passenger 00597 00598 #endif /* _PASSENGER_EVENTED_CLIENT_H_ */