/* ---------------------------------------------------------------------------- * Time-stamp: * * GUM Low-Level Inter-Task Communication * * This module defines PVM Routines for PE-PE communication. * * P. Trinder, December 5th. 1994. * P. Trinder, July 1998 * H-W. Loidl, November 1999 - * * J.Berthold, Feb.2005 - generalisation: * This file now uses a "RatherLL"-Comm. interface specified by MPSystem.h * System issues like bringing up the parallel system or error-handling * are not located here any more. * OTOH, here is where buffering for prg. messages is implemented. --------------------------------------------------------------------------- */ /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * ToDo: this file should be merged with PVMComms.c and MPIComms.c * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */ #ifdef PARALLEL_RTS /* whole file */ //@node Low-Level Inter-Task Communication, , , //@section Low-Level Inter-Task Communication /* *This module defines the routines which communicate between PEs. The *code is based on Kevin Hammond's GRIP RTS. (OpCodes.h defines *PEOp1 etc. in terms of sendOp1 etc.). * *Routine & Arguments * & *sendOp & 0 \\ *sendOp1 & 1 \\ *sendOp2 & 2 \\ *sendOpN & vector \\ *sendOpV & variable \\ *sendOpNV & variable+ vector \\ * * * Restructuring for Generic RTS: (JB 2004) * The module is an intermediate layer between HLComms (messages properly) * and the message passing library (abstracted via MPSystem.h). * Layer implements the option to buffer messages, collect messages and * save real communication by sending whole "packets" together. * Comments refer to "messages" as the single things to transmit, * to "packets" as collections of messages, which are really sent. 
* The idea (explained further in subsequent comments):
 *  maintain many buffers for sending, and one for receiving (think of them as objects),
 *  send buffers when they are full, on message timeout or the caller requests immediate action,
 *  receive messages by filling them into the unpacking context
 * This optimization is supposed to pay off by saving real sending action
 * (while increasing message size)
 *
 * Other Risks: spends much memory by allocating the maximum message size "-qQ" for each PE
 *
 */

//@menu
//* Macros etc::
//* Includes::
//* Auxiliary functions::
//* Index::
//@end menu

//@node Macros etc, Includes, GUM Low-Level Inter-Task Communication, GUM Low-Level Inter-Task Communication
//@subsection Macros etc

/* Evidently not Posix */
/* #include "PosixSource.h" */

//@node Includes, Auxiliary functions, Macros etc, GUM Low-Level Inter-Task Communication
//@subsection Includes

// included in LLC.h already...
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParallelRts.h"

#include "LLC.h"
#include "MPSystem.h" // message passing abstractions

// TODO: needed?
// #include "ParallelRts.h" // DEBUG_HEADROOM
#if defined(DEBUG)
# include "ParallelDebug.h"
#endif

// va_list / va_start / va_arg / va_end are used by sendOpV and sendOpNV
#ifdef __STDC__
#include <stdarg.h>
#else
#include <varargs.h>
#endif

/* message buffering code:
 * we maintain a set of sendbuffers and receivebuffers (one for every PE)
 *
 * "sending" implies to preliminarily set the context to one of the send buffers.
 *   - setContext(nat dest)
 * A packet of collected messages is sent:
 *   a) when the buffer is full
 *   b) when lifetime has expired
 *   c) when the client requests it explicitly - sendImmediately(nat dest)
 *
 * Received data from a packet is read from the (previously filled) receive buffer.
 * (Buffer must contain actual message tag -> criteria for buffer activity)
 *
 * Buffer's data space must be >= (Gum packbufferSize (in words!)
 *                                 + 2*PORTSIZE + HEADER)
 * (see definition DATASPACEWORDS in Parallel.h). This size depends on
 * the message format.
* * Overall idea is to have buffer "objects" for communication with other PEs. * These objects are created on startup, collect messages and send their content * on request, on timeout or when they are full. */ // forward declarations for small helpers INLINE_HEADER nat roomLeft(void); INLINE_HEADER rtsBool timeToSend(void); static void reallySend(void); /* Message buffers: *******************/ static MessageBuffer *sendBuffers = NULL; // rings of buffers for all PEs static MessageBuffer *receiveBuffer = NULL; // (we must be able to send to ourselves, too) // contexts used to pack (this file) and unpack (higher levels) // we presume: unpackContext == NULL => not initialized // packContext!=NULL => currently packing a message static MessageBuffer *packContext=NULL; MessageBuffer *unpackContext=NULL; // externally accessed /* global statistics variables * Sending: no. of sent packets+packet size(accumulating), msgs. in packet (accumulating), * forced send operations, biggest packet+msgs. in it * Receiving: received packets+packet size, packet latency (accumulating) and biggest latency */ long sentPackets=0, sendSize=0, msgCount=0, forcedPackets=0, timeExceeded=0, biggestPacket=0, msgsInBiggest=0; long receivedPackets=0, recvSize=0, avgLatency=0, biggestLatency=0; /* Helpers: ************************************************************************* * lookupPE(): lookup buffer for given PE in given ring * Assumes: "ring" points to a real buffer ring (non-termination if not). * */ INLINE_HEADER MessageBuffer* lookupPE(MessageBuffer* ring, nat pe) { MessageBuffer* temp = ring; ASSERT(ring!=(MessageBuffer*)NULL); // if(ring == receiveBuffer){ // return ring; } // hack! do { temp = temp->next; if (temp->pe == pe) return temp; } while (temp != ring); IF_PAR_DEBUG(mpcomm, debugBelch("~~ sendBuffer for PE %x not found\n",pe)); return NULL; } /* showPacketStats() shows the statistics for the Low-Level messages (packets) * collected when operating on the buffers. 
*/ void showPacketStats(void) { debugBelch("\nPacket statistics\n-----------------\n"); // how many packets have been sent ? debugBelch("Packets sent:\t\t%ld\n", sentPackets); // packets forced to be sent debugBelch("(forced sending of %ld packets, timeout for %ld packets)\n", forcedPackets, timeExceeded); if (sentPackets > 0) { // messages per packet (average = msgs/packets) debugBelch("Messages per packet (avg.):\t%ld\n",msgCount/sentPackets); // avg. packet size ( avg = sizes/packets) debugBelch("Average packet size:\t\t%ld words\n",sendSize/sentPackets); // biggest packet debugBelch("Biggest packet: \t%ld words\n\t\t\t %ld messages.\n",biggestPacket, msgsInBiggest); } // received packets debugBelch("\nPackets received:\t%ld\n", receivedPackets); if (receivedPackets > 0) { // avg. size debugBelch("(avg. size:\t%ld words)\n",recvSize/receivedPackets); // latency debugBelch("Packet latency (msec.):\t%ld (max.)\t%ld (avg.)\n",biggestLatency, avgLatency/receivedPackets); } } /* initialize packcontext and send buffers ************************************************************************************ * initMessageBuffers allocates space for the buffers to/from one PE and sets all needed fields * * The effect of calling initMessageBuffer n times is a ring structure, where sendBuffers * points to the *last* ring element. * ______________________ * | | * V | * sendBuffers -> n -> 1 -> 2 -> ... -> (n-1) * * This ring can dynamically be enlarged and reduced, since the selection of a packContext * uses a simple sequential list scan instead of a selection by PE-Number. * (acceptable payoff for dynamic configuration ability in my opinion. * We eliminate all assumptions on the order of the PE tids here.) * * This is old code from eden-5.02.3, not used outside this file at the moment. * See method "initCommBuffers" below for how new RTE uses them. 
*/ // constructor for new buffer and insertion into given ring: static void insertNewBufferInRing(MessageBuffer **ring, nat pe) { MessageBuffer *newBuffer; // space for the administrative fields: newBuffer = (MessageBuffer*) stgMallocBytes(sizeof(MessageBuffer),"sendBuffer:admin"); newBuffer->age = newBuffer->active = 0; // invalidate activity fields newBuffer->pe = pe; // set receiver (MP-System's task ID). We // don't use PE-numbers in here // space for the data. Maximum space needed is the max.size of one // message, plus header. //fprintf(stderr, "This is LL check by mustsfa for every body \n\n"); newBuffer->position = newBuffer->PackData = (long*) stgMallocBytes(sizeof(long)*DATASPACEWORDS, "sendBuffer:data"); // insert into buffer ring: if (*ring == NULL) { // first buffer we ever create (must be PE 1) // create trivial ring with 1 element newBuffer->next = newBuffer; *ring = newBuffer; } else { // insert into existing ring // remember: sendBuffers points to the *last* buffer in the ring newBuffer->next = (*ring)->next; // link new buffer to 1st element in ring (*ring)->next = newBuffer; // link old last element to new buffer (*ring) = newBuffer; // point to new buffer (new last element) } IF_DEBUG(sanity, // TODO (if debugging needed): fill buffer data space with zeros ); } // new RTE: create all send buffers and one receive buffer in one operation rtsBool initCommBuffers(void) { nat i; // counter ASSERT(nPEs); // nPEs must be set before we do this! IF_PAR_DEBUG(mpcomm, debugBelch("~~ initCommBuffers (%d machines)..", nPEs)); // machines generally count from one to nPEs, as on language level for(i=1; i <= nPEs; i++) { // pes are numbered 1..nPEs-1 insertNewBufferInRing(&sendBuffers, i); // GlobalTaskID not used any more... } IF_PAR_DEBUG(mpcomm, debugBelch("~~ recv ..")); insertNewBufferInRing(&receiveBuffer,0); // one receive buffer, created by helper... 
unpackContext = receiveBuffer; IF_PAR_DEBUG(mpcomm, debugBelch("~~ sendBufferRing is:\n"); printSendBuffers(sendBuffers)); IF_PAR_DEBUG(mpcomm, debugBelch("~~ Done\n")); return rtsTrue; } void freeCommBuffers(void) { MessageBuffer *current, *next; IF_PAR_DEBUG(mpcomm, debugBelch("~~ freeing Comm.buffers\n")); ASSERT(sendBuffers); // we do have send buffers to free... current = sendBuffers->next; while (current != sendBuffers) { // set ref. to next, free space allocated for current sendBuffers->next = current->next; IF_PAR_DEBUG(mpcomm, debugBelch("~~ pack buffer to PE %d: %s\n", current->pe, current->position == current->PackData? "OK":"still contains data.")); stgFree(current->PackData); stgFree(current); current = sendBuffers->next; } // now invalidate sendBuffers (the only one remaining) IF_PAR_DEBUG(mpcomm, debugBelch("~~ pack buffer to PE %d: %s\n", current->pe, current->position == current->PackData? "OK":"still contains data.")); stgFree(current->PackData); stgFree(current); sendBuffers = NULL; stgFree(receiveBuffer->PackData); stgFree(receiveBuffer); } // in case a new machine joins an open system, we would call this one: void insertNewCommBuffer(nat pe) { if (lookupPE(sendBuffers, pe) == NULL) { insertNewBufferInRing(&sendBuffers, pe); debugBelch(" addded send buffer for PE %d at %x\n ", pe, sendBuffers); } else { barf("Parallel System Error: Machine-ID %d exists twice\n",pe); } } void printSendBuffers(MessageBuffer *ring) { MessageBuffer *currBuff = ring; if (ring==NULL) return; do { fprintf(stderr, "Buffer: [PE %d] @ %x with data @ %x; %s, age %d\n", currBuff->pe, currBuff, currBuff->PackData, (currBuff->active)?"ACTIVE":"INACTIVE", currBuff->age); currBuff=currBuff->next; } while (currBuff!=ring); } /* handling of send buffers: ************************************************************************** * set packcontext (including activity checks) * determine free data space in buffer * invalidate packcontext (and send on timeout) * really send a packet 
away (when it is full or time to send) * check for active buffers and send away on timeout */ /* * roomLeft determines the remaining space in a buffer * calculates in units of (long) = StgWord. * (assumes: packContext has been set before) *******************************************************/ INLINE_HEADER nat roomLeft(void) { int spent; spent = (packContext->position) - (packContext->PackData); ASSERT(DATASPACEWORDS > spent); ASSERT(spent >= 0); return (DATASPACEWORDS - (nat) spent); } /* setPackContext prepares for "sending" a message (filling it into a packet) * It is assumed that the message will really be packed into the packet right * after calling setPackContext (Opcode is already filled into the packet). * After packing things, the packContext must be invalidated. * * New RTE, abstracting over PVM: * If we use message buffering, the data space must leave space for the * packet header, so that we can send away a contiguous area in memory. */ static void setPackContext(OpCode op, nat forThatPe1, nat datasize) { nat forThatPe = forThatPe1; // FIXME: buffers in ring are numerated from 1 onwards? IF_PAR_DEBUG(paranoia, debugBelch("~~ setPackContext: msg. to task %x.\n", forThatPe); debugBelch("~~ sendBufferRing is:\n"); printSendBuffers(sendBuffers)); // semaphores could be placed here (mutex packContext) ASSERT(packContext == NULL); // set the global context: packContext = lookupPE(sendBuffers, forThatPe); ASSERT(packContext != NULL); // check for activity, set active if not previously active: if (!(packContext->active)) { ASSERT(packContext->position == packContext->PackData); // buffer shouldn't contain data! 
ASSERT(packContext->age == 0); // set buffer active, record creation time for sending after timeouts: packContext->age = msTime(); packContext->active = op; packContext->counter = 0; packContext->position += PACKET_HEADER ; // leave space for packet heade // fprintf(stderr, "\n\n\n packContext = %x \n" , packContext->active); } else { ASSERT((packContext->position) > packContext->PackData); // if message does not fit into buffer -> send away old packet ("reallySend") if (datasize+MSG_HEADER > roomLeft()) { reallySend(); // this invalidates the current packContext setPackContext(op, forThatPe, datasize); // reenter with same parameters fprintf(stderr,"\n Yes there is a room to pack this \n\n"); // (yes, we could also "goto ...") return; // don't pack opcode twice... } else { // or else we continue (separator, then next packet): *(packContext->position++) = NEXT_MSG; // fill in a packet separator fprintf(stderr,"\n NO there is a room to pack this \n\n"); } } // If we are here, we have started/continued a packet in the buffer. // prepare the message by filling in Opcode: *(packContext->position++)=op; packContext->counter++; IF_PAR_DEBUG(paranoia, debugBelch("~~ Pack Context set to %x with OpCode %s and target PE %u\n", forThatPe, getOpName(op), forThatPe)); } /* Other functions assume the context has been set * correctly and operate on this context (remember: OO-design): * * timeToSend() checks if the packet in the actual context must be sent now. * Either it is too old, or the packet is full (min.msg does not fit in). 
*/ INLINE_HEADER rtsBool timeToSend(void) { rtsTime now; ulong timeout = RtsFlags.ParFlags.BufferTime; ASSERT(packContext != NULL); now=msTime(); // criteria: packet age or remaining packet space if ((now - (packContext->age) > timeout) || (roomLeft() < MIN_MSG)) { timeExceeded += (now - (packContext->age) > timeout); // increase if timeout sending return rtsTrue; } else return rtsFalse; } /* both "reallySend" and "invalidatePackContext" invalidate the packContext. * invalidatePackContext only sends if the packet is full (smallest message does not fit any more) * or if the time to collect has expired (the previously packed message ) */ static void invalidatePackContext(void) { IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: invalidatePackContext\n")); ASSERT(packContext != NULL); if (!(RtsFlags.ParFlags.BufferTime) || timeToSend()) { reallySend(); // will complete the packet and send it, then invalidate the context } else { packContext=NULL; // nothing to do except for invalidating packContext // right place for releasing mutex semaphore would be here and in reallySend(). } return; } /* reallySend sends away the message(s) in the current packContext. * Either the next message does not fit in the packet, or it is too old, * or we do not use buffering. * * If message buffering is not used: * => send away message as it is in the buffer's data space, with * original tag. The message will *not* include any header * (PACKET_HEADER==0), but repeat the opCode. packData points to: * +---------------+ * | Message | * | opCode| data | * +---------------+ * * If we use message buffering: * => create packet with header (PACKET_HEADER) and send it, then * invalidate pack context. 
The packet format is: * +----------------------------------------------------------------------------+ * | Packet Header: | Messages (packet data): | * |start time| size | mach.load | msg1|NEXT_MSG|msg2|NEXT_MSG|...|END_OF_PACKET| * +----------------------------------------------------------------------------+ * and the data space in pack context leaves out the header size when packing * so that we have contiguous data space to send away (see setPackContext). */ static void reallySend(void) { long bufsize; long* dataptr; IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms::reallySend to %x (context at %p)\n", packContext->pe,packContext)); // we have a valid pack context: ASSERT(packContext != NULL); // there *is* something to send: ASSERT(packContext->active); ASSERT(packContext->counter > 0); if (!(RtsFlags.ParFlags.BufferTime)) { // do not use message buffers at all, skip header ASSERT(packContext->counter == 1); bufsize = (packContext->position) - (packContext->PackData); MP_send((int) packContext->pe, (OpCode) packContext->active, (long *) (packContext->PackData), (int) bufsize); } else { // rest: message buffering code... 
*(packContext->position++)= END_OF_PACKET; bufsize = (packContext->position) - (packContext->PackData); // buffer is active, so there must be something // else than header and end marker: ASSERT(bufsize > PACKET_HEADER+1); /* statistics stuff: if (RtsFlags.EdenFlags.stats >= COMM) { sendSize += bufsize; sentPackets++; msgCount += packContext->counter; if (bufsize > biggestPacket) { biggestPacket = bufsize; msgsInBiggest = packContext->counter; } } */ dataptr = packContext->PackData; // fill in packet header // TODO - move to separate method writePacketHeader(dataptr), customizable dataptr[0] = (long) packContext->age; dataptr[1] = bufsize; dataptr[2] = 0xdead; // load information // end of "writePacketHeader(dataptr)" // call to lower level MP-routine here: MP_send(packContext->pe, PP_PACKET, packContext->PackData, bufsize); } // common code for both buffering and without: invalidate packContext IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: sent message packet to PE [%x]\n", packContext->pe)); // reset the buffer which we just sent away packContext->age = packContext->active = 0; packContext->position = packContext->PackData; // invalidate packContext (also the right place to free a semaphore) packContext = NULL; } /* sendImmediately can be called from anywhere outside the sending routines * (without bothering with the packContext). * It can be called to force quicker message passing in certain * situations (thread termination, other important messages). * If we don't have anything to send to this PE: ignore request. 
*/ void sendImmediately(nat toThisPe) { // request immediate message to a PE IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms:Immediately sending to %d - ", toThisPe)); ASSERT(packContext == NULL); packContext = lookupPE(sendBuffers, toThisPe); if (!(packContext->active)) { // no action required (buffer empty) packContext = NULL; IF_PAR_DEBUG(paranoia, debugBelch("~~ Nothing to send\n")); return; } else { forcedPackets++; reallySend(); // send away and invalidate buffer and context } } /* sendOldBuffers() is called by the scheduler and realizes the sending timeout * All buffers are checked and sent when they become too old. */ void sendOldBuffers(void) { MessageBuffer *temp; IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: Checking all send buffers for old messages:\n")); ASSERT(packContext == NULL); packContext = sendBuffers->next; while (packContext != sendBuffers) { temp = packContext; if ((packContext->active) && timeToSend()) reallySend(); packContext = temp->next; } packContext = NULL; // invalidate packContext } //@node Auxiliary functions, Index, Includes, GUM Low-Level Inter-Task Communication //@subsection Auxiliary functions /* * traceSendOp handles the tracing of messages. */ //@cindex traceSendOp static void traceSendOp(OpCode op, nat dest , unsigned int data1 , unsigned int data2) { IF_PAR_DEBUG(paranoia, debugBelch("~~ trace: %s [%x,%x] sent from %x to %x\n", getOpName(op), data1, data2, thisPE, dest)); } /* sending messages: **************************************************************************** * JB 2004, changes for new RTE: * Many sending routines now deal with the packContext instead of really sending. * We need all data of the message in contiguous memory to use MPSystem.h * Routines which do not mix a no. of arguments and a chunk of memory could * use MPSystem.h directly if message buffering is not used. * * Common parameter "nat task" is value between 1 and nPEs!!! 
*/ /* * sendOp sends a 0-argument message with OpCode {\em op} to * the global task {\em task}. */ //@cindex sendOp void sendOp(OpCode op, nat task) { traceSendOp(op, task,0,0); setPackContext(op, task,0); // this also inserts the Opcode invalidatePackContext(); // sends away if timeToSend or unbuffered } /* * sendOp1 sends a 1-argument message with OpCode {\em op} * to the global task {\em task}. */ //@cindex sendOp1 void sendOp1(OpCode op, nat task, StgWord arg1) { traceSendOp(op, task, arg1,0); // here it is easy to build in direct calls to MPSystem: if (RtsFlags.ParFlags.BufferTime) { setPackContext(op, task,1); PutArg1(arg1); // copy arg to buffer invalidatePackContext(); } else { // unbuffered mode MP_send(task, op, (long*) &arg1, 1); } } /* * sendOp2 is used by the FP code only. */ //@cindex sendOp2 void sendOp2(OpCode op, nat task, StgWord arg1, StgWord arg2) { traceSendOp(op, task, arg1, arg2); setPackContext(op, task,2); PutArg1(arg1); PutArg2(arg2); invalidatePackContext(); } /* * * sendOpV takes a variable number of arguments, as specified by {\em n}. * For example, * * sendOpV( PP_STATS, StatsTask, 3, start_time, stop_time, sparkcount); */ //@cindex sendOpV void sendOpV(OpCode op, nat task, int n, ...) { va_list ap; int i; StgWord arg; va_start(ap, n); traceSendOp(op, task, 0, 0); setPackContext(op, task, (nat)n); // TODO: this should be done in setPackContext; in that case, drop +1's below PutArgN(0,task); for (i = 0; i < n; ++i) { arg = va_arg(ap, StgWord); PutArgN(i+1, arg); } va_end(ap); invalidatePackContext(); } /* * * sendOpNV takes a variable-size datablock, as specified by {\em * nelem} and a variable number of arguments, as specified by {\em * narg}. N.B. The datablock and the additional arguments are contiguous * and are copied over together. 
For example, * * sendOpNV(PP_RESUME, tsoga.pe, 6, nelem, data, * (W_) ga.weight, (W_) ga.loc.gc.gtid, (W_) ga.loc.gc.slot, * (W_) tsoga.weight, (W_) tsoga.loc.gc.gtid, (W_) tsoga.loc.gc.slot); * * Important: The variable arguments must all be StgWords. sendOpNV(_, tid, m, n, data, x1, ..., xm): | n elems +------------------------------ | x1 | ... | xm | n | data .... +------------------------------ */ //@cindex sendOpNV void sendOpNV(OpCode op, nat task, int nelem, StgWord *datablock, int narg, ...) { va_list ap; int i; StgWord arg; va_start(ap, narg); traceSendOp(op, task, 0, 0); IF_PAR_DEBUG(paranoia, debugBelch("~~ sendOpNV: op = %x (%s), PE = %x, narg = %d, nelem = %d\n", op, getOpName(op), task, narg, nelem)); setPackContext(op, task,nelem+narg+1); // TODO: this should be done in setPackContext; in that case, drop +1's below PutArgN(0,task); for (i = 0; i < narg; ++i) { arg = va_arg(ap, StgWord); IF_PAR_DEBUG(paranoia, fprintf(stderr,"~~ sendOpNV: arg = %d\n",arg)); PutArgN(i+1,arg); } arg = (StgWord) nelem; PutArgN(narg+1, arg); /* for (i=0; i < nelem; ++i) fprintf(stderr, "%d ",datablock[i]); */ /* fprintf(stderr," in sendOpNV\n");*/ PutArgs(datablock, nelem); va_end(ap); invalidatePackContext(); } /* * sendOpN take a variable size array argument, whose size is given by * {\em n}. For example, * * sendOpN( PP_STATS, StatsTask, 3, stats_array); */ //@cindex sendOpN void sendOpN(OpCode op, nat task, int n, StgPtr args) { long arg; traceSendOp(op, task, 0, 0); setPackContext(op, task,(nat) n+1); // TODO: this should be done in setPackContext; in that case, drop +1's below PutArgN(0,task); arg = (long) n; PutArgN(0+1, arg); PutArgs(args, n); invalidatePackContext(); } /* receiving messages: ****************************************************************************** !!!!!!!!!!TODO: update the description!!!!!!!!!!! 
*/ /* JB: optimization (buffered receiving), Handling of receive Buffers: ********************************************************************************** * The idea: maintain an unpacking context (buffer) independently from the * MP-System, set this context to the actual message on request (getPacket) * and supply data to unpacking functions from this buffer (Attention: unprotected!) * * Receive Buffers are filled when checking for messages to process. Once filled, * with a packet of messages, the field "active" will contain the actual Opcode * (to allow several calls to "getOpcodeAndSender"). Buffers are checked RR for * messages to process (could be changed, would even save memory space, but * wouldn't guarantee fair processing). * The buffered implementation must implement every function in LLC.h for receiving * data. Similar to the packing code, it uses an unpackContext which is set * when getting a packet (PacketsWaiting/getPacket must preceed every message * processing action). * * Update Oct.2004: The inefficiencies of receive buffers lead to experiments * with a version that uses only one receive buffer (define NORECVBUFFER). * This receive buffer is always completely processed when a packet has arrived. * Adapted to using just one buffer: startNewPacket, PacketsWaiting, ... * */ // read ahead in the buffer (to see if another packet follows) INLINE_HEADER long nextSep(void) { long* readpos = unpackContext->position; long sep=0; // we don't want to run out of the current buffer's dataspace: ASSERT(readpos <= (unpackContext->PackData + (unpackContext->counter))); // scan remains of previous message in buffer (could be only halfway unpacked...) do { sep = *readpos++; } while ( sep != NEXT_MSG && sep != END_OF_PACKET ); return sep; } /* send actions to implement (interface LLC.h): * * int GetPacket(); -- blocking receive * rtsBool PacketsWaiting(); -- probing only, not receiving * void GetArg*(...) 
-- data retrieval, * #define'd in LLC.h (using memcpy on unpackContext) * void getOpcodeAndSender -- just a selector on unpackContext */ /* Buffer mechanism: * start a new packet of messages by * - receiving the packet, blocking receive done in MPSystem * - consuming packet header * - setting messageBuffer fields appropriately * assumes: there *is* a packet for this buffer (see above) and the buffer is empty */ static void startNewPacket(void) { IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: starting a packet of messages.\n")); ASSERT(unpackContext); // receive packet from MPSystem MP_recv(DATASPACEWORDS, unpackContext->PackData, &(unpackContext->active), &(unpackContext->pe)); ASSERT(1 <= unpackContext->pe && unpackContext->pe <= nPEs); // PE already set unpackContext->position = unpackContext->PackData; // Handle (unbuffered) system messages from level below separately, // select this case over the received opcode. if (unpackContext->active != PP_PACKET) { // we have a system message: ASSERT(ISSYSCODE(unpackContext->active)); // nothing to do (no header included) } else { // if it is a packet: consume header first ASSERT(unpackContext->active == PP_PACKET); // should be a packet! // TODO: put into own method "mungePacketHeader()" unpackContext->age = *unpackContext->position++; unpackContext->counter = *unpackContext->position++; unpackContext->position++; // machine load // end of header } // First message starts by its OpCode, before the included data (see // layout above). unpackContext->active = *unpackContext->position++; ASSERT(ISOPCODE(unpackContext->active)); IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: packet from %x loaded",unpackContext->pe)); } /* packetsWaiting (just pvm_probe in the old implementation) must do * some more than MP_probe in case of buffering: * * It also searches the receive buffer to see if another message follows. 
*/ rtsBool PacketsWaiting(void) { ASSERT(receiveBuffer && unpackContext); // unbuffered case, only MP_probe: if (!RtsFlags.ParFlags.BufferTime) return MP_probe(); if (MP_probe()) // next packet already waiting, all fine. return rtsTrue; // buffer active: parse for if ((unpackContext->active) && (nextSep() == NEXT_MSG)) return rtsTrue; // another message in active buffer // if we are here: no active buffers, no messages // buffer spent, should be invalidated... unpackContext->active = 0; unpackContext->pe = 0; unpackContext->counter=0; return rtsFalse; } /* This method prepares a new message in the unpackContext to be * processed. The message is either retrieved from the MP-System * directly (no buffering) or the unpackContext contains a packet of * messages (buffering), and the next one is prepared to be processed. * * Methods from MPSystem.h guarantee that system messages have priority. * If messages are buffered, this cannot always be guaranteed. */ int GetPacket(void) { IF_PAR_DEBUG(paranoia, debugBelch("~~ LLComms: GetPacket\n")); ASSERT(receiveBuffer && unpackContext); // buffered case: check current buffer first if (RtsFlags.ParFlags.BufferTime) { IF_PAR_DEBUG(paranoia, debugBelch("~~ buffered, checking context")); if (unpackContext->active) { // buffer was active before long sep; OpCode op; // TODO: somewhat inefficient... sep = nextSep(); // look ahead if (sep == NEXT_MSG) {// another message follows in the packet: IF_PAR_DEBUG(paranoia, debugBelch("~~ previous packet still active, preparing")); // consume remains of previous message in buffer (could be only halfway unpacked...) while ( *(unpackContext->position)!=sep ) unpackContext->position++; unpackContext->position++; // consume separator op = *(unpackContext->position++); // get and set Opcode ASSERT(ISOPCODE(op)); unpackContext->active=op; return 1; // separator consumed, opcode set, ready to unpack. 
} else { // no new message: must be end marker: invalidate current context IF_PAR_DEBUG(paranoia, debugBelch("~~ previous packet becomes inactive")); ASSERT(sep == END_OF_PACKET); unpackContext->active = 0; // set buffer inactive unpackContext->pe = 0; // invalidate sender unpackContext->counter = 0; // reset datasize } // continue as if it had been inactive before: } // (blocking) receive one message from MPSystem.h, // munge header, // prepare to consume by setting sender and opcode startNewPacket(); } else { IF_PAR_DEBUG(paranoia, debugBelch("~~ unbuffered, loading message\n")); MP_recv(DATASPACEWORDS, unpackContext->PackData, &(unpackContext->active), &(unpackContext->pe)); unpackContext->age = msTime(); ASSERT(1 <= unpackContext->pe && unpackContext->pe <= nPEs); ASSERT(ISOPCODE(unpackContext->active)); unpackContext->position = unpackContext->PackData; // no header, set header values "manually" unpackContext->age = msTime(); unpackContext->counter = 1; // opcode is repeated in the dataspace, move forward one step ASSERT((long) unpackContext->active == *unpackContext->position); unpackContext->position++; } IF_PAR_DEBUG(paranoia, debugBelch("~~ GetPacket finished\n")); return 1; } void getOpcodeAndSender (OpCode *popcode, nat *psender_id) { *popcode = unpackContext->active; *psender_id = unpackContext->pe; return; } #endif /* PARALLEL_HASKELL -- whole file */