/* ----------------------------------------------------------------------------
 * Time-stamp: <Fri Dec 24 2010 15:28:02 Stardate: Stardate: [-28]4188.22 hwloidl>
 *
 * GUM Low-Level Inter-Task Communication
 *
 * This module defines PVM Routines for PE-PE  communication.
 *
 * P. Trinder, December 5th. 1994.
 * P. Trinder, July 1998
 * H-W. Loidl, November 1999 -
 *
 * J.Berthold, Feb.2005 - generalisation:
 *  This file now uses a "RatherLL"-Comm. interface specified by MPSystem.h
 *  System issues like bringing up the parallel system or error-handling 
 *  are not located here any more.
 *  OTOH, here is where buffering for prg. messages is implemented.
 --------------------------------------------------------------------------- */

/* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 * ToDo: this file should be merged with PVMComms.c and MPIComms.c
 * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 */

#ifdef PARALLEL_RTS /* whole file */

//@node  Low-Level Inter-Task Communication, , ,
//@section Low-Level Inter-Task Communication

/*
 *This module defines the routines which communicate between PEs.  The
 *code is based on Kevin Hammond's GRIP RTS. (OpCodes.h defines
 *PEOp1 etc. in terms of sendOp1 etc.).  
 *
 *Routine	&	Arguments 
 *		&		
 *sendOp	&	0			\\
 *sendOp1	&	1			\\
 *sendOp2	&	2			\\
 *sendOpN	&	vector			\\
 *sendOpV	&	variable		\\
 *sendOpNV	&	variable+ vector	\\
 *
 *
 * Restructuring for Generic RTS: (JB 2004)
 *  The module is an intermediate layer between HLComms (messages properly)
 *  and the message passing library (abstracted via MPSystem.h).
 *  Layer implements the option to buffer messages, collect messages and 
 *  save real communication by sending whole "packets" together.
 *  Comments refer to "messages" as the single things to transmit, 
 *                 to "packets" as collections of messages, which are really sent.
 * The idea (explained further in subsequent comments):
 *   maintain many buffers for sending, and one for receiving (think of them as objects),
 *   send buffers when they are full, on message timeout or the caller requests immediate action,
 *   receive messages by filling them into the unpacking context
 * This optimization is supposed to pay off by saving real sending action
 *   (while increasing message size)
 *
 * Other Risks: spends much memory by allocating the maximum message size "-qQ<n>" for each PE
 * 
 */

//@menu
//* Macros etc::		
//* Includes::			
//* Auxiliary functions::	
//* Index::			
//@end menu

//@node Macros etc, Includes, GUM Low-Level Inter-Task Communication, GUM Low-Level Inter-Task Communication
//@subsection Macros etc

/* Evidently not Posix */
/* #include "PosixSource.h" */

//@node Includes, Auxiliary functions, Macros etc, GUM Low-Level Inter-Task Communication
//@subsection Includes

// included in LLC.h already...
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"

#include "ParallelRts.h"
#include "LLC.h"

#include "MPSystem.h" // message passing abstractions

// TODO: needed?
// #include "ParallelRts.h" // DEBUG_HEADROOM
 #if defined(DEBUG)
 # include "ParallelDebug.h"
 #endif

#ifdef __STDC__
#include <stdarg.h>
#else
#include <varargs.h>
#endif


/* message buffering code: 
 * we maintain a ring of send buffers (one for every PE) and a single receive buffer
 *
 * "sending" implies to preliminarily set the context to one of the send buffers. 
 *              - setContext(nat dest)
 * A packet of collected messages is sent:
 *  a) when the buffer is full
 *  b) when lifetime has expired
 *  c) when the client requests it explicitly - sendImmediately(nat dest)
 *
 * Received data from a packet is read from the (previously filled) receive buffer.
 * (Buffer must contain actual message tag -> criteria for buffer activity)
 * 
 * Buffer's data space must be >= (Gum packbufferSize (in words!)
 *                                 + 2*PORTSIZE + HEADER) 
 * (see definition DATASPACEWORDS in Parallel.h). This size depends on
 * the message format.
 *
 * Overall idea is to have buffer "objects" for communication with other PEs.
 *  These objects are created on startup, collect messages and send their content 
 *  on request, on timeout or when they are full.
 */


/* Forward declarations of small helpers defined further below. */
static void reallySend(void);
INLINE_HEADER rtsBool timeToSend(void);
INLINE_HEADER nat roomLeft(void);

/* Message buffers:
 *******************
 * sendBuffers   - ring with one send buffer per PE (we must be able to send
 *                 to ourselves, too); points to the last ring element.
 * receiveBuffer - the one and only receive buffer.
 */
static MessageBuffer *sendBuffers   = NULL;
static MessageBuffer *receiveBuffer = NULL;

/* Contexts used to pack (this file) and unpack (higher levels).
 * We presume:  unpackContext == NULL  => not initialized
 *              packContext   != NULL  => currently packing a message
 * unpackContext is accessed from outside this file, hence not static.
 */
static MessageBuffer *packContext   = NULL;
MessageBuffer        *unpackContext = NULL;

/* Global statistics variables (printed by showPacketStats()):
 *  Sending:   number of sent packets, accumulated packet size, accumulated
 *             number of messages in packets, forced send operations, sends
 *             caused by a timeout, biggest packet and number of messages in it.
 *  Receiving: number of received packets, accumulated packet size,
 *             accumulated and biggest packet latency.
 */
long sentPackets   = 0;
long sendSize      = 0;
long msgCount      = 0;
long forcedPackets = 0;
long timeExceeded  = 0;
long biggestPacket = 0;
long msgsInBiggest = 0;

long receivedPackets = 0;
long recvSize        = 0;
long avgLatency      = 0;
long biggestLatency  = 0;

/* Helpers: 
 *************************************************************************
 * lookupPE(): find the buffer for PE "pe" in the buffer ring "ring".
 *
 * The scan starts with the element behind "ring" (the first ring element)
 * and ends with "ring" itself, so every element is examined once.
 * Returns the matching buffer, or NULL (with a debug message in "mpcomm"
 * debug mode) if the ring holds no buffer for "pe".
 * Assumes: "ring" points to a real buffer ring (non-termination if not).
 */
INLINE_HEADER
MessageBuffer* lookupPE(MessageBuffer* ring, nat pe) {
  MessageBuffer *candidate;

  ASSERT(ring!=(MessageBuffer*)NULL);

  for (candidate = ring->next; ; candidate = candidate->next) {
    if (candidate->pe == pe) {
      return candidate;
    }
    if (candidate == ring) {
      break; // went all the way round
    }
  }

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~  sendBuffer for PE %x not found\n",pe));
  return NULL;
}

/* showPacketStats() prints (via debugBelch) the statistics gathered for the
 * low-level messages (packets) while operating on the buffers: first the
 * sending side, then the receiving side. Averages are only printed when at
 * least one packet was counted, so no division by zero can occur.
 */
void showPacketStats(void) {
  long avgMsgs, avgSize;

  debugBelch("\nPacket statistics\n-----------------\n");

  /* --- sending side --- */
  debugBelch("Packets sent:\t\t%ld\n", sentPackets);
  debugBelch("(forced sending of %ld packets, timeout for %ld packets)\n",
	     forcedPackets, timeExceeded);
  if (sentPackets > 0) {
    avgMsgs = msgCount / sentPackets;   // messages per packet
    avgSize = sendSize / sentPackets;   // words per packet
    debugBelch("Messages per packet (avg.):\t%ld\n", avgMsgs);
    debugBelch("Average packet size:\t\t%ld words\n", avgSize);
    debugBelch("Biggest packet: \t%ld words\n\t\t\t %ld messages.\n",
	       biggestPacket, msgsInBiggest);
  }

  /* --- receiving side --- */
  debugBelch("\nPackets received:\t%ld\n", receivedPackets);
  if (receivedPackets > 0) {
    avgSize = recvSize / receivedPackets;
    debugBelch("(avg. size:\t%ld words)\n", avgSize);
    debugBelch("Packet latency (msec.):\t%ld (max.)\t%ld (avg.)\n",
	       biggestLatency, avgLatency / receivedPackets);
  }
}

/* initialize packcontext and send buffers
************************************************************************************
 *  insertNewBufferInRing allocates space for the buffer to/from one PE and sets all needed fields
 *
 * The effect of calling insertNewBufferInRing n times is a ring structure, where sendBuffers
 * points to the *last* ring element. 
 *                   ______________________
 *                  |                      |
 *                  V                      |
 *   sendBuffers -> n -> 1 -> 2 -> ... -> (n-1)
 *
 * This ring can dynamically be enlarged and reduced, since the selection of a packContext 
 * uses a simple sequential list scan instead of a selection by PE-Number.
 * (an acceptable trade-off for dynamic configuration ability, in my opinion.
 * We eliminate all assumptions on the order of the PE tids here.)
 *
 * This is old code from eden-5.02.3, not used outside this file at the moment.
 * See method "initCommBuffers" below for how new RTE uses them.
 */

// insertNewBufferInRing(): constructor for a new buffer for PE "pe" and
// its insertion into the buffer ring "*ring".
//
// The new buffer is inactive (age, active and counter are 0) and empty
// (position == PackData); its data space holds DATASPACEWORDS words.
// If *ring is NULL, a trivial ring with one element is created. Otherwise
// the buffer is linked in behind the element *ring points to (the last
// one), and *ring is moved to the new buffer, so that *ring again points to
// the last ring element.
static
void insertNewBufferInRing(MessageBuffer **ring, nat pe) {
  MessageBuffer *newBuffer;

  // space for the administrative fields:
  newBuffer = (MessageBuffer*) stgMallocBytes(sizeof *newBuffer, "sendBuffer:admin");
  newBuffer->age = newBuffer->active = 0; // invalidate activity fields
  newBuffer->counter = 0;                 // no messages yet (never read it uninitialised)
  newBuffer->pe = pe;                     // set receiver: a PE number (initCommBuffers
					  // passes 1..nPEs, 0 for the receive buffer)

  // space for the data. Maximum space needed is the max. size of one
  // message, plus header (see DATASPACEWORDS).
  newBuffer->position = newBuffer->PackData = 
           (long*) stgMallocBytes(sizeof(long)*DATASPACEWORDS,
				  "sendBuffer:data");

  // insert into buffer ring:
  if (*ring == NULL) { // first buffer in this ring
    // create trivial ring with 1 element
    newBuffer->next = newBuffer;
    *ring = newBuffer;
  } else { // insert into existing ring
    // remember: *ring points to the *last* buffer in the ring
    newBuffer->next = (*ring)->next; // link new buffer to 1st element in ring
    (*ring)->next = newBuffer;       // link old last element to new buffer 
    *ring = newBuffer;               // point to new buffer (new last element)
  }

  // TODO (if debugging needed): under IF_DEBUG(sanity, ...), fill the
  // buffer data space with zeros
}

// new RTE: create all send buffers and one receive buffer in one operation.
//  - one send buffer for each PE 1..nPEs (nPEs must be set before); if the
//    ring was empty before, sendBuffers ends up pointing to the buffer for
//    PE nPEs (the last one inserted),
//  - one receive buffer (PE field 0), which also becomes the unpackContext.
// Always returns rtsTrue.
rtsBool initCommBuffers(void) {
  nat i; // counter

  ASSERT(nPEs); // nPEs must be set before we do this!

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~ initCommBuffers (%d machines)..", nPEs));
  
  // machines are numbered from 1 to nPEs (inclusive), as on language level
  for (i = 1; i <= nPEs; i++) {
    insertNewBufferInRing(&sendBuffers, i); // GlobalTaskID not used any more...
  }

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~ recv .."));
  insertNewBufferInRing(&receiveBuffer,0); // one receive buffer, created by helper...
  unpackContext = receiveBuffer;

  IF_PAR_DEBUG(mpcomm, 
	       debugBelch("~~  sendBufferRing is:\n");
	       printSendBuffers(sendBuffers));
  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~ Done\n"));

  return rtsTrue;
}

void freeCommBuffers(void) {
  MessageBuffer *current, *next;

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~ freeing Comm.buffers\n"));
  ASSERT(sendBuffers); // we do have send buffers to free...
  current = sendBuffers->next;
  while (current != sendBuffers) {
    // set ref. to next, free space allocated for current
    sendBuffers->next = current->next;
    IF_PAR_DEBUG(mpcomm,
		 debugBelch("~~ pack buffer to PE %d: %s\n",
			    current->pe, 
			    current->position == current->PackData?
			        "OK":"still contains data."));
    stgFree(current->PackData);
    stgFree(current);
    current = sendBuffers->next;
  } 
  // now invalidate sendBuffers (the only one remaining)
  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~~ pack buffer to PE %d: %s\n",
			  current->pe, 
			  current->position == current->PackData?
			  "OK":"still contains data."));
  stgFree(current->PackData);
  stgFree(current);
  sendBuffers = NULL;

  stgFree(receiveBuffer->PackData);
  stgFree(receiveBuffer);
}

// insertNewCommBuffer(): add a send buffer for PE "pe"; in case a new
// machine joins an open system, we would call this one.
// Fails with barf() if a buffer for this PE already exists.
void insertNewCommBuffer(nat pe) {
  // lookupPE() must not be called on an empty ring
  if (sendBuffers == NULL || lookupPE(sendBuffers, pe) == NULL) {
    insertNewBufferInRing(&sendBuffers, pe);
    // %p with a void* argument: %x cannot portably print a pointer
    debugBelch(" added send buffer for PE %d at %p\n ", pe, (void *) sendBuffers);
  } else {
    barf("Parallel System Error: Machine-ID %d exists twice\n",pe);
  }
}

/* printSendBuffers(): print one line per buffer of the ring "ring" to
 * stderr (PE, buffer and data addresses, activity, age), starting with the
 * element "ring" points to. Does nothing for an empty (NULL) ring.
 */
void printSendBuffers(MessageBuffer *ring) {
  MessageBuffer *currBuff = ring;

  if (ring==NULL) return;
  do {
    // pointers are printed with %p (needs void*), the age as a long
    // (the same conversion reallySend uses for the packet header)
    fprintf(stderr, "Buffer: [PE %d] @ %p with data @ %p;  %s, age %ld\n",
	    currBuff->pe, (void *) currBuff, (void *) currBuff->PackData,
	    (currBuff->active)?"ACTIVE":"INACTIVE", (long) currBuff->age);
    currBuff=currBuff->next;
  } while (currBuff!=ring);
}


/* handling of send buffers:
**************************************************************************
 *  set packcontext (including activity checks)
 *  determine free data space in buffer
 *  invalidate packcontext (and send on timeout)
 *  really send a packet away (when it is full or time to send)
 *  check for active buffers and send away on timeout
 */

/*
 * roomLeft(): number of words still free in the data space of the current
 * pack buffer, i.e. DATASPACEWORDS minus the words already used between
 * PackData and the current write position. Counted in units of
 * (long) = StgWord.
 * (assumes: packContext has been set before)
 *******************************************************/
INLINE_HEADER 
nat roomLeft(void) {
  int used = (int) (packContext->position - packContext->PackData);

  ASSERT(DATASPACEWORDS > used);
  ASSERT(used >= 0);
  return DATASPACEWORDS - (nat) used;
}

/* setPackContext(): prepare for "sending" a message to PE forThatPe, i.e.
 * for filling it into that PE's send buffer (the packet).
 *
 *  op        - opcode of the message; it is written into the buffer here
 *  forThatPe - destination PE (send buffers are looked up by PE number)
 *  datasize  - number of data words the caller is about to pack
 *
 * An inactive (empty) buffer is activated: its age is set to msTime() (for
 * the send timeout), its message counter is reset, and PACKET_HEADER words
 * are skipped. New RTE, abstracting over PVM: if we use message buffering,
 * the data space must leave room for the packet header, so that we can send
 * away a contiguous area in memory.
 * An active buffer gets a NEXT_MSG separator, unless the new message
 * (datasize+MSG_HEADER words) does not fit any more; then the old packet is
 * sent away first.
 * Finally the opcode is written and the message counter is incremented.
 * The caller must pack the message data right afterwards, then invalidate
 * the packContext.
 */
static
void setPackContext(OpCode op, nat forThatPe, nat datasize) {
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  setPackContext: msg. to task %x.\n", forThatPe);
	       debugBelch("~~  sendBufferRing is:\n");
	       printSendBuffers(sendBuffers));
  // semaphores could be placed here (mutex packContext)
  ASSERT(packContext == NULL);
  // set the global context:
  packContext = lookupPE(sendBuffers, forThatPe);
  ASSERT(packContext != NULL);

  if (!(packContext->active)) {
    // inactive buffer: it must not contain any data
    ASSERT(packContext->position == packContext->PackData);
    ASSERT(packContext->age == 0);

    // set buffer active, record creation time for sending after timeouts:
    packContext->age = msTime();
    packContext->active = op;
    packContext->counter = 0;
    packContext->position += PACKET_HEADER; // leave space for packet header
  } else {
    ASSERT((packContext->position) > packContext->PackData);

    if (datasize+MSG_HEADER > roomLeft()) {
      // Message does not fit: send away the old packet (this invalidates
      // packContext) and re-enter with the same parameters. The buffer is
      // inactive then, so the recursive call also writes the opcode --
      // return to avoid packing it twice.
      reallySend();
      setPackContext(op, forThatPe, datasize);
      return;
    }
    // continue the packet: separator, then the next message
    *(packContext->position++) = NEXT_MSG;
  }

  // If we are here, we have started/continued a packet in the buffer.
  // prepare the message by filling in the Opcode:
  *(packContext->position++) = op;
  packContext->counter++;
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  Pack Context set to %p with OpCode %s and target PE %u\n", 
			  (void *) packContext, getOpName(op), forThatPe));
}

/* Other functions assume the context has been set correctly and operate on
 * this context (remember: OO-design).
 *
 * timeToSend(): decide whether the packet in the current packContext must
 * be sent now. It must when it is older than RtsFlags.ParFlags.BufferTime
 * (measured with msTime()), or when it is full, i.e. a minimal message
 * (MIN_MSG words) does not fit in any more. Decisions caused by the age
 * criterion are counted in timeExceeded.
 */
INLINE_HEADER
rtsBool timeToSend(void) {
  ulong timeout = RtsFlags.ParFlags.BufferTime;
  rtsTime now;
  int tooOld;

  ASSERT(packContext != NULL);

  now = msTime();
  tooOld = (now - (packContext->age) > timeout);

  if (tooOld) {
    timeExceeded++; // sending because of the timeout
    return rtsTrue;
  }
  // not too old: send only if the packet is (almost) full
  return (roomLeft() < MIN_MSG) ? rtsTrue : rtsFalse;
}

/* invalidatePackContext(): finish packing the current message.
 * Both reallySend() and this function invalidate the packContext.
 * Without message buffering (BufferTime == 0), or when timeToSend() holds
 * (the packet is too old, or the smallest message does not fit any more),
 * the packet is sent away by reallySend(). Otherwise the message just stays
 * in the buffer and only packContext is reset to NULL.
 */
static 
void invalidatePackContext(void) {
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms: invalidatePackContext\n"));
  ASSERT(packContext != NULL);

  if (RtsFlags.ParFlags.BufferTime && !timeToSend()) {
    // keep collecting: nothing to do except for invalidating packContext
    // (a mutex semaphore would be released here and in reallySend())
    packContext = NULL;
    return;
  }
  reallySend(); // will complete the packet and send it, then invalidate the context
}

/* reallySend(): send away the message(s) in the current packContext, then
 * reset that buffer and invalidate the packContext.
 *   Either the next message does not fit in the packet, or it is too old,
 *   or we do not use buffering.
 *
 * If message buffering is not used (BufferTime == 0):
 * => send away the (single) message as it is in the buffer's data space,
 *    with original tag. The message will *not* include any header
 *    (PACKET_HEADER==0), but repeat the opCode. packData points to:
 * +---------------+
 * |    Message    |
 * | opCode| data  |
 * +---------------+
 *
 * If we use message buffering:
 *   => terminate the packet with END_OF_PACKET, fill in the header
 *   (PACKET_HEADER) and send it with tag PP_PACKET. The packet format is:
 * +----------------------------------------------------------------------------+
 * |  Packet Header:             |   Messages (packet data):                    |
 * |start time| size | mach.load | msg1|NEXT_MSG|msg2|NEXT_MSG|...|END_OF_PACKET|
 * +----------------------------------------------------------------------------+
 *  "size" is the total packet size in words, header and end marker included.
 *  The data space in pack context leaves out the header size when packing
 *  so that we have contiguous data space to send away (see setPackContext).
 */
static void reallySend(void) {
  long bufsize;
  long* dataptr;

  // we have a valid pack context (checked before it is used below) ...
  ASSERT(packContext != NULL);
  // ... and there *is* something to send:
  ASSERT(packContext->active);
  ASSERT(packContext->counter > 0);

  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms::reallySend to %x (context at %p)\n",
			  packContext->pe, (void *) packContext));

  if (!(RtsFlags.ParFlags.BufferTime)) {
    // do not use message buffers at all, skip header
    ASSERT(packContext->counter == 1);
    bufsize = (packContext->position) - (packContext->PackData);
    
    MP_send((int) packContext->pe, (OpCode) packContext->active, 
	    (long *) (packContext->PackData), 
	    (int)  bufsize); 
  } else {
    // rest: message buffering code...
    *(packContext->position++)= END_OF_PACKET;

    bufsize = (packContext->position) - (packContext->PackData);
    //  buffer is active, so there must be something 
    //  else than header and end marker:
    ASSERT(bufsize > PACKET_HEADER+1);

    /* statistics stuff (disabled):
    if (RtsFlags.EdenFlags.stats >= COMM) {
      sendSize += bufsize;
      sentPackets++;
      msgCount += packContext->counter;
      if (bufsize > biggestPacket) {
        biggestPacket = bufsize;
        msgsInBiggest = packContext->counter;
      }
    }
    */

    dataptr = packContext->PackData;

    // fill in packet header
    // TODO - move to separate method writePacketHeader(dataptr), customizable
    dataptr[0] = (long) packContext->age;
    dataptr[1] = bufsize;
    dataptr[2] = 0xdead; // load information
    // end of "writePacketHeader(dataptr)"

    // call to lower level MP-routine here:
    MP_send(packContext->pe, PP_PACKET,
	    packContext->PackData, bufsize);
  }

  // common code for both buffering and without: invalidate packContext
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms: sent message packet to PE [%x]\n", 
			  packContext->pe));
  // reset the buffer which we just sent away
  packContext->age = packContext->active = 0;
  packContext->position = packContext->PackData;

  // invalidate packContext (also the right place to free a semaphore)
  packContext = NULL;
}

/* sendImmediately(): force the packet collected for PE toThisPe to be sent
 * now. It can be called from anywhere outside the sending routines
 * (without bothering with the packContext), to force quicker message
 * passing in certain situations (thread termination, other important
 * messages).
 * If we don't have anything to send to this PE -- its buffer is empty, or
 * there is no send buffer for it at all -- the request is ignored.
 */ 
void sendImmediately(nat toThisPe) { // request immediate message to a PE
  
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms:Immediately sending to %d - ", toThisPe));

  ASSERT(packContext == NULL);

  // lookupPE() must not be called on a missing ring, and may return NULL
  packContext = (sendBuffers == NULL) ? NULL : lookupPE(sendBuffers, toThisPe);

  if (packContext == NULL || !(packContext->active)) { // no action required
    packContext = NULL;
    IF_PAR_DEBUG(paranoia, 
		 debugBelch("~~  Nothing to send\n"));
    return;
  }

  forcedPackets++;
  reallySend(); // send away and invalidate buffer and context
}

/* sendOldBuffers() is called by the scheduler and realizes the sending timeout
 * All buffers are checked and sent when they become too old.
 */
void sendOldBuffers(void) {
  MessageBuffer *temp;

  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms: Checking all send buffers for old messages:\n"));

  ASSERT(packContext == NULL);

  packContext = sendBuffers->next;
  
  while (packContext != sendBuffers) {
    temp = packContext;
    if ((packContext->active) && timeToSend()) 
        reallySend();
    packContext = temp->next;
  }
  packContext = NULL; // invalidate packContext
}


//@node Auxiliary functions, Index, Includes, GUM Low-Level Inter-Task Communication
//@subsection Auxiliary functions


/*
 * traceSendOp(): trace a message that is being sent -- opcode name, the
 * first two data words (as given by the caller), sender and destination.
 * Produces output only in "paranoia" debug mode.
 */

//@cindex traceSendOp
static void
traceSendOp(OpCode op, nat dest, unsigned int data1, unsigned int data2)
{
  IF_PAR_DEBUG(paranoia,
	       debugBelch("~~  trace: %s [%x,%x] sent from %x to %x\n",
			  getOpName(op),
			  data1, data2,   // data words
			  thisPE, dest)); // sender, receiver
}

/* sending messages:
****************************************************************************
 * JB 2004, changes for new RTE:
 * Many sending routines now deal with the packContext instead of really sending.
 *  We need all data of the message in contiguous memory to use MPSystem.h
 * Routines which do not mix a no. of arguments and a chunk of memory could 
 * use MPSystem.h directly if message buffering is not used.
 *
 * Common parameter "nat task" is value between 1 and nPEs!!!
 */


/*
 * sendOp(): send a message without arguments, with OpCode {\em op}, to the
 * PE {\em task} (1..nPEs). The message consists of the opcode only.
 */

//@cindex sendOp
void
sendOp(OpCode op, nat task)
{
    const nat nargs = 0;

    traceSendOp(op, task, 0, 0);

    setPackContext(op, task, nargs); // also writes the opcode
    invalidatePackContext();         // sends if unbuffered or time to send
}

/*
 * sendOp1(): send a message with one argument {\em arg1} and OpCode
 * {\em op} to the PE {\em task} (1..nPEs).
 *
 * Like all other senders, this always goes through the pack context. In
 * unbuffered mode invalidatePackContext() sends the message at once via
 * reallySend(), whose data area starts with the opcode followed by the
 * argument -- the layout GetPacket() checks for and skips. (A direct
 * MP_send of &arg1 alone would lack that leading opcode word, so the
 * receiver would consume arg1 as the opcode copy.)
 */

//@cindex sendOp1
void
sendOp1(OpCode op, nat task, StgWord arg1)
{
    traceSendOp(op, task, arg1, 0);

    setPackContext(op, task, 1);
    PutArg1(arg1); // copy arg to buffer
    invalidatePackContext();
}


/*
 * sendOp2(): send a message with two arguments, {\em arg1} and {\em arg2},
 * and OpCode {\em op} to the PE {\em task}. (According to the original
 * note, it is used by the FP code only.)
 */

//@cindex sendOp2
void
sendOp2(OpCode op, nat task, StgWord arg1, StgWord arg2)
{
    const nat nargs = 2;

    traceSendOp(op, task, arg1, arg2);

    setPackContext(op, task, nargs); // also writes the opcode
    PutArg1(arg1);
    PutArg2(arg2);
    invalidatePackContext();         // sends if unbuffered or time to send
}

/*
 * sendOpV(): send a message with OpCode {\em op} to the PE {\em task},
 * carrying a variable number {\em n} of StgWord arguments. The data packed
 * is the task number followed by the n arguments. For example,
 *
 *    sendOpV( PP_STATS, StatsTask, 3, start_time, stop_time, sparkcount);
 */

//@cindex sendOpV
void
sendOpV(OpCode op, nat task, int n, ...)
{
    va_list argp;
    StgWord arg;
    int i;

    traceSendOp(op, task, 0, 0);

    va_start(argp, n);
    setPackContext(op, task, (nat)n);
    // TODO: the task word should be written by setPackContext; then the
    // argument positions below shift down by one
    PutArgN(0, task);
    for (i = 1; i <= n; i++) {
        arg = va_arg(argp, StgWord);
        PutArgN(i, arg);
    }
    va_end(argp);

    invalidatePackContext();
}

/*
 * sendOpNV(): send a message with OpCode {\em op} to the PE {\em task},
 * consisting of a variable number {\em narg} of StgWord arguments and a
 * datablock of {\em nelem} words. N.B. The additional arguments and the
 * datablock are contiguous and are copied over together. For example,
 *
 *        sendOpNV(PP_RESUME, tsoga.pe, 6, nelem, data,
 *	    (W_) ga.weight, (W_) ga.loc.gc.gtid, (W_) ga.loc.gc.slot, 
 *	    (W_) tsoga.weight, (W_) tsoga.loc.gc.gtid, (W_) tsoga.loc.gc.slot);
 *
 * Important: The variable arguments must all be StgWords.

 sendOpNV(_, task, nelem, data, m, x1, ..., xm) packs:

                                       |   n = nelem elems
     +------------------------------------------------
     | task | x1 | ... | xm | n | data ....
     +------------------------------------------------
 */

//@cindex sendOpNV
void
sendOpNV(OpCode op, nat task, int nelem, 
	 StgWord *datablock, int narg, ...)
{
    va_list ap;
    int i;
    StgWord arg;

    va_start(ap, narg);

    traceSendOp(op, task, 0, 0);
    IF_PAR_DEBUG(paranoia,
		 debugBelch("~~ sendOpNV: op = %x (%s), PE = %x, narg = %d, nelem = %d\n",
			    op, getOpName(op), task, narg, nelem));

    setPackContext(op, task,nelem+narg+1);
    // TODO: this should be done in setPackContext; in that case, drop +1's below
    PutArgN(0,task); 
    for (i = 0; i < narg; ++i) {
	arg = va_arg(ap, StgWord);
	// StgWord is an unsigned word, which %d cannot print portably
	IF_PAR_DEBUG(paranoia,
		     fprintf(stderr,"~~ sendOpNV: arg = %lu\n",(unsigned long) arg));
	PutArgN(i+1,arg);
    }
    arg = (StgWord) nelem;
    PutArgN(narg+1, arg);

    PutArgs(datablock, nelem);
    va_end(ap);
    invalidatePackContext();
}

/*
 * sendOpN(): send a message with OpCode {\em op} to the PE {\em task},
 * carrying an array {\em args} of {\em n} words. The data packed is the
 * task number, the element count n, then the n array elements.
 * For example,
 *
 *    sendOpN( PP_STATS, StatsTask, 3, stats_array);
 */

//@cindex sendOpN
void
sendOpN(OpCode op, nat task, int n, StgPtr args)
{
    long count = (long) n; // element count, sent in front of the array

    traceSendOp(op, task, 0, 0);

    setPackContext(op, task, (nat) n + 1);
    // TODO: this should be done in setPackContext; in that case, drop +1's below
    PutArgN(0, task);
    PutArgN(0+1, count);
    PutArgs(args, n);

    invalidatePackContext();
}


/* receiving messages:
******************************************************************************

!!!!!!!!!!TODO: update the description!!!!!!!!!!!
*/

/* JB: optimization (buffered receiving), Handling of receive Buffers:
**********************************************************************************
 * The idea: maintain an unpacking context (buffer) independently from the 
 * MP-System, set this context to the actual message on request (getPacket)
 * and supply data to unpacking functions from this buffer (Attention: unprotected!)
 *
 * Receive Buffers are filled when checking for messages to process. Once filled, 
 * with a packet of messages, the field "active" will contain the actual Opcode
 * (to allow several calls to "getOpcodeAndSender"). Buffers are checked RR for 
 * messages to process (could be changed, would even save memory space, but 
 * wouldn't guarantee fair processing).
 *  The buffered implementation must implement every function in LLC.h for receiving 
 *  data. Similar to the packing code, it uses an unpackContext which is set 
 *  when getting a packet (PacketsWaiting/GetPacket must precede every message 
 *  processing action).
 *
 * Update Oct.2004: The inefficiencies of receive buffers led to experiments 
 * with a version that uses only one receive buffer (define NORECVBUFFER).
 * This receive buffer is always completely processed when a packet has arrived.
 * Adapted to using just one buffer: startNewPacket, PacketsWaiting, ...
 *
 */

// nextSep(): read ahead in the current receive buffer, without moving its
// read position, and return the first NEXT_MSG or END_OF_PACKET word at or
// behind that position (i.e. whether another message follows in the packet).
// Only meaningful for packets: there "counter" holds the packet size in
// words (from the packet header), which bounds the scan.
// NOTE(review): a data word that happens to equal NEXT_MSG or END_OF_PACKET
// is taken for a separator, since the scan does not know message lengths.
INLINE_HEADER 
long nextSep(void) {
  long* readpos = unpackContext->position;
  long sep;

  // we don't want to run out of the current buffer's dataspace:
  ASSERT(readpos <= (unpackContext->PackData + (unpackContext->counter)));

  // scan remains of previous message in buffer (could be only halfway unpacked...)
  do {
    // every word read must lie inside the received packet
    ASSERT(readpos < (unpackContext->PackData + (unpackContext->counter)));
    sep = *readpos++;
  } while ( sep != NEXT_MSG && sep != END_OF_PACKET );

  return sep;
}


/* receive actions to implement (interface LLC.h):
 *
 * int GetPacket();          -- blocking receive
 * rtsBool PacketsWaiting(); -- probing only, not receiving
 * void GetArg*(...)         -- data retrieval, 
 *         #define'd in LLC.h (using memcpy on unpackContext)
 * void getOpcodeAndSender   -- just a selector on unpackContext
 */

/* startNewPacket(): load the next packet (or system message) into the
 * unpackContext and prepare its first message for unpacking:
 *   - blocking receive (done in MPSystem) of up to DATASPACEWORDS words;
 *     MP_recv also sets the received tag ("active") and the sender ("pe"),
 *   - for a PP_PACKET: consume the packet header (start time -> age,
 *     packet size -> counter, machine load is skipped); unbuffered system
 *     messages from the level below carry no header,
 *   - read the opcode of the first message into "active".
 * Assumes: there *is* a packet to receive and the buffer is not in use.
 */
static
void startNewPacket(void) {
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms: starting a packet of messages.\n"));

  ASSERT(unpackContext);

  MP_recv(DATASPACEWORDS, unpackContext->PackData,
	  &(unpackContext->active), &(unpackContext->pe));
  ASSERT(1 <= unpackContext->pe && 
	 unpackContext->pe <= nPEs); // PE already set

  unpackContext->position = unpackContext->PackData;

  if (unpackContext->active == PP_PACKET) {
    // TODO: put into own method "mungePacketHeader()"
    long *header = unpackContext->position;

    unpackContext->age     = header[0]; // start time
    unpackContext->counter = header[1]; // packet size
    // header[2] is the machine load, not used here
    unpackContext->position += 3;
  } else {
    // system message: nothing to consume (no header included)
    ASSERT(ISSYSCODE(unpackContext->active));
  }

  // the first message starts with its OpCode, before the included data
  unpackContext->active = *unpackContext->position++;
  ASSERT(ISOPCODE(unpackContext->active));
  
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  LLComms: packet from %x loaded",unpackContext->pe));
}


/* PacketsWaiting(): check, without receiving, whether a message can be
 * processed.
 * Unbuffered: just the result of MP_probe() (pvm_probe in the old
 * implementation).
 * Buffered: rtsTrue if MP_probe() reports a waiting packet, or if another
 * message follows in the active receive buffer. Otherwise the exhausted
 * buffer is invalidated (active, pe and counter reset to 0) and rtsFalse
 * is returned.
 */
rtsBool PacketsWaiting(void) { 

  ASSERT(receiveBuffer && unpackContext);

  if (!RtsFlags.ParFlags.BufferTime) {
    return MP_probe(); // unbuffered case, only MP_probe
  }

  // a new packet waiting, or another message in the active buffer?
  if (MP_probe() || ((unpackContext->active) && (nextSep() == NEXT_MSG))) {
    return rtsTrue;
  }

  // no active buffer, no messages: buffer spent, invalidate it
  unpackContext->active = 0;
  unpackContext->pe = 0;
  unpackContext->counter = 0;

  return rtsFalse;
}

/* GetPacket(): prepare the next message in the unpackContext to be
 * processed (opcode in "active", sender in "pe", read position just behind
 * the opcode). Always returns 1.
 *
 * Buffered: if another message follows in the current packet (NEXT_MSG),
 * skip to it. Otherwise invalidate the buffer and receive a new packet with
 * startNewPacket() (blocking receive).
 * Unbuffered: receive one message directly from the MP-System (blocking);
 * its data space starts with a copy of the opcode, which is skipped.
 * 
 * Methods from MPSystem.h guarantee that system messages have priority.
 * If messages are buffered, this cannot always be guaranteed.
 */
int GetPacket(void) {

  IF_PAR_DEBUG(paranoia, 
  	       debugBelch("~~  LLComms: GetPacket\n"));
 
  ASSERT(receiveBuffer  && unpackContext);

  if (RtsFlags.ParFlags.BufferTime) {
    // buffered case: check current buffer first
    IF_PAR_DEBUG(paranoia, 
		 debugBelch("~~  buffered, checking context"));
    if (unpackContext->active) { // buffer was active before
      long sep;
      OpCode op;

      // TODO: somewhat inefficient (the rest of the message is scanned twice)
      sep = nextSep(); // look ahead
      if (sep == NEXT_MSG) { // another message follows in the packet:
	IF_PAR_DEBUG(paranoia, 
		     debugBelch("~~ previous packet still active, preparing"));
	// consume remains of previous message in buffer (could be only halfway unpacked...)
	while (*(unpackContext->position) != sep) unpackContext->position++;

	unpackContext->position++; // consume separator
	op = *(unpackContext->position++); // get and set Opcode
	ASSERT(ISOPCODE(op));
	unpackContext->active = op;
	return 1; // separator consumed, opcode set, ready to unpack.
      }

      // no new message: must be end marker: invalidate current context
      IF_PAR_DEBUG(paranoia, 
		   debugBelch("~~ previous packet  becomes inactive"));
      ASSERT(sep == END_OF_PACKET);
      unpackContext->active = 0;  // set buffer inactive
      unpackContext->pe = 0;      // invalidate sender
      unpackContext->counter = 0; // reset datasize
      // continue as if it had been inactive before:
    }
    // (blocking) receive one message from MPSystem.h, munge header,
    // prepare to consume by setting sender and opcode
    startNewPacket();
  } else {  
    IF_PAR_DEBUG(paranoia, 
		 debugBelch("~~  unbuffered, loading message\n"));
    
    MP_recv(DATASPACEWORDS, unpackContext->PackData,
	    &(unpackContext->active), &(unpackContext->pe));
    ASSERT(1 <= unpackContext->pe && 
	   unpackContext->pe <= nPEs);
    ASSERT(ISOPCODE(unpackContext->active));

    unpackContext->position = unpackContext->PackData;
    // no header, set header values "manually" (receive time, one message)
    unpackContext->age = msTime();
    unpackContext->counter = 1;
    // opcode is repeated in the dataspace, move forward one step
    ASSERT((long) unpackContext->active == *unpackContext->position);
    unpackContext->position++;
  }  
  IF_PAR_DEBUG(paranoia, 
	       debugBelch("~~  GetPacket finished\n"));
  return 1;
}

/* getOpcodeAndSender(): selector on the unpackContext; stores the opcode of
 * the message currently being unpacked in *popcode and the PE that sent it
 * in *psender_id.
 */
void getOpcodeAndSender (OpCode *popcode, nat *psender_id) {
  const MessageBuffer *ctx = unpackContext;

  *psender_id = ctx->pe;
  *popcode    = ctx->active;
}


#endif /* PARALLEL_HASKELL -- whole file */
