00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_LOGGING_H_
00026 #define _PASSENGER_LOGGING_H_
00027
00028 #include <boost/shared_ptr.hpp>
00029 #include <oxt/system_calls.hpp>
00030 #include <oxt/backtrace.hpp>
00031
00032 #include <sys/types.h>
00033 #include <sys/time.h>
00034 #include <sys/file.h>
00035 #include <sys/resource.h>
00036 #include <unistd.h>
00037 #include <fcntl.h>
00038 #include <string>
00039 #include <map>
00040 #include <ostream>
00041 #include <sstream>
00042 #include <cstdio>
00043 #include <ctime>
00044
00045 #include "RandomGenerator.h"
00046 #include "FileDescriptor.h"
00047 #include "MessageClient.h"
00048 #include "StaticString.h"
00049 #include "Exceptions.h"
00050 #include "Utils.h"
00051 #include "Utils/MD5.h"
00052 #include "Utils/SystemTime.h"
00053
00054
00055 namespace Passenger {
00056
00057 using namespace std;
00058 using namespace boost;
00059 using namespace oxt;
00060
00061
00062
00063
// Verbosity threshold consulted by P_TRACE(): a trace message is only written
// when this value is >= the message's level.
extern unsigned int _logLevel;
// Stream that P_LOG() (and therefore P_WARN()/P_ERROR()) writes to.
// P_LOG_TO() silently skips the message when the stream pointer is null.
extern ostream *_logStream;
// Stream that P_TRACE()/P_DEBUG() write to when PASSENGER_DEBUG is defined.
extern ostream *_debugStream;

// Log level accessors, defined elsewhere.
// NOTE(review): presumably these read/write _logLevel -- confirm against the implementation.
unsigned int getLogLevel();
void setLogLevel(unsigned int value);
// Defined elsewhere.
// NOTE(review): presumably points _debugStream at the given file, with NULL
// selecting a default destination -- confirm against the implementation.
void setDebugFile(const char *logFile = NULL);
00071
00072
00073
00074
00075
00076
00077
/**
 * Write a single log entry to the given stream.
 *
 * The entry consists of a header line containing the process ID, the source
 * file and line of the call site and the local time with millisecond
 * precision, followed by the message itself on the next line. The entry is
 * first assembled in a private string stream and then written to the target
 * in one insertion, after which the target is flushed.
 *
 * @param expr   Anything that can be chained after `<<` on an ostream,
 *               e.g. `"value = " << value`.
 * @param stream An `ostream *` (or compatible pointer). Nothing is written
 *               when it is null. The argument is evaluated more than once.
 *
 * Implementation notes:
 *  - The clock is sampled once so that the seconds and the milliseconds of
 *    the timestamp always belong to the same instant.
 *  - localtime_r() is used because localtime() shares a static buffer
 *    between threads.
 *  - The millisecond field is zero-padded to 3 digits; without padding 5 ms
 *    would be printed as ".5" and read as 500 ms.
 *  - Locals carry a `_pl_` prefix so that they are unlikely to capture
 *    identifiers used inside `expr`.
 */
#define P_LOG_TO(expr, stream) \
	do { \
		if ((stream) != 0) { \
			struct timeval _pl_now; \
			struct tm _pl_tm; \
			time_t _pl_secs; \
			char _pl_datetime[60]; \
			char _pl_msec[8]; \
			std::stringstream _pl_message; \
			\
			gettimeofday(&_pl_now, NULL); \
			_pl_secs = _pl_now.tv_sec; \
			_pl_datetime[0] = '\0'; \
			if (localtime_r(&_pl_secs, &_pl_tm) != NULL) { \
				strftime(_pl_datetime, sizeof(_pl_datetime), "%F %H:%M:%S", &_pl_tm); \
			} \
			snprintf(_pl_msec, sizeof(_pl_msec), "%03lu", \
				(unsigned long) (_pl_now.tv_usec / 1000)); \
			_pl_message << \
				"[ pid=" << ((unsigned long) getpid()) << \
				" file=" << __FILE__ << ":" << (unsigned long) __LINE__ << \
				" time=" << _pl_datetime << "." << _pl_msec << " ]:" << \
				"\n " << expr << std::endl; \
			*(stream) << _pl_message.str(); \
			(stream)->flush(); \
		} \
	} while (false)
00100
00101
00102
00103
/**
 * Write the given expression to the general log stream
 * (Passenger::_logStream).
 */
#define P_LOG(expr) P_LOG_TO(expr, Passenger::_logStream)

/**
 * Write the given expression, which represents a warning, to the log
 * stream. Currently identical to P_LOG().
 */
#define P_WARN(expr) P_LOG(expr)

/**
 * Write the given expression, which represents an error, to the log
 * stream. Currently identical to P_LOG().
 */
#define P_ERROR(expr) P_LOG(expr)

/**
 * Write the given expression, which represents a debugging message, to the
 * debug stream. Shorthand for P_TRACE() with level 1.
 */
#define P_DEBUG(expr) P_TRACE(1, expr)

#ifdef PASSENGER_DEBUG
	/**
	 * Write the given expression to the debug stream
	 * (Passenger::_debugStream), but only if the current log level is
	 * at least `level`. `level` is parenthesized so that compound
	 * expressions (e.g. `verbose ? 1 : 3`) compare as a whole.
	 */
	#define P_TRACE(level, expr) \
		do { \
			if (Passenger::_logLevel >= (level)) { \
				P_LOG_TO(expr, Passenger::_debugStream); \
			} \
		} while (false)

	/**
	 * If `expr` is false, log `message` as an error and make the
	 * *calling function* return `result_if_failed`.
	 */
	#define P_ASSERT(expr, result_if_failed, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return result_if_failed; \
			} \
		} while (false)
#else
	/*
	 * Without PASSENGER_DEBUG both macros expand to an empty statement:
	 * none of their arguments are evaluated, so the arguments must not
	 * have side effects that the program depends on.
	 */
	#define P_TRACE(level, expr) do { } while (false)

	#define P_ASSERT(expr, result_if_failed, message) do { } while (false)
#endif
00144
00145
00146
00147
00148 class AnalyticsLog {
00149 private:
00150 static const int INT64_STR_BUFSIZE = 22;
00151
00152 FileDescriptor handle;
00153 string groupName;
00154 string txnId;
00155 bool largeMessages;
00156
00157 class FileLock {
00158 private:
00159 int handle;
00160 public:
00161 FileLock(const int &handle) {
00162 this->handle = handle;
00163 int ret;
00164 do {
00165 ret = ::flock(handle, LOCK_EX);
00166 } while (ret == -1 && errno == EINTR);
00167 if (ret == -1) {
00168 int e = errno;
00169 throw SystemException("Cannot lock analytics log file", e);
00170 }
00171 }
00172
00173 ~FileLock() {
00174 int ret;
00175 do {
00176 ret = ::flock(handle, LOCK_UN);
00177 } while (ret == -1 && errno == EINTR);
00178 if (ret == -1) {
00179 int e = errno;
00180 throw SystemException("Cannot unlock analytics log file", e);
00181 };
00182 }
00183 };
00184
00185
00186
00187
00188
00189
00190 void atomicWrite(const char *data, unsigned int size) {
00191 int ret;
00192
00193 ret = syscalls::write(handle, data, size);
00194 if (ret == -1) {
00195 int e = errno;
00196 throw SystemException("Cannot write to the transaction log", e);
00197 } else if ((unsigned int) ret != size) {
00198 throw IOException("Cannot atomically write to the transaction log");
00199 }
00200 }
00201
00202
00203
00204
00205 char *insertTxnIdAndTimestamp(char *buffer) {
00206 int size;
00207
00208
00209 memcpy(buffer, txnId.c_str(), txnId.size());
00210 buffer += txnId.size();
00211
00212
00213 *buffer = ' ';
00214 buffer++;
00215
00216
00217 size = snprintf(buffer, INT64_STR_BUFSIZE, "%llu", SystemTime::getUsec());
00218 if (size >= INT64_STR_BUFSIZE) {
00219
00220 throw IOException("Cannot format a new transaction log message timestamp.");
00221 }
00222 buffer += size;
00223
00224
00225 *buffer = ' ';
00226
00227 return buffer + 1;
00228 }
00229
00230 public:
00231 AnalyticsLog() { }
00232
00233 AnalyticsLog(const FileDescriptor &handle, const string &groupName, const string &txnId,
00234 bool largeMessages)
00235 {
00236 this->handle = handle;
00237 this->groupName = groupName;
00238 this->txnId = txnId;
00239 this->largeMessages = largeMessages;
00240 message("ATTACH");
00241 }
00242
00243 ~AnalyticsLog() {
00244 if (handle != -1) {
00245 message("DETACH");
00246 }
00247 }
00248
00249 void message(const StaticString &text) {
00250 if (handle != -1 && largeMessages) {
00251
00252 char header[txnId.size() + 1 + INT64_STR_BUFSIZE + 1];
00253 char *end = insertTxnIdAndTimestamp(header);
00254 char sizeHeader[7];
00255
00256 snprintf(sizeHeader, sizeof(sizeHeader) - 1,
00257 "%4x ", (int) (end - header) + (int) text.size() + 1);
00258 sizeHeader[sizeof(sizeHeader) - 1] = '\0';
00259
00260 MessageChannel channel(handle);
00261 FileLock lock(handle);
00262 channel.writeRaw(sizeHeader, strlen(sizeHeader));
00263 channel.writeRaw(header, end - header);
00264 channel.writeRaw(text);
00265 channel.writeRaw("\n");
00266 } else if (handle != -1 && !largeMessages) {
00267
00268 char data[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 + text.size() + 1];
00269 char *end;
00270
00271
00272 end = insertTxnIdAndTimestamp(data);
00273
00274
00275 memcpy(end, text.c_str(), text.size());
00276 end += text.size();
00277
00278
00279 *end = '\n';
00280 end++;
00281
00282 atomicWrite(data, end - data);
00283 }
00284 }
00285
00286 void abort(const StaticString &text) {
00287 if (handle != -1 && largeMessages) {
00288 char header[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 + sizeof("ABORT: ") - 1];
00289 char *end;
00290 char sizeHeader[7];
00291
00292
00293 end = insertTxnIdAndTimestamp(header);
00294
00295 memcpy(end, "ABORT: ", sizeof("ABORT: ") - 1);
00296 end += sizeof("ABORT: ") - 1;
00297
00298 snprintf(sizeHeader, sizeof(sizeHeader) - 1,
00299 "%4x ", (int) (end - header) + (int) text.size() + 1);
00300 sizeHeader[sizeof(sizeHeader) - 1] = '\0';
00301
00302 MessageChannel channel(handle);
00303 FileLock lock(handle);
00304 channel.writeRaw(sizeHeader, strlen(sizeHeader));
00305 channel.writeRaw(header, end - header);
00306 channel.writeRaw(text);
00307 channel.writeRaw("\n");
00308 } else if (handle != -1 && !largeMessages) {
00309
00310 char data[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 +
00311 (sizeof("ABORT: ") - 1) + text.size() + 1];
00312 char *end;
00313
00314
00315 end = insertTxnIdAndTimestamp(data);
00316
00317
00318 memcpy(end, "ABORT: ", sizeof("ABORT: ") - 1);
00319 end += sizeof("ABORT: ") - 1;
00320
00321
00322 *end = '\n';
00323 end++;
00324
00325 atomicWrite(data, end - data);
00326 }
00327 }
00328
00329 bool isNull() const {
00330 return handle == -1;
00331 }
00332
00333 string getGroupName() const {
00334 return groupName;
00335 }
00336
00337 string getTxnId() const {
00338 return txnId;
00339 }
00340 };
00341
00342 typedef shared_ptr<AnalyticsLog> AnalyticsLogPtr;
00343
00344 class AnalyticsScopeLog {
00345 private:
00346 AnalyticsLog *log;
00347 enum {
00348 NAME,
00349 GRANULAR
00350 } type;
00351 union {
00352 const char *name;
00353 struct {
00354 const char *endMessage;
00355 const char *abortMessage;
00356 } granular;
00357 } data;
00358 bool ok;
00359
00360 static string timevalToMsecString(struct timeval &tv) {
00361 unsigned long long i = (unsigned long long) tv.tv_sec * 1000000 + tv.tv_usec;
00362 return toString<unsigned long long>(i);
00363 }
00364
00365 public:
00366 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *name) {
00367 this->log = log.get();
00368 type = NAME;
00369 data.name = name;
00370 ok = false;
00371 if (log != NULL && !log->isNull()) {
00372 string message;
00373 struct rusage usage;
00374
00375 message.reserve(150);
00376 message.append("BEGIN: ");
00377 message.append(name);
00378 message.append(" (utime = ");
00379 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00380 int e = errno;
00381 throw SystemException("getrusage() failed", e);
00382 }
00383 message.append(timevalToMsecString(usage.ru_utime));
00384 message.append(", stime = ");
00385 message.append(timevalToMsecString(usage.ru_stime));
00386 message.append(")");
00387 log->message(message);
00388 }
00389 }
00390
00391 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *beginMessage,
00392 const char *endMessage, const char *abortMessage = NULL
00393 ) {
00394 this->log = log.get();
00395 if (log != NULL) {
00396 type = GRANULAR;
00397 data.granular.endMessage = endMessage;
00398 data.granular.abortMessage = abortMessage;
00399 ok = abortMessage == NULL;
00400 log->message(beginMessage);
00401 }
00402 }
00403
00404 ~AnalyticsScopeLog() {
00405 if (log == NULL) {
00406 return;
00407 }
00408 if (type == NAME) {
00409 if (!log->isNull()) {
00410 string message;
00411 struct rusage usage;
00412
00413 message.reserve(150);
00414 if (ok) {
00415 message.append("END: ");
00416 } else {
00417 message.append("FAIL: ");
00418 }
00419 message.append(data.name);
00420 message.append(" (utime = ");
00421 if (getrusage(RUSAGE_SELF, &usage) == -1) {
00422 int e = errno;
00423 throw SystemException("getrusage() failed", e);
00424 }
00425 message.append(timevalToMsecString(usage.ru_utime));
00426 message.append(", stime = ");
00427 message.append(timevalToMsecString(usage.ru_stime));
00428 message.append(")");
00429 log->message(message);
00430 }
00431 } else {
00432 if (ok) {
00433 log->message(data.granular.endMessage);
00434 } else {
00435 log->message(data.granular.abortMessage);
00436 }
00437 }
00438 }
00439
00440 void success() {
00441 ok = true;
00442 }
00443 };
00444
00445 class AnalyticsLogger {
00446 private:
00447 struct CachedFileHandle {
00448 FileDescriptor fd;
00449 time_t lastUsed;
00450 };
00451
00452 typedef map<string, CachedFileHandle> Cache;
00453
00454 string socketFilename;
00455 string username;
00456 string password;
00457 string nodeName;
00458 RandomGenerator randomGenerator;
00459
00460 boost::mutex lock;
00461 Cache fileHandleCache;
00462
00463 FileDescriptor openLogFile(const StaticString &groupName, unsigned long long timestamp,
00464 const StaticString &nodeName, const StaticString &category = "requests")
00465 {
00466 string logFile = determineLogFilename("", groupName, nodeName, category, timestamp);
00467 Cache::iterator it;
00468 lock_guard<boost::mutex> l(lock);
00469
00470 it = fileHandleCache.find(logFile);
00471 if (it == fileHandleCache.end()) {
00472
00473
00474
00475
00476
00477 while (fileHandleCache.size() >= 10) {
00478 Cache::iterator oldest_it = fileHandleCache.begin();
00479 it = oldest_it;
00480 it++;
00481
00482 for (; it != fileHandleCache.end(); it++) {
00483 if (it->second.lastUsed < oldest_it->second.lastUsed) {
00484 oldest_it = it;
00485 }
00486 }
00487
00488 fileHandleCache.erase(oldest_it);
00489 }
00490
00491 MessageClient client;
00492 CachedFileHandle fileHandle;
00493 vector<string> args;
00494
00495 client.connect(socketFilename, username, password);
00496 client.write("open log file",
00497 groupName.c_str(),
00498 toString(timestamp).c_str(),
00499 nodeName.c_str(),
00500 category.c_str(),
00501 NULL);
00502 if (!client.read(args)) {
00503
00504
00505 throw IOException("The logging agent unexpectedly closed the connection.");
00506 }
00507 if (args[0] == "error") {
00508 throw IOException("The logging agent could not open the log file: " + args[1]);
00509 }
00510 fileHandle.fd = client.readFileDescriptor();
00511 fileHandle.lastUsed = SystemTime::get();
00512 it = fileHandleCache.insert(make_pair(logFile, fileHandle)).first;
00513 } else {
00514 it->second.lastUsed = SystemTime::get();
00515 }
00516 return it->second.fd;
00517 }
00518
00519 unsigned long long extractTimestamp(const string &txnId) const {
00520 const char *timestampBegin = strchr(txnId.c_str(), '-');
00521 if (timestampBegin != NULL) {
00522 return atoll(timestampBegin + 1);
00523 } else {
00524 return 0;
00525 }
00526 }
00527
00528 public:
00529 AnalyticsLogger() { }
00530
00531 AnalyticsLogger(const string &socketFilename, const string &username,
00532 const string &password, const string &nodeName = "")
00533 {
00534 this->socketFilename = socketFilename;
00535 this->username = username;
00536 this->password = password;
00537 if (nodeName.empty()) {
00538 this->nodeName = getHostName();
00539 } else {
00540 this->nodeName = nodeName;
00541 }
00542 }
00543
00544 static void determineGroupAndNodeDir(const string &dir, const StaticString &groupName,
00545 const StaticString &nodeName, string &groupDir, string &nodeDir)
00546 {
00547 string result = dir;
00548 appendVersionAndGroupId(result, groupName);
00549 groupDir = result;
00550 result.append(1, '/');
00551 appendNodeId(result, nodeName);
00552 nodeDir = result;
00553 }
00554
00555 static void appendVersionAndGroupId(string &output, const StaticString &groupName) {
00556 md5_state_t state;
00557 md5_byte_t digest[MD5_SIZE];
00558 char checksum[MD5_HEX_SIZE];
00559
00560 output.append("/1/", 3);
00561
00562 md5_init(&state);
00563 md5_append(&state, (const md5_byte_t *) groupName.data(), groupName.size());
00564 md5_finish(&state, digest);
00565 toHex(StaticString((const char *) digest, MD5_SIZE), checksum);
00566 output.append(checksum, MD5_HEX_SIZE);
00567 }
00568
00569 static void appendNodeId(string &output, const StaticString &nodeName) {
00570 md5_state_t state;
00571 md5_byte_t digest[MD5_SIZE];
00572 char checksum[MD5_HEX_SIZE];
00573
00574 md5_init(&state);
00575 md5_append(&state, (const md5_byte_t *) nodeName.data(), nodeName.size());
00576 md5_finish(&state, digest);
00577 toHex(StaticString((const char *) digest, MD5_SIZE), checksum);
00578 output.append(checksum, MD5_HEX_SIZE);
00579 }
00580
00581 static string determineLogFilename(const StaticString &dir,
00582 const StaticString &groupName, const StaticString &nodeName,
00583 const StaticString &category, unsigned long long timestamp)
00584 {
00585 struct tm tm;
00586 time_t time_value;
00587 char time_str[14];
00588
00589 time_value = timestamp / 1000000;
00590 gmtime_r(&time_value, &tm);
00591 strftime(time_str, sizeof(time_str), "%Y/%m/%d/%H", &tm);
00592
00593 string filename;
00594 filename.reserve(dir.size()
00595 + (3 + MD5_HEX_SIZE)
00596 + 1
00597 + MD5_HEX_SIZE
00598 + 1
00599 + category.size()
00600 + 1
00601 + sizeof(time_str)
00602 + sizeof("log.txt")
00603 );
00604 filename.append(dir.c_str(), dir.size());
00605 appendVersionAndGroupId(filename, groupName);
00606 filename.append(1, '/');
00607 appendNodeId(filename, nodeName);
00608 filename.append(1, '/');
00609 filename.append(category.c_str(), category.size());
00610 filename.append(1, '/');
00611 filename.append(time_str);
00612 filename.append("/log.txt");
00613 return filename;
00614 }
00615
00616 AnalyticsLogPtr newTransaction(const string &groupName, const StaticString &category = "requests",
00617 bool largeMessages = false)
00618 {
00619 if (socketFilename.empty()) {
00620 return ptr(new AnalyticsLog());
00621 } else {
00622 unsigned long long timestamp = SystemTime::getUsec();
00623 string txnId = randomGenerator.generateHexString(4);
00624 txnId.append("-");
00625 txnId.append(toString(timestamp));
00626 return ptr(new AnalyticsLog(
00627 openLogFile(groupName, timestamp, nodeName, category),
00628 groupName, txnId, largeMessages));
00629 }
00630 }
00631
00632 AnalyticsLogPtr continueTransaction(const string &groupName, const string &txnId,
00633 const StaticString &category = "requests",
00634 bool largeMessages = false)
00635 {
00636 if (socketFilename.empty() || groupName.empty() || txnId.empty()) {
00637 return ptr(new AnalyticsLog());
00638 } else {
00639 unsigned long long timestamp;
00640
00641 timestamp = extractTimestamp(txnId);
00642 if (timestamp == 0) {
00643 TRACE_POINT();
00644 throw ArgumentException("Invalid transaction ID '" + txnId + "'");
00645 }
00646 return ptr(new AnalyticsLog(
00647 openLogFile(groupName, timestamp, nodeName, category),
00648 groupName, txnId, largeMessages));
00649 }
00650 }
00651
00652 bool isNull() const {
00653 return socketFilename.empty();
00654 }
00655
00656 string getAddress() const {
00657 return socketFilename;
00658 }
00659
00660 string getUsername() const {
00661 return username;
00662 }
00663
00664 string getPassword() const {
00665 return password;
00666 }
00667
00668
00669
00670
00671 string getNodeName() const {
00672 return nodeName;
00673 }
00674 };
00675
00676 typedef shared_ptr<AnalyticsLogger> AnalyticsLoggerPtr;
00677
00678 }
00679
00680 #endif
00681