/* ----------------------------------------------------------------------------- * * (c) The GHC Team, 2008-2009 * * Support for fast binary event logging. * * ---------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" #ifdef TRACING #include "Trace.h" #include "Capability.h" #include "Trace.h" #include "RtsUtils.h" #include "Stats.h" #include "EventLog.h" #include #include #ifdef PARALLEL_RTS #include "rts/Parallel.h" //need thisPE for parallel trace file name #endif //PARALLEL_RTS static char *event_log_filename = NULL; // File for logging events FILE *event_log_file = NULL; #define EVENT_LOG_SIZE 2 * (1024 * 1024) // 2MB static int flushCount; // Struct for record keeping of buffer to store event types and events. typedef struct _EventsBuf { StgInt8 *begin; StgInt8 *pos; StgInt8 *marker; StgWord64 size; EventCapNo capno; // which capability this buffer belongs to, or -1 } EventsBuf; EventsBuf *capEventBuf; // one EventsBuf for each Capability EventsBuf eventBuf; // an EventsBuf not associated with any Capability #ifdef THREADED_RTS Mutex eventBufMutex; // protected by this mutex #endif char *EventDesc[] = { [EVENT_CREATE_THREAD] = "Create thread", [EVENT_RUN_THREAD] = "Run thread", [EVENT_STOP_THREAD] = "Stop thread", [EVENT_THREAD_RUNNABLE] = "Thread runnable", [EVENT_MIGRATE_THREAD] = "Migrate thread", [EVENT_RUN_SPARK] = "Run spark", [EVENT_STEAL_SPARK] = "Steal spark", [EVENT_SHUTDOWN] = "Shutdown", [EVENT_THREAD_WAKEUP] = "Wakeup thread", [EVENT_GC_START] = "Starting GC", [EVENT_GC_END] = "Finished GC", [EVENT_REQUEST_SEQ_GC] = "Request sequential GC", [EVENT_REQUEST_PAR_GC] = "Request parallel GC", [EVENT_CREATE_SPARK_THREAD] = "Create spark thread", [EVENT_LOG_MSG] = "Log message", [EVENT_USER_MSG] = "User message", [EVENT_STARTUP] = "Startup", [EVENT_BLOCK_MARKER] = "Block marker", [EVENT_VERSION] = "Version", [EVENT_PROGRAM_INVOCATION] = "Program invocation", [EVENT_EDEN_START_RECEIVE] = "Starting message receival", [EVENT_EDEN_END_RECEIVE] = "Finished message receival", [EVENT_CREATE_PROCESS] = "Creating Process", [EVENT_KILL_PROCESS] = "Killing Process", [EVENT_ASSIGN_THREAD_TO_PROCESS] = "Assigning thread to process", [EVENT_CREATE_MACHINE] = "Creating machine", [EVENT_KILL_MACHINE] = "Killing machine", [EVENT_SEND_MESSAGE] = "Sending message", [EVENT_RECEIVE_MESSAGE] = "Receiving message" }; // Event type. typedef struct _EventType { EventTypeNum etNum; // Event Type number. nat size; // size of the payload in bytes char *desc; // Description } EventType; EventType eventTypes[NUM_EVENT_TAGS]; static void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno); static void resetEventsBuf(EventsBuf* eb); static void printAndClearEventBuf (EventsBuf *eventsBuf); static void postEventType(EventsBuf *eb, EventType *et); static void postLogMsg(EventsBuf *eb, EventTypeNum type, char *msg, va_list ap); static void postBlockMarker(EventsBuf *eb); static void closeBlockMarker(EventsBuf *ebuf); static StgBool hasRoomForEvent(EventsBuf *eb, EventTypeNum eNum); static StgBool hasRoomForVariableEvent(EventsBuf *eb, nat payload_bytes); static inline void postWord8(EventsBuf *eb, StgWord8 i) { *(eb->pos++) = i; } static inline void postWord16(EventsBuf *eb, StgWord16 i) { postWord8(eb, (StgWord8)(i >> 8)); postWord8(eb, (StgWord8)i); } static inline void postWord32(EventsBuf *eb, StgWord32 i) { postWord16(eb, (StgWord16)(i >> 16)); postWord16(eb, (StgWord16)i); } static inline void postWord64(EventsBuf *eb, StgWord64 i) { postWord32(eb, (StgWord32)(i >> 32)); postWord32(eb, (StgWord32)i); } static inline void postBuf(EventsBuf *eb, StgWord8 *buf, nat size) { memcpy(eb->pos, buf, size); eb->pos += size; } static inline StgWord64 time_ns(void) { return stat_getElapsedTime() * (1000000000LL/TICKS_PER_SECOND); } static inline void postEventTypeNum(EventsBuf *eb, EventTypeNum etNum) { postWord16(eb, etNum); } static inline void postTimestamp(EventsBuf *eb) { postWord64(eb, time_ns()); } static inline void postThreadID(EventsBuf *eb, EventThreadID id) { postWord32(eb,id); } static inline void postProcessID(EventsBuf *eb, EventProcessID id) { postWord32(eb,id); } static inline void postMachineID(EventsBuf *eb, EventMachineID id) { postWord16(eb,id); } static inline void postPortID(EventsBuf *eb, EventPortID id) { postThreadID(eb,id); } static inline void postCapNo(EventsBuf *eb, EventCapNo no) { postWord16(eb,no); } static inline void postPayloadSize(EventsBuf *eb, EventPayloadSize size) { postWord16(eb,size); } static inline void postEventHeader(EventsBuf *eb, EventTypeNum type) { postEventTypeNum(eb, type); postTimestamp(eb); } static inline void postEventHeaderTime(EventsBuf *eb, EventTypeNum type, StgWord64 time) { postEventTypeNum(eb, type); postWord64(eb, time); } static inline void postInt8(EventsBuf *eb, StgInt8 i) { postWord8(eb, (StgWord8)i); } static inline void postInt16(EventsBuf *eb, StgInt16 i) { postWord16(eb, (StgWord16)i); } static inline void postInt32(EventsBuf *eb, StgInt32 i) { postWord32(eb, (StgWord32)i); } static inline void postInt64(EventsBuf *eb, StgInt64 i) { postWord64(eb, (StgWord64)i); } void initEventLogging(void) { StgWord8 t, c; nat n_caps; #ifdef PARALLEL_RTS int thisPESize, peTemp; peTemp = thisPE; thisPESize = 1; while (peTemp > 9){ peTemp = peTemp / 10; thisPESize ++; } event_log_filename = stgMallocBytes(strlen(prog_name) + 10 + 1 + thisPESize, "initEventLogging"); #else //PARALLEL_RTS event_log_filename = stgMallocBytes(strlen(prog_name) + 10, "initEventLogging"); #endif //else PARALLEL_RTS if (sizeof(EventDesc) / sizeof(char*) != NUM_EVENT_TAGS) { barf("EventDesc array has the wrong number of elements"); } #ifdef PARALLEL_RTS sprintf(event_log_filename, "%s#%d.eventlog", prog_name, thisPE); #else //PARALLEL_RTS sprintf(event_log_filename, "%s.eventlog", prog_name); #endif //else PARALLEL_RTS /* Open event log file for writing. */ if ((event_log_file = fopen(event_log_filename, "wb")) == NULL) { sysErrorBelch("initEventLogging: can't open %s", event_log_filename); stg_exit(EXIT_FAILURE); } /* * Allocate buffer(s) to store events. * Create buffer large enough for the header begin marker, all event * types, and header end marker to prevent checking if buffer has room * for each of these steps, and remove the need to flush the buffer to * disk during initialization. * * Use a single buffer to store the header with event types, then flush * the buffer so all buffers are empty for writing events. */ #ifdef THREADED_RTS // XXX n_capabilities hasn't been initislised yet n_caps = RtsFlags.ParFlags.nNodes; #else n_caps = 1; #endif capEventBuf = stgMallocBytes(n_caps * sizeof(EventsBuf),"initEventLogging"); for (c = 0; c < n_caps; ++c) { // Init buffer for events. initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c); } initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1)); // Write in buffer: the header begin marker. postInt32(&eventBuf, EVENT_HEADER_BEGIN); // Mark beginning of event types in the header. postInt32(&eventBuf, EVENT_HET_BEGIN); for (t = 0; t < NUM_EVENT_TAGS; ++t) { eventTypes[t].etNum = t; eventTypes[t].desc = EventDesc[t]; switch (t) { case EVENT_CREATE_THREAD: // (cap, thread) case EVENT_RUN_THREAD: // (cap, thread) case EVENT_THREAD_RUNNABLE: // (cap, thread) case EVENT_RUN_SPARK: // (cap, thread) case EVENT_CREATE_SPARK_THREAD: // (cap, spark_thread) eventTypes[t].size = sizeof(EventThreadID); break; case EVENT_MIGRATE_THREAD: // (cap, thread, new_cap) case EVENT_STEAL_SPARK: // (cap, thread, victim_cap) case EVENT_THREAD_WAKEUP: // (cap, thread, other_cap) eventTypes[t].size = sizeof(EventThreadID) + sizeof(EventCapNo); break; case EVENT_STOP_THREAD: // (cap, thread, status) eventTypes[t].size = sizeof(EventThreadID) + sizeof(StgWord16); break; case EVENT_SHUTDOWN: // (cap) case EVENT_REQUEST_SEQ_GC: // (cap) case EVENT_REQUEST_PAR_GC: // (cap) case EVENT_GC_START: // (cap) case EVENT_GC_END: // (cap) case EVENT_STARTUP: case EVENT_EDEN_START_RECEIVE: case EVENT_EDEN_END_RECEIVE: eventTypes[t].size = 0; break; case EVENT_LOG_MSG: // (msg) case EVENT_USER_MSG: // (msg) case EVENT_VERSION: // (version_string) case EVENT_PROGRAM_INVOCATION: // (commandline) eventTypes[t].size = 0xffff; break; case EVENT_BLOCK_MARKER: eventTypes[t].size = sizeof(StgWord32) + sizeof(EventTimestamp) + sizeof(EventCapNo); break; case EVENT_CREATE_PROCESS: // (process) case EVENT_KILL_PROCESS: // (process) eventTypes[t].size = sizeof(EventProcessID); break; case EVENT_ASSIGN_THREAD_TO_PROCESS: // (thread, process) eventTypes[t].size = sizeof(EventThreadID) + sizeof(EventProcessID); break; case EVENT_CREATE_MACHINE: // (machine) case EVENT_KILL_MACHINE: // (machine) eventTypes[t].size = sizeof(EventMachineID); break; case EVENT_SEND_MESSAGE: //(msgtag, sender_process, sender_thread, receiver_machine, receiver_process, receiver_inport) eventTypes[t].size = sizeof(StgWord8) + sizeof(EventThreadID) + 2 * sizeof(EventProcessID) + sizeof(EventPortID) + sizeof(EventMachineID); break; case EVENT_RECEIVE_MESSAGE: // (msgtag, receiver_process, receiver_inport, sender_machine, sender_process, sender_outport, message_size) eventTypes[t].size = sizeof(StgWord8) + 2 * sizeof(EventProcessID) + 2 * sizeof(EventPortID) + sizeof(EventMachineID) + sizeof(StgInt32); break; default: continue; /* ignore deprecated events */ } // Write in buffer: the start event type. postEventType(&eventBuf, &eventTypes[t]); } // Mark end of event types in the header. postInt32(&eventBuf, EVENT_HET_END); // Write in buffer: the header end marker. postInt32(&eventBuf, EVENT_HEADER_END); // Prepare event buffer for events (data). postInt32(&eventBuf, EVENT_DATA_BEGIN); // Post a STARTUP event with the number of capabilities postEventHeader(&eventBuf, EVENT_STARTUP); postCapNo(&eventBuf, n_caps); // Flush capEventBuf with header. /* * Flush header and data begin marker to the file, thus preparing the * file to have events written to it. */ printAndClearEventBuf(&eventBuf); for (c = 0; c < n_caps; ++c) { postBlockMarker(&capEventBuf[c]); } #ifdef THREADED_RTS initMutex(&eventBufMutex); #endif } void endEventLogging(void) { nat c; // Flush all events remaining in the buffers. for (c = 0; c < n_capabilities; ++c) { printAndClearEventBuf(&capEventBuf[c]); } printAndClearEventBuf(&eventBuf); resetEventsBuf(&eventBuf); // we don't want the block marker // Mark end of events (data). postEventTypeNum(&eventBuf, EVENT_DATA_END); // Flush the end of data marker. printAndClearEventBuf(&eventBuf); if (event_log_file != NULL) { fclose(event_log_file); } } void freeEventLogging(void) { StgWord8 c; // Free events buffer. for (c = 0; c < n_capabilities; ++c) { if (capEventBuf[c].begin != NULL) stgFree(capEventBuf[c].begin); } if (capEventBuf != NULL) { stgFree(capEventBuf); } if (event_log_filename != NULL) { stgFree(event_log_filename); } } /* * Post an event message to the capability's eventlog buffer. * If the buffer is full, prints out the buffer and clears it. */ void postSchedEvent (Capability *cap, EventTypeNum tag, StgThreadID thread, StgWord64 other) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, tag)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); switch (tag) { case EVENT_CREATE_THREAD: // (cap, thread) case EVENT_RUN_THREAD: // (cap, thread) case EVENT_THREAD_RUNNABLE: // (cap, thread) case EVENT_RUN_SPARK: // (cap, thread) { postThreadID(eb,thread); break; } case EVENT_CREATE_SPARK_THREAD: // (cap, spark_thread) { postThreadID(eb,other /* spark_thread */); break; } case EVENT_MIGRATE_THREAD: // (cap, thread, new_cap) case EVENT_STEAL_SPARK: // (cap, thread, victim_cap) case EVENT_THREAD_WAKEUP: // (cap, thread, other_cap) { postThreadID(eb,thread); postCapNo(eb,other /* new_cap | victim_cap | other_cap */); break; } case EVENT_STOP_THREAD: // (cap, thread, status) { postThreadID(eb,thread); postWord16(eb,other /* status */); break; } case EVENT_SHUTDOWN: // (cap) case EVENT_REQUEST_SEQ_GC: // (cap) case EVENT_REQUEST_PAR_GC: // (cap) case EVENT_GC_START: // (cap) case EVENT_GC_END: // (cap) { break; } default: barf("postEvent: unknown event tag %d", tag); } } void postEvent (Capability *cap, EventTypeNum tag) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, tag)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); } void postLogMsgF(EventsBuf *eb, EventTypeNum type, char *msg, ...) { va_list ap; va_start(ap,msg); postLogMsg(eb, type, msg, ap); va_end(ap); } #define BUF 512 void postLogMsg(EventsBuf *eb, EventTypeNum type, char *msg, va_list ap) { char buf[BUF]; nat size; size = vsnprintf(buf,BUF,msg,ap); if (size > BUF) { buf[BUF-1] = '\0'; size = BUF; } if (!hasRoomForVariableEvent(eb, size)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, type); postPayloadSize(eb, size); postBuf(eb,(StgWord8*)buf,size); } void postMsg(char *msg, va_list ap) { ACQUIRE_LOCK(&eventBufMutex); postLogMsg(&eventBuf, EVENT_LOG_MSG, msg, ap); RELEASE_LOCK(&eventBufMutex); } void postCapMsg(Capability *cap, char *msg, va_list ap) { postLogMsg(&capEventBuf[cap->no], EVENT_LOG_MSG, msg, ap); } void postUserMsg(Capability *cap, char *msg, va_list ap) { postLogMsg(&capEventBuf[cap->no], EVENT_USER_MSG, msg, ap); } void closeBlockMarker (EventsBuf *ebuf) { StgInt8* save_pos; if (ebuf->marker) { // (type:16, time:64, size:32, end_time:64) save_pos = ebuf->pos; ebuf->pos = ebuf->marker + sizeof(EventTypeNum) + sizeof(EventTimestamp); postWord32(ebuf, save_pos - ebuf->marker); postTimestamp(ebuf); ebuf->pos = save_pos; ebuf->marker = NULL; } } void postBlockMarker (EventsBuf *eb) { if (!hasRoomForEvent(eb, EVENT_BLOCK_MARKER)) { printAndClearEventBuf(eb); } closeBlockMarker(eb); eb->marker = eb->pos; postEventHeader(eb, EVENT_BLOCK_MARKER); postWord32(eb,0); // these get filled in later by closeBlockMarker(); postWord64(eb,0); postCapNo(eb, eb->capno); } void postVersion(char *version) { ACQUIRE_LOCK(&eventBufMutex); postLogMsgF(&eventBuf, EVENT_VERSION, version); RELEASE_LOCK(&eventBufMutex); } void postProgramInvocation(char *commandline) { ACQUIRE_LOCK(&eventBufMutex); postLogMsgF(&eventBuf, EVENT_PROGRAM_INVOCATION, commandline); RELEASE_LOCK(&eventBufMutex); } #ifdef PARALLEL_RTS void postProcessEvent(EventProcessID pid, EventTypeNum tag) { ASSERT(tag == EVENT_CREATE_PROCESS || tag == EVENT_KILL_PROCESS); EventsBuf *eb; eb = &eventBuf; if (!hasRoomForEvent(eb, tag)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); postProcessID(eb, pid); } void postAssignThreadToProcessEvent(Capability *cap, EventThreadID tid, EventProcessID pid) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, EVENT_ASSIGN_THREAD_TO_PROCESS)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, EVENT_ASSIGN_THREAD_TO_PROCESS); postThreadID(eb, tid); postProcessID(eb, pid); } void postCreateMachineEvent(EventMachineID pe, StgWord64 time,StgWord64 ticks, EventTypeNum tag) { ASSERT(tag == EVENT_CREATE_MACHINE); EventsBuf *eb; eb = &eventBuf; if (!hasRoomForEvent(eb,EVENT_CREATE_MACHINE)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeaderTime(eb, tag, ticks); postMachineID(eb, pe); postInt64(eb, time); } void postKillMachineEvent(EventMachineID pe, EventTypeNum tag) { ASSERT(tag == EVENT_KILL_MACHINE); EventsBuf *eb; eb = &eventBuf; if (!hasRoomForEvent(eb,EVENT_KILL_MACHINE)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, tag); postMachineID(eb, pe); } void postSendMessageEvent(OpCode msgtag, rtsPackBuffer* buf) { EventsBuf *eb; eb = &eventBuf; if (!hasRoomForEvent(eb, EVENT_SEND_MESSAGE)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, EVENT_SEND_MESSAGE); postWord8(eb, msgtag); postProcessID(eb, buf->sender.process); postThreadID(eb, buf->sender.id); postMachineID(eb, buf->receiver.machine); postProcessID(eb, buf->receiver.process); postPortID(eb, buf->receiver.id); } void postReceiveMessageEvent(Capability *cap, OpCode msgtag, rtsPackBuffer* buf) { EventsBuf *eb; eb = &capEventBuf[cap->no]; if (!hasRoomForEvent(eb, EVENT_RECEIVE_MESSAGE)) { // Flush event buffer to make room for new event. printAndClearEventBuf(eb); } postEventHeader(eb, EVENT_RECEIVE_MESSAGE); postWord8(eb, msgtag); postProcessID(eb, buf->receiver.process); postPortID(eb, buf->receiver.id); postMachineID(eb, buf->sender.machine); postProcessID(eb, buf->sender.process); postThreadID(eb, buf->sender.id); postInt32(eb, buf->size); } #endif //PARALLEL_RTS void printAndClearEventBuf (EventsBuf *ebuf) { StgWord64 numBytes = 0, written = 0; closeBlockMarker(ebuf); if (ebuf->begin != NULL && ebuf->pos != ebuf->begin) { numBytes = ebuf->pos - ebuf->begin; written = fwrite(ebuf->begin, 1, numBytes, event_log_file); if (written != numBytes) { debugBelch( "printAndClearEventLog: fwrite() failed, written=%" FMT_Word64 " doesn't match numBytes=%" FMT_Word64, written, numBytes); return; } resetEventsBuf(ebuf); flushCount++; postBlockMarker(ebuf); } } void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno) { eb->begin = eb->pos = stgMallocBytes(size, "initEventsBuf"); eb->size = size; eb->marker = NULL; eb->capno = capno; } void resetEventsBuf(EventsBuf* eb) { eb->pos = eb->begin; eb->marker = NULL; } StgBool hasRoomForEvent(EventsBuf *eb, EventTypeNum eNum) { nat size; size = sizeof(EventTypeNum) + sizeof(EventTimestamp) + eventTypes[eNum].size; if (eb->pos + size > eb->begin + eb->size) { return 0; // Not enough space. } else { return 1; // Buf has enough space for the event. } } StgBool hasRoomForVariableEvent(EventsBuf *eb, nat payload_bytes) { nat size; size = sizeof(EventTypeNum) + sizeof(EventTimestamp) + sizeof(EventPayloadSize) + payload_bytes; if (eb->pos + size > eb->begin + eb->size) { return 0; // Not enough space. } else { return 1; // Buf has enough space for the event. } } void postEventType(EventsBuf *eb, EventType *et) { StgWord8 d; nat desclen; postInt32(eb, EVENT_ET_BEGIN); postEventTypeNum(eb, et->etNum); postWord16(eb, (StgWord16)et->size); desclen = strlen(et->desc); postWord32(eb, desclen); for (d = 0; d < desclen; ++d) { postInt8(eb, (StgInt8)et->desc[d]); } postWord32(eb, 0); // no extensions yet postInt32(eb, EVENT_ET_END); } #endif /* TRACING */