00001 #ifndef _PASSENGER_EVENTED_SERVER_H_
00002 #define _PASSENGER_EVENTED_SERVER_H_
00003
00004 #include <ev++.h>
00005 #include <boost/shared_ptr.hpp>
00006 #include <boost/enable_shared_from_this.hpp>
00007 #include <oxt/system_calls.hpp>
00008 #include <oxt/macros.hpp>
00009 #include <algorithm>
00010 #include <string>
00011 #include <set>
00012 #include <sys/types.h>
00013 #include <sys/un.h>
00014 #include <sys/uio.h>
00015 #include <netinet/in.h>
00016 #include <unistd.h>
00017 #include "MessageReadersWriters.h"
00018 #include "FileDescriptor.h"
00019 #include "StaticString.h"
00020 #include "Utils.h"
00021
00022 namespace Passenger {
00023
00024 using namespace std;
00025 using namespace boost;
00026 using namespace oxt;
00027
00028 class EventedServer {
00029 protected:
00030 struct Client: public enable_shared_from_this<Client> {
00031 enum {
00032 ES_CONNECTED,
00033 ES_WRITES_PENDING,
00034 ES_DISCONNECTING_WITH_WRITES_PENDING,
00035 ES_DISCONNECTED
00036 } state;
00037
00038 EventedServer *server;
00039 FileDescriptor fd;
00040 ev::io readWatcher;
00041 ev::io writeWatcher;
00042 bool readWatcherStarted;
00043 bool writeWatcherStarted;
00044 string outbox;
00045
00046 Client(EventedServer *_server)
00047 : server(_server),
00048 readWatcher(_server->loop),
00049 writeWatcher(_server->loop)
00050 { }
00051
00052 void _onReadable(ev::io &w, int revents) {
00053 server->onClientReadable(shared_from_this());
00054 }
00055
00056 void _onWritable(ev::io &w, int revents) {
00057 server->onClientWritable(shared_from_this());
00058 }
00059 };
00060
00061 typedef shared_ptr<Client> ClientPtr;
00062
00063 void write(const ClientPtr &client, const char *data) {
00064 write(client, StaticString(data));
00065 }
00066
00067 void write(const ClientPtr &client, const StaticString &data) {
00068 write(client, &data, 1);
00069 }
00070
00071 virtual void write(const ClientPtr &client, const StaticString data[], size_t count) {
00072 if (client->state == Client::ES_DISCONNECTED) {
00073 return;
00074 }
00075
00076 ssize_t ret;
00077 size_t totalSize;
00078 this_thread::disable_syscall_interruption dsi;
00079
00080 if (client->outbox.empty()) {
00081 struct iovec iov[count];
00082
00083 totalSize = staticStringArrayToIoVec(data, count, iov);
00084 ret = syscalls::writev(client->fd, iov, count);
00085 if (ret == -1) {
00086 if (errno == EAGAIN) {
00087 for (size_t i = 0; i < count; i++) {
00088 client->outbox.append(data[i].data(), data[i].size());
00089 }
00090 } else {
00091 printf("write error\n");
00092 }
00093 } else if ((size_t) ret < totalSize) {
00094 size_t index, offset;
00095
00096 findEndOfDataInVectors(iov, count + 1, ret, &index, &offset);
00097 for (size_t i = index; i < count; i++) {
00098 if (i == index) {
00099 client->outbox.append(data[i].data() + offset,
00100 data[i].size() - offset);
00101 } else {
00102 client->outbox.append(data[i].data(),
00103 data[i].size());
00104 }
00105 }
00106 }
00107 } else {
00108 struct iovec iov[count + 1];
00109
00110 iov[0].iov_base = (char *) client->outbox.data();
00111 iov[0].iov_len = client->outbox.size();
00112 totalSize = staticStringArrayToIoVec(data, count, iov + 1);
00113
00114 ret = syscalls::writev(client->fd, iov, count + 1);
00115 if (ret == -1) {
00116 if (errno != EAGAIN) {
00117 printf("write error\n");
00118 }
00119
00120 } else {
00121 string::size_type outboxSize = client->outbox.size();
00122 size_t outboxSent = std::min((size_t) ret, outboxSize);
00123
00124 client->outbox.erase(0, outboxSent);
00125 if (client->outbox.empty()) {
00126 size_t index, offset;
00127
00128 findEndOfDataInVectors(iov, count + 1, ret, &index, &offset);
00129 for (size_t i = index; i < count + 1; i++) {
00130 if (i == index) {
00131 client->outbox.append(
00132 data[i - 1].data() + offset,
00133 data[i - 1].size() - offset);
00134 } else {
00135 client->outbox.append(
00136 data[i - 1].data(),
00137 data[i - 1].size());
00138 }
00139 }
00140 } else {
00141
00142
00143
00144 for (size_t i = 1; i < count + 1; i++) {
00145 client->outbox.append(data[i - 1].data(),
00146 data[i - 1].size());
00147 }
00148 }
00149 }
00150 }
00151 if (client->outbox.empty()) {
00152 outboxFlushed(client);
00153 } else {
00154 outboxNotFlushed(client);
00155 }
00156 }
00157
00158
00159
00160
00161
00162
00163
00164
00165 virtual void disconnect(const ClientPtr &client, bool force = false) {
00166 if (client->state == Client::ES_CONNECTED
00167 || (force && client->state != Client::ES_DISCONNECTED)) {
00168 watchReadEvents(client, false);
00169 watchWriteEvents(client, false);
00170 client->fd.close();
00171 client->state = Client::ES_DISCONNECTED;
00172 clients.erase(client);
00173 } else if (client->state == Client::ES_WRITES_PENDING) {
00174 watchReadEvents(client, false);
00175 watchWriteEvents(client, true);
00176 shutdown(client->fd, SHUT_RD);
00177 client->state = Client::ES_DISCONNECTING_WITH_WRITES_PENDING;
00178 }
00179 }
00180
00181 virtual ClientPtr createClient() {
00182 return ClientPtr(new Client(this));
00183 }
00184
00185 virtual void onNewClient(const ClientPtr &client) { }
00186 virtual void onClientReadable(const ClientPtr &client) { }
00187
00188 private:
00189 struct ev_loop *loop;
00190 FileDescriptor fd;
00191 ev::io acceptWatcher;
00192 set<ClientPtr> clients;
00193
00194 size_t staticStringArrayToIoVec(const StaticString ary[], size_t count, struct iovec *vec) {
00195 size_t total = 0;
00196 for (size_t i = 0; i < count; i++) {
00197 vec[i].iov_base = (char *) ary[i].data();
00198 vec[i].iov_len = ary[i].size();
00199 total += ary[i].size();
00200 }
00201 return total;
00202 }
00203
00204 void findEndOfDataInVectors(struct iovec iov[], size_t count, size_t dataSize,
00205 size_t *index, size_t *offset)
00206 {
00207 size_t i, begin;
00208
00209 begin = 0;
00210 for (i = 0; OXT_LIKELY(i < count); i++) {
00211 size_t end = begin + iov[i].iov_len;
00212 if (OXT_LIKELY(begin <= dataSize)) {
00213 if (dataSize < end) {
00214 *index = i;
00215 *offset = dataSize - begin;
00216 return;
00217 } else {
00218 begin = end;
00219 }
00220 } else {
00221
00222 abort();
00223 }
00224 }
00225
00226 abort();
00227 }
00228
00229 void outboxFlushed(const ClientPtr &client) {
00230 switch (client->state) {
00231 case Client::ES_CONNECTED:
00232 watchReadEvents(client, true);
00233 watchWriteEvents(client, false);
00234 break;
00235 case Client::ES_WRITES_PENDING:
00236 client->state = Client::ES_CONNECTED;
00237 watchReadEvents(client, true);
00238 watchWriteEvents(client, false);
00239 break;
00240 case Client::ES_DISCONNECTING_WITH_WRITES_PENDING:
00241 client->state = Client::ES_DISCONNECTED;
00242 client->fd.close();
00243 clients.erase(client);
00244 break;
00245 default:
00246
00247 abort();
00248 }
00249 }
00250
00251 void outboxNotFlushed(const ClientPtr &client) {
00252 switch (client->state) {
00253 case Client::ES_CONNECTED:
00254 client->state = Client::ES_WRITES_PENDING;
00255
00256
00257 watchReadEvents(client, client->outbox.size() < 1024 * 32);
00258 watchWriteEvents(client, true);
00259 break;
00260 case Client::ES_WRITES_PENDING:
00261 case Client::ES_DISCONNECTING_WITH_WRITES_PENDING:
00262 watchReadEvents(client, false);
00263 watchWriteEvents(client, true);
00264 break;
00265 default:
00266
00267 abort();
00268 }
00269 }
00270
00271 void watchReadEvents(const ClientPtr &client, bool enable = true) {
00272 if (client->readWatcherStarted && !enable) {
00273 client->readWatcherStarted = false;
00274 client->readWatcher.stop();
00275 } else if (!client->readWatcherStarted && enable) {
00276 client->readWatcherStarted = true;
00277 client->readWatcher.start();
00278 }
00279 }
00280
00281 void watchWriteEvents(const ClientPtr &client, bool enable = true) {
00282 if (client->writeWatcherStarted && !enable) {
00283 client->writeWatcherStarted = false;
00284 client->writeWatcher.stop();
00285 } else if (!client->writeWatcherStarted && enable) {
00286 client->writeWatcherStarted = true;
00287 client->writeWatcher.start();
00288 }
00289 }
00290
00291 void onClientWritable(const ClientPtr &client) {
00292 if (client->state == Client::ES_DISCONNECTED) {
00293 return;
00294 }
00295
00296 this_thread::disable_syscall_interruption dsi;
00297 size_t sent = 0;
00298 bool done = client->outbox.empty();
00299
00300 while (!done) {
00301 ssize_t ret = syscalls::write(client->fd,
00302 client->outbox.data() + sent,
00303 client->outbox.size() - sent);
00304 if (ret == -1) {
00305 if (errno != EAGAIN) {
00306 printf("write error\n");
00307 }
00308 done = true;
00309 } else {
00310 sent += ret;
00311 done = sent == client->outbox.size();
00312 }
00313 }
00314 if (sent > 0) {
00315 client->outbox.erase(0, sent);
00316 }
00317
00318 if (client->outbox.empty()) {
00319 outboxFlushed(client);
00320 } else {
00321 outboxNotFlushed(client);
00322 }
00323 }
00324
00325 void onAcceptable(ev::io &w, int revents) {
00326 this_thread::disable_syscall_interruption dsi;
00327 int i = 0;
00328 bool done = false;
00329
00330
00331 while (i < 100 && !done) {
00332
00333
00334 union {
00335 struct sockaddr_un local;
00336 struct sockaddr_in inet;
00337 } addr;
00338 socklen_t len = sizeof(addr);
00339
00340 int clientfd = syscalls::accept(fd, (struct sockaddr *) &addr, &len);
00341 if (clientfd == -1) {
00342 if (errno != EAGAIN && errno != EWOULDBLOCK) {
00343 printf("accept error!\n");
00344 }
00345 done = true;
00346 } else {
00347 FileDescriptor clientfdGuard = clientfd;
00348 setNonBlocking(clientfdGuard);
00349
00350 ClientPtr client = createClient();
00351 client->state = Client::ES_CONNECTED;
00352 client->fd = clientfdGuard;
00353 client->readWatcher.set<Client, &Client::_onReadable>(client.get());
00354 client->readWatcher.set(client->fd, ev::READ);
00355 client->readWatcher.start();
00356 client->readWatcherStarted = true;
00357 client->writeWatcher.set<Client, &Client::_onWritable>(client.get());
00358 client->writeWatcher.set(client->fd, ev::WRITE);
00359 client->writeWatcherStarted = false;
00360 clients.insert(client);
00361 onNewClient(client);
00362 }
00363 i++;
00364 }
00365 }
00366
00367 public:
00368 EventedServer(struct ev_loop *_loop, FileDescriptor serverFd)
00369 : loop(_loop),
00370 acceptWatcher(_loop)
00371 {
00372 fd = serverFd;
00373 setNonBlocking(serverFd);
00374 acceptWatcher.set<EventedServer, &EventedServer::onAcceptable>(this);
00375 acceptWatcher.start(fd, ev::READ);
00376 }
00377
00378 virtual ~EventedServer() { }
00379
00380 struct ev_loop *getLoop() const {
00381 return loop;
00382 }
00383 };
00384
00385 }
00386
00387 #endif