/* ----------------------------------------------------------------------------
 * Time-stamp: <Tue Dec 28 2010 19:27:28 Stardate: Stardate: [-28]4209.05 hwloidl>
 *
 * High Level Communications Routines (HLComms.c)
 *
 * Contains the high-level routines (i.e. communication
 * subsystem independent) used by GUM
 * 
 * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994
 * GUM 3.xx: Phil Trinder, Simon Marlow July 1998
 * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 -
 * 
 * GenPar 6.x: J.Berthold, Philipps-Universität Marburg 2006
 *      this file contains routines for *data* communication only
 * ------------------------------------------------------------------------- */
 
/* -------------------------------------------------------
   This version is the initial GUM-Eden merge developed just
   after the Hackathon, St Andrews, Dec 2009
   ------------------------------------------------------- */

#ifdef PARALLEL_RTS /* whole file */

#include "Rts.h"
#include "RtsUtils.h"
#include "RtsFlags.h"
#include "sm/Storage.h"  // allocation etc.
#include "Updates.h"     // UPD_IND
#include "HLC.h"
#include "RTTables.h"
#include "PEOpCodes.h"
#include "Sparks.h"      // for spark pool management
#include "Capability.h"  // for sparkPoolSizeCap

#include "Capability.h"
//#include "StgMiscClosures.h" // END_TSO_QUEUE etc
//#include "DerivedConstants.h" // STDR_HDR_SIZE
#include "ParallelRts.h"
// #include "RtsTypes.h"

#if !defined(DUMMY_BLOCKED_FETCH)
# error "PANIC: DUMMY_BLOCKED_FETCH is off, but GUM6 does not support BLOCKED_FETCH nodes any more (replaced by Ghost TSOs)"
#endif

// #include "Parallel.h"
#include "ParallelDebug.h"
#include "Sparks.h"
// #include "FetchMe.h"     // for BLOCKED_FETCH_info etc

// JB HACK
// #include "SchedAPI.h" // createThread for GhostTSOs

//@cindex freeMsgBuffer
static StgWord **freeMsgBuffer = NULL;
//@cindex freeMsgIndex
static nat      *freeMsgIndex  = NULL;
nat  choosePE(void);

/* debugging */
extern rtsBool check_IND_cycle(StgClosure *pT);

/* dummy settings */
// HWL NO: this is now in ParallelRts.h
// extern Port RtsPort, NoPort;

extern StgTSO *blackhole_queue;
extern StgTSO *ghost_TSO_queue;
// Global.c
// void setFetchedLA (StgTSO *t);


// gumPackBuffer: for receiving messages
static rtsPackBuffer *gumPackBuffer;
// variable for round-robin distribution, will start at thisPE + 1
// (set by initMoreBuffers)
// static // JB: conflict with Parallel.h
// defined in DataComms.c
// nat targetPE = 0; 

/*
 * ChoosePE selects a PE number between 1 and nPEs, either 'at
 * random' or round-robin, starting with thisPE+1
 */
/*
 * choosePE picks a target PE in the range 1..nPEs, either at random
 * (placement flag bit 0 set) or round-robin starting at thisPE+1.
 * Bit 1 of the placement flag forbids local placement: the result is
 * nudged one PE onwards (with wrap) if it landed on thisPE.
 * NOTE(review): with nPEs==1 the nudge cannot avoid thisPE -- confirm
 * callers tolerate a local result in that degenerate case.
 */
nat 
choosePE(void)
{
  nat chosen;

  IF_PAR_DEBUG(paranoia,
	       debugBelch("choosePE (current = %d)\n", 
			  targetPE));
  if (RtsFlags.ParFlags.placement & 1) {
    /* random placement */
    chosen = 1 + (nat) (lrand48() % nPEs);
  } else {
    /* round-robin placement: take the cursor, then advance it with wrap */
    chosen = targetPE;
    targetPE = (targetPE >= nPEs) ? 1 : (targetPE + 1);
  }
  /* optionally avoid local placement by stepping to the next PE (with wrap) */
  if ((RtsFlags.ParFlags.placement & 2) && (chosen == thisPE)) {
    chosen = (chosen == nPEs) ? 1 : (chosen + 1);
  }
  IF_PAR_DEBUG(paranoia,
	       debugBelch("chosen: %d, new targetPE == %d\n", 
			  chosen, targetPE));
  return chosen;
}

#if 0
/* createGhostTSO: DISABLED (dead code, kept for reference only).
 *
 * Creates a "Ghost TSO" standing in for a remote demand on the value
 * of `closure`: the thread never runs (what_next is set to
 * ThreadNeverDoAnything) and merely records, via its fields, which
 * remote GA (rga) is waiting for the closure.  A GALA entry is
 * created for the remote GA and -- as the comments below admit -- a
 * pointer to it is stashed in the blocked_exceptions field, which is
 * an acknowledged hack.
 */
StgTSO *
createGhostTSO(Capability *cap, StgClosure *closure, globalAddr rga) {
  StgTSO *t;
  globalAddr *new_rga;

  /* This includes (seq) BHs and RBHs */
  // closure = GALAlookup(&ga);
  t = createThread (cap, 0/*stack_size*/);   /* create a Ghost TSO; GUM-6x -- HWL */
  t->block_info.closure = closure;           /* ptr to local closure; GUM-6x -- HWL */
  t->what_next = ThreadNeverDoAnything;  // this marks a ghost TSO
  t->why_blocked = BlockedOnRemoteFetch; //MKA BlockedOnFetchMe /*BlockedOnRemoteFetch*/;         /* characterises Ghost TSO; GUM-6x -- HWL */
  /* we create a GALA entry, which we can point to from the Ghost TSO */
  new_rga = setRemoteGA(NULL, &rga, rtsTrue); // create GALA entry with this given rga
  // ASSERT: GALA entry is at the head of outPtrs list
  // setFetchedLA(t); // HACK: the la in the liveRemoteGAs list points to the Ghost TSO we created;
  // UNACCEPTABLE HWL HACK: we abuse a TSO field of the TSO to hold a ptr to the GA
  t->blocked_exceptions = (struct StgTSO_ *)new_rga;
  // bad choice of field
  // t->blocked_exceptions = (struct StgTSO_ *)new_rga; // LAGAlookup(closure);
  // ASSERT(LOOKS_LIKE_GA((globalAddr*)t->block_info.tso));
  /* copy the GA of the demander of this value 
  t->ga.pe = rga.pe;
  t->ga.slot = rga.slot;
  t->ga.weight = rga.weight;
  */
  // bf->link = NULL;  debugging

  IF_PAR_DEBUG(fetch, // mpcomm,
  		 { char str1[MAX_GA_STR_LEN];
  		   char str2[MAX_GA_STR_LEN];
  		   showGA(&rga, str1);
  		   showGA(new_rga, str2);
  		   debugBelch("%%%%++ Created Ghost TSO %d (%p) for closure  %p (%s) with GA %s (new_rga %s) @ %p\n",
  			      t->id, t, closure, info_type(closure), str1, str2, new_rga);};);

  /* Can we assert something on the remote GA? */
  // ASSERT(GALAlookup(&rga) == NULL);
  return t;
}
#endif

#ifdef DEBUG
// JB need forward declarations for my gcc (4.0)
void DebugPrintGAGAMap(globalAddr *gagamap, int nGAs);
#endif

/*
 * SendFetch packs the two global addresses and load info a message +
 * sends it.  

//@cindex FETCH

   Structure of a FETCH message:

         |    GA 1     |        GA 2          |

         +------------------------------------+------+
	 | gtid | slot | weight | gtid | slot | load |
	 +------------------------------------+------+
 */

//@cindex sendFetch
/*
 * sendFetch ships a FETCH message to the PE that owns the remote GA
 * (rga), asking for the data to be returned to the local GA (lga);
 * load is an (as yet unused) load-information field.
 */
void
sendFetch(globalAddr *rga, globalAddr *lga, int load)
{
  ASSERT(LOOKS_LIKE_GA(lga));
  ASSERT(LOOKS_LIKE_GA(rga));
  ASSERT(rga->weight >= (rtsWeight)MIN_GA_WEIGHT);
  // TODO: is the weight of the local GA always 0? ASSERT(lga->weight == (rtsWeight)0);
  IF_PAR_DEBUG(fetch, // mpcomm,
	       { char remoteStr[MAX_GA_STR_LEN];
		 char localStr[MAX_GA_STR_LEN];
		 showGA(rga, remoteStr);
		 showGA(lga, localStr);
		 debugBelch("~^** Sending Fetch for %s; locally %s, load = %d\n",
			    remoteStr, localStr, load);};);

  /* payload: (pe, slot) of the fetched closure, then the full
     (weight, pe, slot) of the local FETCH_ME, then the load figure */
  sendOpV(PP_FETCH, rga->pe, 6,  // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
	  (StgWord) rga->pe, (StgWord) rga->slot,
	  (StgWord) lga->weight, (StgWord) lga->pe,
	  (StgWord) lga->slot, (StgWord) load);
}

/*
 * unpackFetch unpacks a FETCH message into two Global addresses and a load
 * figure.  
*/

//@cindex unpackFetch
/*
 * unpackFetch decodes a FETCH message (see sendFetch for the layout)
 * into the local GA of the fetched closure (lga), the remote GA of
 * the requesting FETCH_ME (rga) and the load figure.
 */
static void
unpackFetch(long/*rtsPackBuffer*/  *buf, globalAddr *lga, globalAddr *rga, int *load)
{
  // NB: low-level header size is 2: OpCode, target PE
  buf++; 
  buf++; 

  /* GA of the closure being fetched; it must live on this PE */
  lga->weight = (rtsWeight)1; // dummy
  lga->pe = (nat) buf[0];
  lga->slot = (nat) buf[1];
  ASSERT(LOOKS_LIKE_GA(lga));
  ASSERT(lga->pe==thisPE);

  /* GA of the FETCH_ME on the requesting PE */
  rga->weight = (rtsWeight) buf[2];
  rga->pe = (nat) buf[3];
  rga->slot = (nat) buf[4];
  ASSERT(LOOKS_LIKE_GA(rga));  // BUGFIX: previously re-checked lga here
  ASSERT(LOOKS_LIKE_PE(rga->pe));

  *load = (int) buf[5];
  ASSERT(*load == 0); // load unused at the moment; drop this assertion once it is used

  IF_PAR_DEBUG(fetch, // mpcomm,
	       { char str1[MAX_GA_STR_LEN];
		 char str2[MAX_GA_STR_LEN];
		 showGA(lga, str1);
		 showGA(rga, str2);
		 // BUGFIX: the format string had three conversions but only
		 // two arguments (undefined behaviour); supply *load and
		 // terminate the line
		 debugBelch("~^** Unpacking Fetch for %s, to be stored in %s, load = %d\n", 
			     str1, str2, *load);};);

  // TODO: can we assert something like this? ASSERT(rga->weight >= (rtsWeight)MIN_GA_WEIGHT);
}

/*
 * SendResume packs the remote blocking queue's GA and data into a message 
 * and sends it.

//@cindex RESUME

   Structure of a RESUME message:

      -------------------------------
      | weight | slot | n | data ...
      -------------------------------

   data is a packed graph represented as an rtsPackBuffer
   n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size
 */

//@cindex sendResume
/*
 * sendResume ships the packed graph for a fetched closure back to the
 * requesting PE; rga identifies the remote FETCH_ME to be updated,
 * nelem is the payload size as recorded in the pack buffer.
 */
void
sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer)
{
  IF_PAR_DEBUG(fetch, // mpcomm,
	       { char gaStr[MAX_GA_STR_LEN];
		 showGA(rga, gaStr);
		 debugBelch("^^[] Sending Resume to %s to [%u] (packet <<%d>> with %d elems)\n",
			    gaStr, rga->pe, packBuffer->id, nelem);};);
  /*
  IF_PAR_DEBUG(packet,
	       PrintPacket(packBuffer));
  */
  ASSERT(nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  //IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));
  /* ship the pack buffer plus the target GA as (weight, pe, slot) */
  sendOpNV(PP_RESUME, rga->pe,   // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
	   nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM, (StgPtr)packBuffer,
	   3, (StgWord)rga->weight, (StgWord)rga->pe, (StgWord)rga->slot);
}

/*
 * unpackResume unpacks a Resume message into two Global addresses and
 * a data array.
 */

//@cindex unpackResume
/*
 * unpackResume decodes a RESUME message into the local GA of the
 * closure whose evaluation is resumed (lga), the payload element
 * count (nelem), and the packed graph data (copied into packBuffer).
 */
static void
unpackResume(long/*rtsPackBuffer*/ *buf, globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer)
{
    /*
      RESUME event is written in awaken_blocked_queue
    DumpRawGranEvent(thisPE, taskIDtoPE(lga->pe), 
		     GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0);
    */

    /* index past the 2-word low-level header (OpCode, target PE)
       instead of bumping the pointer */

    /* GA of the closure, whose evaluation we resume */
    lga->weight = (rtsWeight) buf[2];
    lga->pe = buf[3];
    lga->slot = (int) buf[4];
    ASSERT(lga->pe==thisPE);

    ASSERT(LOOKS_LIKE_GA(lga));

    /* payload size, net of the pack-buffer header and debug headroom */
    *nelem = (int) buf[5] - PACK_BUFFER_HDR_SIZEW - DEBUG_HEADROOM;
    memcpy(packBuffer, buf+6,
	   (*nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM)*sizeof(long));
    IF_PAR_DEBUG(fetch, // mpcomm,
		 { char gaStr[MAX_GA_STR_LEN];
		   showGA(lga, gaStr);
		   debugBelch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for %s\n",
			      packBuffer->id, *nelem, gaStr);};);

    /* check for magic end-of-buffer word */
    // TODO: RE-ENABLE: IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}


/*
 * SendAck packs the global address being acknowledged, together with
 * an array of global addresses for any closures shipped and sends them.

//@cindex ACK

   Structure of an ACK message:

      |        GA 1          |        GA 2          | 
      +---------------------------------------------+-------
      | weight | PE | slot | weight | PE | slot |  .....  ngas times
      + --------------------------------------------+------- 

 */

//@cindex sendAck
/*
 * sendAck serialises ngas pairs of (old GA, new GA) -- 6 words per
 * pair -- into the shared gumPackBuffer and ships them to pe as an
 * ACK message.  Sends nothing when there are no pairs.
 */
void
sendAck(nat pe, int ngas, globalAddr *gagamap)
{
  static long *buffer;
  long *wp;
  int pair;

  ASSERT(LOOKS_LIKE_PE(pe));

  /* nothing to acknowledge -- avoid an empty message */
  if (ngas == 0)
    return;

  buffer = (long *) gumPackBuffer;

  /* flatten each (old GA, new GA) pair into 6 consecutive words */
  for (pair = 0, wp = buffer; pair < ngas; pair++, wp += 6) {
    //    ASSERT((int)gagamap[1].weight >=  MIN_GA_WEIGHT );
    ASSERT(LOOKS_LIKE_PE(gagamap->pe));
    ASSERT(LOOKS_LIKE_GA(gagamap));
    wp[0] = (long) gagamap->weight;
    wp[1] = (long) gagamap->pe;
    wp[2] = (long) gagamap->slot;
    gagamap++;
    ASSERT(LOOKS_LIKE_PE(gagamap->pe));
    ASSERT(LOOKS_LIKE_GA(gagamap));
    wp[3] = (long) gagamap->weight;
    wp[4] = (long) gagamap->pe;
    wp[5] = (long) gagamap->slot;
    ASSERT(wp[4] == thisPE);  // the new GA must be on this PE!
    gagamap++;
  }
  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~^,, Sending Ack (%d pairs) to [%u]\n",
		     ngas, pe));

  sendOpN(PP_ACK, pe, wp - buffer, (StgPtr)buffer); // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
}

/*
 * unpackAck unpacks an Acknowledgement message into a Global address,
 * a count of the number of global addresses following and a map of 
 * Global addresses
 */

//@cindex unpackAck
/*
 * unpackAck decodes an ACK message: a size word followed by
 * (old GA, new GA) pairs, 6 words per pair (see sendAck for the
 * layout).  Yields the pair count in *ngas and the pairs in gagamap.
 */
static void
unpackAck(long/*rtsPackBuffer*/  *buf, int *ngas, globalAddr *gagamap)
{
  long GAarraysize;

  // NB: low-level header size is 2: OpCode, target PE
  buf++; 
  buf++; 

  GAarraysize = buf[0];
  ASSERT(GAarraysize <= RtsFlags.ParFlags.packBufferSize);

  *ngas = GAarraysize / 6; /* TODO: nuke MAGIC constant (6 words per pair) */
  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~^,, Unpacking Ack (%d pairs) on [%x]\n", 
			  *ngas, thisPE));
  // skip size field
  buf++; 
  while (GAarraysize > 0) {
    /* old GA -- expected to be local to this PE */
    gagamap->weight = (rtsWeight) buf[0];
    gagamap->pe = (nat) buf[1];
    gagamap->slot = (nat) buf[2];
    // @@@@@@@@@@@@@@@@@@@@@@ TODO: ASSERT(buf[1] == thisPE);  // ENABLE the old GA must be on this PE!
    if (buf[1] != thisPE) { 
      char str1[MAX_GA_STR_LEN];
      showGA(gagamap, str1);
      // BUGFIX: use debugBelch (consistent with the rest of this file,
      // belch is the legacy API) and cast the long to nat to match %u
      debugBelch("@@@@ WARNING: unpackAck: old GA is not on this PE [%u], but on [%u]: %s",
	    thisPE, (nat) buf[1], str1);
    }
    ASSERT(LOOKS_LIKE_PE(gagamap->pe));
    ASSERT(LOOKS_LIKE_GA(gagamap));
    gagamap++;
    /* new GA -- carries the weight being handed over */
    gagamap->weight = (rtsWeight) buf[3];
    gagamap->pe = (nat) buf[4];
    gagamap->slot = (nat) buf[5];
    ASSERT(LOOKS_LIKE_PE(gagamap->pe));
    ASSERT(LOOKS_LIKE_GA(gagamap));
    ASSERT(gagamap->weight >= (rtsWeight)MIN_GA_WEIGHT);
    gagamap++;
    GAarraysize -= 6;
    buf+=6; 
  } 
}

/*
 * SendFish packs the global address being acknowledged, together with
 * an array of global addresses for any closures shipped and sends them.

//@cindex FISH

 Structure of a FISH message:

     +----------------------------------+
     | orig PE | age | history | hunger |
     +----------------------------------+
 */

//@cindex sendFish
/*
 * sendFish ships a FISH (work-stealing probe) to destPE on behalf of
 * origPE, carrying the fish's age, history and hunger counters.  A
 * fish launched from this PE is counted as outstanding until it
 * returns home (see processFish).
 */
void
sendFish(nat destPE, nat origPE, 
	 int age, int history, int hunger)
{  
 IF_PAR_DEBUG(fish,
	      debugBelch("$$>> Sending Fish from [%u] to [%u] (%d outstanding fishes)\n", 
			 origPE, destPE, outstandingFishes)); 
 ASSERT(LOOKS_LIKE_PE(destPE));
 ASSERT(LOOKS_LIKE_PE(origPE));
 ASSERT(hunger==RtsFlags.ParFlags.fishHunger); // sanity check

 /* payload: originating PE plus the three fish counters */
 sendOpV(PP_FISH, destPE, (int) 4,   // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
	 (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger);

 /* book-keeping: a locally launched fish is now outstanding */
 if (origPE == thisPE) {
   //fishing = rtsTrue;
   outstandingFishes++;
 }
}

/*
 * unpackFish unpacks a FISH message into the global task id of the
 * originating PE and 3 data fields: the age, history and hunger of the
 * fish. The history + hunger are not currently used.

 */

//@cindex unpackFish
/*
 * unpackFish decodes a FISH message into the originating PE and the
 * fish's age, history and hunger counters (the latter two are not
 * currently used beyond sanity checking).
 */
static void
unpackFish(long/*rtsPackBuffer*/  *buf, nat *origPE, int *age, int *history, int *hunger)
{

  // NB: low-level header size is 2: OpCode, target PE
  buf++; 
  buf++; 

  *origPE = (nat) buf[0];
  *age = (int) buf[1];
  *history = (int) buf[2];
  *hunger = (int) buf[3];
  ASSERT(*hunger==RtsFlags.ParFlags.fishHunger); // sanity check

  ASSERT(LOOKS_LIKE_PE(*origPE));

  // BUGFIX: the old message printed buf[2] -- the *history* field --
  // labelled as the hunger; report the decoded values instead
  IF_PAR_DEBUG(fish,
	       debugBelch("$$__ Unpacking Fish from [%u] (age=%d, hunger=%d); time=%ld; spark pool size=??\n", 
		     *origPE, *age, *hunger,
                     msTime()/*CURRENT_TIME*/
		     /*,spark_queue_len(&(MainRegTable.rSparks))*/));
}


/*
 * SendFree sends (weight, slot) pairs for GAs that we no longer need
 * references to.  

//@cindex FREE

   Structure of a FREE message:
   
       +-----------------------------
       | n | weight_1 | slot_1 | ...
       +-----------------------------
 */
//@cindex sendFree
/*
 * sendFree ships nelem words of (weight, slot) pairs to pe as a FREE
 * message, releasing remote references we no longer need.  With
 * DUMMY_FREE defined (the current hack), no message is sent at all
 * and the data is silently dropped.
 */
void
sendFree(nat pe, int nelem, StgPtr data)
{
  ASSERT(LOOKS_LIKE_PE(pe));
#if !defined(DUMMY_FREE)
    IF_PAR_DEBUG(mpcomm,
		 debugBelch("~^!! Sending Free (%d GAs) to [%u]", 
		       nelem/2, pe));

#if defined(PAR_TICKY)
      // Global statistics: count FREE messages sent
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	totFREE++;}
#endif  
 sendOpN(PP_FREE, pe, nelem, data);  // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
#else
    IF_PAR_DEBUG(mpcomm,
		 debugBelch("~^!! HACK HACK HACK HACK HACK : refusing to send Free message (%d GAs) to [%u]\n", 
		       nelem/2, pe));
    // HWL HACK: don't send any free messages at all
#endif
}

/*
 * unpackFree unpacks a FREE message into the amount of data shipped and
 * a data block.
 */
//@cindex unpackFree
/*
 * unpackFree decodes a FREE message into the number of words shipped
 * (*nelem) and the raw (weight, slot) data block.
 */
static void
unpackFree(long/*rtsPackBuffer*/  *buf, int *nelem, StgWord *data)
{
  // NOTE(review): unlike every other unpack* routine in this file,
  // this one does NOT skip the 2-word low-level header (OpCode,
  // target PE) before reading the size, yet copies the payload from
  // buf+2.  Compare unpackSchedule, which reads the size at buf[2]
  // and the payload at buf+3.  Verify against the caller whether buf
  // is pre-advanced here -- this looks inconsistent.
  *nelem = (int) buf[0];

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~^!! Unpacking Free (%d GAs)", 
		     *nelem/2));

  //  GetArgs(data, *nelem);
  /* raw copy of the (weight, slot) pairs into the caller's buffer */
  memcpy(data,buf+2,(*nelem)*sizeof(long));
}

/*
 * SendSchedule sends a closure to be evaluated in response to a Fish
 * message. The message is directed to the PE that originated the Fish
 * (origPE), and includes the packed closure (data) along with its size
 * (nelem).

//@cindex SCHEDULE

   Structure of a SCHEDULE message:

       +------------------------------------
       | PE | n | pack buffer of a graph ...
       +------------------------------------
 */
//@cindex sendSchedule
/*
 * sendSchedule ships a packed graph to origPE (the PE whose FISH we
 * are answering) as a SCHEDULE message, so the work can be evaluated
 * there.  nelem is the payload size excluding the pack-buffer header.
 */
void
sendSchedule(nat origPE, int nelem, rtsPackBuffer *packBuffer) 
{
  int totalSize;

  IF_PAR_DEBUG(mpcomm,
	       debugBelch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%u]\n", 
		     packBuffer->id, nelem, origPE));
  ASSERT(LOOKS_LIKE_PE(origPE));
  /*
  IF_PAR_DEBUG(packet,
	       PrintPacket(packBuffer));
  */
  // TODO: re-enable this assertion; currently FAILS!!
  //       maybe we need to add the PACK_BUFFER_HDR_SIZEW here too? -- HWL

  /* check for magic end-of-buffer word */
  //IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER));

  /* total message size: payload plus pack-buffer header and headroom */
  totalSize = nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM;
  sendOpN(PP_SCHEDULE, origPE, // FIXME: PEs numbered 1..n but sendOp expects 0..n-1
	  totalSize, (StgPtr)packBuffer);
}

/*
 * unpackSchedule unpacks a SCHEDULE message into the Global address of
 * the closure shipped, the amount of data shipped (nelem) and the data
 * block (data).
 */

//@cindex unpackSchedule
/*
 * unpackSchedule decodes a SCHEDULE message into the payload element
 * count (*nelem, excluding the pack-buffer header) and copies the
 * packed graph into packBuffer.
 */
static void
unpackSchedule(long/*rtsPackBuffer*/  *buf, int *nelem, rtsPackBuffer *packBuffer)
{
  /* buf[0]/buf[1] are the low-level header (OpCode, target PE);
     buf[2] holds the total size including pack-buffer header */
  *nelem = (int) buf[2] - PACK_BUFFER_HDR_SIZEW - DEBUG_HEADROOM;

  /* flat copy of the message payload into the pack buffer */
  memcpy(packBuffer, buf+3,
	 (*nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM)*sizeof(long));

  IF_PAR_DEBUG(pack,
	       debugBelch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%u]\n", 
			  packBuffer->id, *nelem, thisPE));

  // ASSERT(*nelem==packBuffer->size);
  /* check for magic end-of-buffer word */
  //  IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER));
}



//@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines
//@subsection Message-Processing Functions

/*
 * Message-Processing Functions
 *
 * The following routines process incoming GUM messages. Often reissuing
 * messages in response.
 *
 * processFish unpacks a fish message, reissuing it if it's our own,
 * sending work if we have it or sending it onwards otherwise.
 */

//@cindex prepareFreeMsgBuffers
/*
 * prepareFreeMsgBuffers lazily allocates one free-message staging
 * buffer per remote PE (none for ourselves) on first call, and on
 * every call resets all write indices to the start of their buffers.
 */
void
prepareFreeMsgBuffers(void)
{
  nat i;

  /* Allocate the freeMsg buffers just once and then hang onto them. */
  if (freeMsgIndex == NULL) {
    freeMsgIndex  = (nat *) stgMallocBytes(nPEs * sizeof(nat),
					   "prepareFreeMsgBuffers (Index)");
    freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *),
					   "prepareFreeMsgBuffers (Buffer)");

    /* one packBufferSize-word buffer per remote PE; slot i corresponds
       to PE i+1, and our own slot stays empty */
    for (i = 0; i < nPEs; i++) {
      if (i == (thisPE-1)) {
	freeMsgBuffer[i] = 0;
      } else {
	freeMsgBuffer[i] = (StgPtr) stgMallocBytes(sizeof(StgWord)*RtsFlags.ParFlags.packBufferSize,
						   "prepareFreeMsgBuffers (Buffer #i)");
      }
    }
  }

  /* Initialize the freeMsg buffer pointers to point to the start of their
     buffers */
  for (i = 0; i < nPEs; i++)
    freeMsgIndex[i] = 0;
}

/*
 * packAndSendResume packs the graph rooted at closure and sends it as
 * a RESUME message to the remote GA recorded in the blocked-fetch
 * record bf (the GA of the FETCH_ME that demanded the value).
 * Packing failure (heap exhaustion) currently aborts; the commented
 * code sketches the intended retry-after-GC path.
 */
void
packAndSendResume(StgClosure *closure, GALA *bf) {
  globalAddr rga;
  static rtsPackBuffer *packBuffer;
  OpCode unused_tag;

  ASSERT(UNTAG_CHECK(closure));
  // NB: removed unused locals (StgTSO *next, StgInfoTable *ip) and a
  // dead store of gumPackBuffer that was overwritten before any use

  if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) {
    // Put current BF back on list	//bf->link = (StgBlockingQueueElement *)PendingFetches;
    //PendingFetches = (StgBlockedFetch *)bf;
    // ToDo: check that nothing more has to be done to prepare for GC!
    barf("PANIC: packAndSendResume: out of heap while packing graph; ToDo: call GC here\n");
    //GarbageCollect(GetRoots, rtsFalse); 
    //bf = PendingFetches;
    //PendingFetches = (StgBlockedFetch *)(bf->link);
    //closure = bf->node;
    //packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.pe);
    //ASSERT(packBuffer != (rtsPackBuffer *)NULL);
  }

  /* destination: the remote GA stored in the blocked-fetch record */
  rga = bf->ga;

  /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode
  rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe;
  rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot;
  rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight; */

  ASSERT(LOOKS_LIKE_PE(rga.pe));
  ASSERT(LOOKS_LIKE_GA(&rga));

  IF_PAR_DEBUG(fetch, //mpcomm,
		   { char str1[MAX_GA_STR_LEN];
	             char str2[MAX_GA_STR_LEN];
	             globalAddr *root = LAGAlookup(closure);
		     showGA(root, str1);
		     showGA(&rga, str2);
		     debugBelch("__*> packAndSendResume: sending closure %p (%s) with GA %s to GA %s\n",
				closure, info_type(closure), str1, str2);};);

  sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer);

  /* explicitly mark this Ghost TSO as garbage
  tso->what_next = 55; // UNACCEPTABLE HWL HACK: constant 55 to identify GARBAGE ghost TSO
  IF_PAR_DEBUG(fetch, // mpcomm,
	       debugBelch("__-> packAndSendResume: marking served Ghost TSO %d (%p) as GARBAGE (what_next=55)\n",
			  tso->id, tso));
  */
#if defined(PAR_TICKY)
      // Global statistics: count no. of resume messages
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_resume_mess++;
      }
#endif
}

// Contents has been unpacked into recvBuffer by MP_recv already
/*
 * processFish handles an incoming FISH message.  If our own fish has
 * come home, just account for it.  Otherwise, when the spark pool is
 * above the low watermark, steal a spark, pack its graph and reply
 * with a SCHEDULE; when no work is available, forward the fish to
 * another PE (if still young) or send it home to die.
 */
void
processFish(Capability *cap, OpCode opcode, nat origPE, long *recvBuffer)
{
  int age, history, hunger;
  StgClosure *spark = NULL;

  static rtsPackBuffer *packBuffer; 

#if defined(PAR_TICKY)
      // Global statistics: received fishes
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.rec_fish_mess++;
	totFISH++;
      }
#endif

  unpackFish(recvBuffer, &origPE, &age, &history, &hunger);
  // NB: removed unused locals (nat ope, int mus) here

  ASSERT(LOOKS_LIKE_PE(thisPE));
  ASSERT(LOOKS_LIKE_PE(origPE));
  
  if (origPE == thisPE) {
    /* our own fish has come home: one fewer outstanding fish */
    //fishing = rtsFalse;
    outstandingFishes--;
    last_fish_arrived_at = msTime()/*CURRENT_TIME*/;  // remember time (see schedule fct)
    return;                               // that's all
  }
  
  ASSERT(origPE != thisPE);

  /* only give work away if the spark pool is above the low watermark */
  if ((sparkPoolSizeCap(cap) >= RtsFlags.ParFlags.lowWater) &&
      ((spark = tryStealSpark(cap)) != NULL)) {
    OpCode unused_tag;
    IF_PAR_DEBUG(verbose, 
                 debugBelch(" found spark: remaining spark pool size = %d \n",sparkPoolSizeCap(cap) ));
    packBuffer = gumPackBuffer; 

    // TODO: check and enable this assertion; tryStealSpark should only return something worth sparking
    // ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));

    // add event to profile
    //if (RtsFlags.ParFlags.ParStats.Sparks) 
      /*DumpRawGranEvent(thisPE, origPE,
	                 SP_EXPORTED, ((StgTSO *)NULL), spark, 
			 0, spark_queue_len(cap));*/

    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &unused_tag)) == NULL) {
      IF_PAR_DEBUG(fish,
		   debugBelch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p\n",
			 (StgClosure *)spark));
      // TODO: enable GC code here
      barf("processFish: out of heap while packing graph; ToDo: call GC here\n");
      // GarbageCollect(GetRoots, rtsFalse);
      /* Now go back and try again */
    } else {
      IF_PAR_DEBUG(verbose,
		   if (RtsFlags.ParFlags.ParStats.Sparks)
		     // BUGFIX: print the spark address with %p (was %x,
		     // undefined behaviour for a pointer argument)
		     debugBelch("==== STEALING spark %p; sending to [%u]\n", (void *)spark, origPE));
      
      IF_PAR_DEBUG(fish,
		   debugBelch("$$-- Replying to FISH from [%u] by sending graph @ %p (%s)\n",
			 origPE, 
			 (StgClosure *)spark, info_type((StgClosure *)spark)));
      sendSchedule(origPE,(int) PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer);
      
      // disposeSpark(spark); // not needed in GUM6, I think -- HWL GUM6
#if defined(PAR_TICKY)
      // Global statistics: count no. of schedule messages
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_schedule_mess++;
      }
#endif
    }
  }

  if (spark == (StgClosure *)NULL) { // either no spark or below LWM
    IF_PAR_DEBUG(fish,
		 debugBelch("$$^^ No sparks available for FISH from [%u]\n",
		       origPE));
    /* We have no sparks to give */
    if (age < RtsFlags.ParFlags.fishLifeExpectancy) { // FISH_LIFE_EXPECTANCY) { 
      /* if the fish is still young, send it to another PE to look for work */
      // TODO: implement RtsFlags.ParFlags.fishForwardDelay
      IF_PAR_DEBUG(verbose,
		   if (RtsFlags.ParFlags.fishForwardDelay>0)
		     debugBelch("**** WARNING: RtsFlags.ParFlags.fishForwardDelay not implemented, but delay of %lu requested\n", RtsFlags.ParFlags.fishForwardDelay));
      
      sendFish(choosePE(), origPE,
	       (age + 1), NEW_FISH_HISTORY, RtsFlags.ParFlags.fishHunger); // NEW_FISH_HUNGER);
#if defined(PAR_TICKY)
      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_fish_mess++;
      }
#endif
    } else { /* otherwise, send it home to die */
      sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, RtsFlags.ParFlags.fishHunger); // NEW_FISH_HUNGER);
#if 0 && defined(PAR_TICKY)
      // Global statistics: count no. of fish messages
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_fish_mess++;
      }
#endif
    }
  }
}  /* processFish */

// -----------------------------------------------------------------------------

/*
 * offLoadSpark is very similar to processFish. Difference is that we
 * randomly pick a PE to offload this spark to.
 */
//@cindex offLoadSpark
/*
 * offLoadSpark is very similar to processFish, but instead of
 * answering a FISH it proactively picks a target PE (via choosePE)
 * and ships one local spark there as a SCHEDULE message.
 */
void
offLoadSpark(Capability *cap)
{
  nat toPE;
  StgClosure *spark;
  static rtsPackBuffer *packBuffer; 

  toPE = choosePE();
  ASSERT(LOOKS_LIKE_PE(toPE));

  /* search for a spark to off-load */
  // GUM4: if ((spark = findSpark(cap /*, rtsTrue for_export??*/)) != NULL) {
  if ((spark = tryStealSpark(cap)) != NULL) {
    OpCode unused_tag;
    // NB: removed unused local (nat size)

    packBuffer = gumPackBuffer; 
    ASSERT(closure_SHOULD_SPARK((StgClosure *)spark));

    // add event to profile -- disabled; note the old commented-out
    // code here accidentally nested /* */ comments:
    // DumpRawGranEvent(thisPE, toPE, SP_EXPORTED, NULL, spark,
    //                  0, spark_queue_len(cap));

    if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &unused_tag)) == NULL) {
      IF_PAR_DEBUG(fish,
		   debugBelch("$$ GC while trying to offload spark via PackNearbyGraph of node %p\n",
			 (StgClosure *)spark));
      barf("offLoadSpark: out of heap while packing graph; ToDo: call GC here\n");
      // GarbageCollect(GetRoots, rtsFalse);
      /* Now go back and try again */
    } else {
      IF_PAR_DEBUG(verbose,
		   if (RtsFlags.ParFlags.ParStats.Sparks)
		     // BUGFIX: print the spark address with %p (was %u,
		     // undefined behaviour for a pointer argument)
		   debugBelch("==== STEALING spark %p; sending to [%u]\n", (void *)spark, toPE));
      
      IF_PAR_DEBUG(fish,
		   debugBelch("$$-- Offloading Spark to [%u] by sending graph @ %p (%s) [sparkq_len=??]\n",
			 toPE, 
			 (StgClosure *)spark, info_type((StgClosure *)spark)
                         /*,spark_queue_len(&(MainRegTable.rSparks))*/));
      sendSchedule(toPE, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer);
      // disposeSpark(spark); -- NOOOOO; findSpark automatically removes spark from the pool
#if defined(PAR_TICKY)
      // Global statistics: count no. of schedule messages
      if (RtsFlags.ParFlags.ParStats.Global &&
	    RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	  globalParStats.tot_schedule_mess++;
      }
#endif
    }
  }
}

/*
 * processFetch either returns the requested data (if available) 
 * or blocks the remote blocking queue on a black hole (if not).
 * ga ... the closure being fetched
 * rga ... the FM requesting the data
 */

// FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for a pattern of process* code)
//@cindex processFetch
void
processFetch (long *recvBuffer , Capability *cap )
{
  globalAddr ga, rga;
  int load;
  StgClosure *closure;
  StgInfoTable *ip;
  StgTSO *t;

#if defined(PAR_TICKY)
  // Global statistics: received fetches
  if (RtsFlags.ParFlags.ParStats.Global &&
  	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
  	globalParStats.rec_fetch_mess++;
    totFETCH++;
  }
#endif

  unpackFetch(recvBuffer,&ga, &rga, &load);
  ASSERT(LOOKS_LIKE_GA(&ga));
  ASSERT(LOOKS_LIKE_GA(&rga));
  ASSERT(ga.pe==thisPE);

  IF_PAR_DEBUG(fetch, // mpcomm,
	       { char str1[MAX_GA_STR_LEN];
		 char str2[MAX_GA_STR_LEN];
		 showGA(&ga, str1);
		 showGA(&rga, str2);
		 debugBelch("%%__ Received Fetch for %s, Resume %s (load %d) from %u\n",
			    str1, str2, 
			    load, rga.pe);};);
  
  closure = UNTAG_CLOSURE(GALAlookup(&ga));
  ASSERT(closure != (StgClosure *)NULL);
  ASSERT(UNTAG_CHECK(closure));

  ip = get_itbl(closure);
  if (ip->type == FETCH_ME && rga.pe!=thisPE) { // HWL OPT: could do this shorten FM chains:   
    IF_PAR_DEBUG(fetch, // mpcomm,
		 { char str1[MAX_GA_STR_LEN];
		   char str2[MAX_GA_STR_LEN];
		   char str3[MAX_GA_STR_LEN];
		   showGA(&ga, str1);
		   showGA(&rga, str2);
		   showGA(((StgFetchMe *)closure)->ga, str3);
		   debugBelch("%%__ Target of a Fetch for %s is FETCH_ME for %s (demanded by %s); forwarding the Fetch to %u\n",
			      str1, str3, str2, ((StgFetchMe *)closure)->ga->pe);};);
    /* Forward the Fetch to someone else */
    sendFetch(((StgFetchMe *)closure)->ga, &rga, load);

#if defined(PAR_TICKY)
    // Global statistics: count no. of fetches
    if (RtsFlags.ParFlags.ParStats.Global &&
	RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_fetch_mess++;
    }
#endif
  } else if (rga.pe == thisPE) { // HWL OPT: if demander is local, update the FM with an IND to the local closure
    /* Our own FETCH forwarded back around to us */
#if defined(DEBUG)
    IF_PAR_DEBUG(mpcomm,
		 { char str1[MAX_GA_STR_LEN];
		   char str2[MAX_GA_STR_LEN];
		   char str3[MAX_GA_STR_LEN];
		   showGA(&ga, str1);
		   showGA(&rga, str2);
                   debugBelch("%%%%== Fetch cycle: Fetch for %s (demanded by %s) returned to sending PE [%u]; closure=%p (%s)\n",
			      str1, str2, thisPE, closure, info_type(closure));};);
#endif  
    StgClosure * fm_closure = UNTAG_CLOSURE(GALAlookup(&rga));  // local address of the orig FM
    // HWL TODO: ASSERT(get_itbl(fm_closure)->type==FETCH_ME)
    if (!get_itbl(fm_closure)->type==FETCH_ME) { //  || get_itbl(fm_closure)->type==IND);
      IF_PAR_DEBUG(mpcomm,
		 { char str1[MAX_GA_STR_LEN];
		   char str2[MAX_GA_STR_LEN];
		   char str3[MAX_GA_STR_LEN];
		   showGA(&ga, str1);
		   showGA(&rga, str2);
                   debugBelch("**** WARNING: Fetch cycle: demanding closure %p (%s) (known as %s) is NOT a FETCH_ME on PE [%u]; closure=%p (%s)\n",
			      fm_closure, info_type(fm_closure), str2, thisPE, closure, info_type(closure));};);
    }
    // HWL CHECK: is this guaranteed never to create a loop?
    if (fm_closure==closure) { // UNACCEPTABLE HWL HACK: closure must be different!!!!!
      debugBelch("**** WARNING: Trying to update a closure with itself as result of Fetch cycle: fm_closure=%p (%s), closure=%p (%s)\n",
		 fm_closure, info_type(fm_closure), closure, info_type(closure));
    } else {
      SORT_OF_SAFE_UPD_IND(fm_closure, closure);                               // update it with an IND to the local data
    }
    // HWL CHECK: are the tags ok?

    // HWL CHECK: is the code below still necessary?
    /* We may have already discovered that the fetch target is our own. 
    if ((StgClosure *)fmbq != closure) 
      CommonUp((StgClosure *)fmbq, closure);
    (void) addWeight(&rga);
    */
  } else if (IsBlackhole(closure)) {
     // fprintf(stderr, "\n  F: BH \n "); 
#if defined(DUMMY_BLOCKED_FETCH)
    GALA *bf;
    // t = createGhostTSO(cap, closure, rga);
    bf = createBlockedFetch(cap, closure, rga);
    bf->next = blockedFetches;
    blockedFetches = bf;

    IF_PAR_DEBUG(fetch,
		 { char str1[MAX_GA_STR_LEN];
		   char str2[MAX_GA_STR_LEN];
		   char str3[MAX_GA_STR_LEN];
		   showGA(&ga, str1);
		   showGA(&rga, str2);
		   showGA(&(bf->ga), str3);
		   debugBelch("%%%%++ Blocking Fetch on %s @ %p (%s) , requested by %s; BF is %s @ %p",
			      str1, closure, info_type(closure), str2, str3, bf);};);

    /* If we're hitting a BH or RBH or FMBQ we have to put a special form of TSO
       closure into the BQ in order to denote that when updating this node
       the result should be sent to the originator of this fetch message. 
       In GUM4 we used a special BLOCKED_FETCH closure for this. 
    */
    // put it on the global blackhole_queue
    // ASSUMES: sched_mutex

    /* WAS:
    setTSOLink (cap, t, ghost_TSO_queue);
    ghost_TSO_queue = t;
    t->link = blackhole_queue;
    blackhole_queue = t;
    */
#else
# error "should compile the PARALLEL_RTS with DUMMY_BLOCKED_FETCH"
    // ngoq ngo'!!
    /* This includes RBH's and FMBQ's */
    StgBlockedFetch *bf = (StgBlockedFetch *)createBlockedFetch(ga, rga);
    IF_PAR_DEBUG(mpcomm,
		 debugBelch("%%%%++ Blocking Fetch ((%u, %d, %x)) on %p (%s)",
		       rga.pe, rga.slot, rga.weight, 
		       closure, info_type(closure)));

    /* Can we assert something on the remote GA? */
    // ASSERT(GALAlookup(&rga) == NULL);

    /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH
       closure into the BQ in order to denote that when updating this node
       the result should be sent to the originator of this fetch message. */
    IF_PAR_DEBUG(fetch,
		 debugBelch("%%%%++ Blocking Fetch ((%u, %d, %x)) on %p (%s)",
		       rga.pe, rga.slot, rga.weight, 
		       closure, info_type(closure)));

    // put it on the global blackhole_queue
    // ASSUMES: sched_mutex
    bf->link = blackhole_queue;
    blackhole_queue = bf;
#endif

/*     IF_PAR_DEBUG(mpcomm, */
/* 		 debugBelch("##++ processFetch: after block the global BHQ is:"); */
/* 		 print_bq()); */
  } else {			
    /* The target of the FetchMe is some local graph */
    nat size;
    OpCode unused_tag;
    // StgClosure *graph;
    rtsPackBuffer *buffer = (rtsPackBuffer *)NULL;
    //fprintf(stderr, "\n  F: main case \n "); 
    //fprintf(stderr,"%%%%== Fetch returned to sending PE; closure=%p (%s)\n",
    //  closure, info_type(closure));

#if defined(DEBUG)
    if (rga.pe == thisPE) { // TODO: this should be handled by the previous branch (disabled for now)
      IF_PAR_DEBUG(mpcomm,
		   debugBelch("%%%%== Fetch returned to sending PE; closure=%p (%s)\n",
			      closure, info_type(closure)));
      debugBelch("WARNING: cycle in fetching remote work: closure %p (%s) is a FETCH_ME to its own PE [%u]\n",
		 closure, info_type(closure), thisPE);  
      // for now, send a resume anyway 
    }
#endif  

    if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) {
      barf("processFetch: out of heap while packing graph; ToDo: call GC here");
      // GarbageCollect(GetRoots, rtsFalse); 
      // closure = GALAlookup(&ga);
      // buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.pe);
      // ASSERT(buffer != (rtsPackBuffer *)NULL);
    }
    sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(buffer), buffer);
 
#if defined(PAR_TICKY)
    // Global statistics: count no. of fetches
    if (RtsFlags.ParFlags.ParStats.Global &&
	RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
      globalParStats.tot_resume_mess++;
    }
#endif
  }
}

/*
 * processFree unpacks a FREE message and adds the weights to our GAs.
 */
// FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code
//@cindex processFree
void
processFree(long/*rtsPackBuffer*/ *recvBuffer, Capability *cap)
{
  int nelem;          /* number of words in the GA list (2 per GA) */
  int idx;
  StgWord *wordBuf;   /* scratch: the global pack buffer, viewed as words */
  globalAddr ga;

  /* Unpack the FREE message into the shared pack buffer; it carries a
     flat list of (weight, slot) word pairs. */
  wordBuf = (StgWord *)gumPackBuffer;
  unpackFree(recvBuffer ,&nelem, wordBuf);
  IF_PAR_DEBUG(mpcomm,
	       debugBelch("!!__ Rcvd Free (%d GAs)\n", nelem / 2));

  /* Every GA freed by this message lives on this PE; reconstruct each
     GA from its pair and return the weight to our table. */
  ga.pe = thisPE;
  for (idx = 0; idx < nelem;) {
    ga.weight = (rtsWeight) wordBuf[idx++];
    ga.slot = (int) wordBuf[idx++];
    (void) addWeight(&ga);
  }
}

/*
 * processResume unpacks a RESUME message into the graph, filling in
 * the LA -> GA, and GA -> LA tables. Threads blocked on the original
 * FetchMe (now a blocking queue) are awakened, and the blocking queue
 * is converted into an indirection.  Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */

// FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code
//@cindex processResume
void
processResume(Capability *cap, nat sender, long/*rtsPackBuffer*/  *recvBuffer)
{
  int nelem;
  nat nGAs;
  static long/*rtsPackBuffer*/  *packBuffer;
  StgClosure *newGraphT, *old;  // newGraphT: tagged root of unpacked graph; old: local closure registered under lga
  globalAddr lga;               // GA of the closure this RESUME is a reply for
  globalAddr *gagamap;          // (oldGA,newGA) pairs created while unpacking; echoed back in the ACK
#if defined(DEBUG)
  StgInfoTable *info;
  nat size, ptrs, nonptrs, vhs;
#endif

  ASSERT(LOOKS_LIKE_PE(sender));

  /* Unpack the message header: lga plus payload size; the serialised
     graph itself lands in the global pack buffer (gumPackBuffer). */
  packBuffer = (long/*rtsPackBuffer*/  *)gumPackBuffer;
  unpackResume(recvBuffer,&lga, &nelem, packBuffer);
  ASSERT(LOOKS_LIKE_GA(&lga));
  IF_PAR_DEBUG(fetch,
               { char str[MAX_GA_STR_LEN];
	          showGA(&lga, str);
		  debugBelch("[]__ Rcvd Resume for %s \n", str);};); 

#if defined(PAR_TICKY)
      // Global statistics: received resumes
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.rec_resume_mess++;
      }
#endif

  /*
  IF_PAR_DEBUG(packet,
	       PrintPacket((rtsPackBuffer *)packBuffer));
  */
  /* 
   * We always unpack the incoming graph, even if we've received the
   * requested node in some other data packet (and already awakened
   * the blocking queue).
  if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) {
    ReallyPerformThreadGC(packBuffer[0], rtsFalse);
    SAVE_Hp -= packBuffer[0];
  }
   */

  // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

  /* Do this *after* GC; we don't want to release the object early! */

  /* The weight carried in the message is returned to our GA entry. */
  if (lga.weight > 0) // TODO: check if this should be: > MIN_GA_WEIGHT
    (void) addWeight(&lga);

  /* The local closure known under lga -- normally the FETCH_ME for
     which we originally sent the FETCH. */
  old = UNTAG_CLOSURE(GALAlookup(&lga));

#if defined(DEBUG)
  /* only needed for sanity checks below */  
  info = get_closure_info(old, &size, &ptrs, &nonptrs, &vhs);

  if (!(get_itbl(old)->type == FETCH_ME || 
	get_itbl(old)->type == RBH)) {
    /* ToDo: Check whether this can be made an assertion */
     IF_DEBUG(sanity,
             {  char str[MAX_GA_STR_LEN];
	        showGA(&lga, str);
		debugBelch("[]** non-FETCH_ME-ish closure when receiving resume: %p (%s) GA %s\n",
		old, info_type_by_ip(info), str);};);
  }
#endif

  /* Profiling only: no REPLY event is emitted here any more (see the
     long comment below for why). */
  if (RtsFlags.ParFlags.ParStats.Full) {
    // StgBlockingQueueElement *bqe, *last_bqe;

    IF_PAR_DEBUG(fetch,
		 debugBelch("[]-- Resume is REPLY to closure %p\n", old));

    /* NB: now that all blocked TSOs are on the BHQ (rather than on a Q rooted
           at the FM closure) we can't dump a reply event here, since we don't
	   don't know the TSOs blocked on it directly. Instead we dump an
	   event when traversing the BHQ, finding a TSO that's BlockedOnFetch
	   and now has data in the closure_info field -- HWL GUM6 */
    // HACK: reply for a dummy TSO; best change .gr protocol and drop replies
    /* if (RtsFlags.ParFlags.ParStats.Full) 
      DumpRawGranEvent(thisPE, sender, 
		       GR_REPLY, 0, old,
		       0, spark_queue_len(cap));*/

    /* Write REPLY events to the log file, indicating that the remote
       data has arrived 
       NB: we emit a REPLY only for the *last* elem in the queue; this is
           the one that triggered the fetch message; all other entries
	   have just added themselves to the queue, waiting for the data 
	   they know that has been requested (see entry code for FETCH_ME_BQ)
    if ((get_itbl(old)->type == FETCH_ME_BQ ||
	 get_itbl(old)->type == RBH)) {
      for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue,
	   last_bqe = END_BQ_QUEUE;
	     get_itbl(bqe)->type==TSO || 
	     get_itbl(bqe)->type==BLOCKED_FETCH;
	   last_bqe = bqe, bqe = bqe->link) {  nothing  }

      ASSERT(last_bqe==END_BQ_QUEUE || 
	     get_itbl((StgClosure *)last_bqe)->type == TSO);

      // last_bqe now points to the TSO that triggered the FETCH 
      if (get_itbl((StgClosure *)last_bqe)->type == TSO)
	DumpRawGranEvent(thisPE, taskIDtoPE(sender), 
			 GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure,
			 0, spark_queue_len(&(MainRegTable.rSparks)));
    }
    */

  }

  /* Unpack the graph into the heap, filling the LA->GA / GA->LA tables;
     the returned gagamap records the new GAs we allocated.  May common
     up the unpacked root with existing local closures (including old). */
  newGraphT = UnpackGraph(packBuffer, &gagamap, &nGAs, NoPort/* UNUSED in GUM-style comm */, cap); // tagged!
  ASSERT(newGraphT != NULL);
  ASSERT(LOOKS_LIKE_CLOSURE_PTR(UNTAG_CLOSURE(newGraphT)));

  /* 
   * Sometimes, unpacking will common up the resumee with the
   * incoming graph, but if it hasn't, we'd better do so now.
   */
  if (get_itbl(old)->type == FETCH_ME) // UnpackGraph may have turned it into an IND already
    SORT_OF_SAFE_UPD_IND(old, UNTAG_CLOSURE(newGraphT)); // HWL TODO: add tag to the indirection

  IF_PAR_DEBUG(fetch,
	       { if (get_itbl(old)->type == IND_OLDGEN) {
		   debugBelch("[]** Warning: found IND_OLDGEN in old closure @ %p, pointing to unpacked graph @ %p\n",
			      old, UNTAG_CLOSURE(newGraphT));
		 }
	       });
  IF_PAR_DEBUG(fetch,
	       debugBelch("[]-- Ready to resume unpacked graph at %p (%s)\n",
			  UNTAG_CLOSURE(newGraphT), info_type(UNTAG_CLOSURE(newGraphT))));
  IF_PAR_DEBUG(pack,
	       IF_PAR_DEBUG(packet,
			    debugBelch("** PrintGraph of %p is:", UNTAG_CLOSURE(newGraphT));
			    PrintGraph(UNTAG_CLOSURE(newGraphT),0)));
	       

  IF_DEBUG(sanity, check_IND_cycle(UNTAG_CLOSURE(newGraphT)));

  IF_PAR_DEBUG(tables, 
 	       DebugPrintGAGAMap(gagamap, nGAs)); 

#if 0
  IF_DEBUG(sanity, 
	   { StgTSO *tso; 
             for (tso = blackhole_queue; 
		  tso != END_TSO_QUEUE; 
		  tso = tso->_link) {
	       if (old==tso->block_info.closure) {
		 debugBelch("%%%% TSO %d was blocked on OLD closure %p; expect it to be re-awoken",
			    tso->id, old);
		 if (get_itbl(UNTAG(tso->block_info.closure))->type==FETCH_ME) {
  		   char str[MAX_GA_STR_LEN];
  		   showGA(((StgFetchMe *)(UNTAG(tso->block_info.closure)))->ga, str);
		   debugBelch("%%%%     was blocked on FETCH_ME to GA %s ;",
			      str);
		 }
	       }
	     }
	   };);
#endif


  /* Acknowledge, telling the sender which new GAs we created while
     unpacking so it can update its own tables. */
  sendAck(sender, nGAs, gagamap);
}

/*
 * processSchedule unpacks a SCHEDULE message into the graph, filling
 * in the LA -> GA, and GA -> LA tables. The root of the graph is added to
 * the local spark queue.  Finally it sends an ACK in response
 * which contains any newly allocated GAs.
 */
// FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code
//@cindex processSchedule
void
processSchedule(Capability *cap, nat sender,  long/*rtsPackBuffer*/  *recvBuffer)
{
  nat nelem, nGAs;
  StgInt success;                    // non-zero iff the unpacked root was added as a spark
  static rtsPackBuffer *packBuffer;
  StgClosure *newGraph;              // root of the unpacked (shipped) graph
  globalAddr *gagamap;               // (oldGA,newGA) pairs created while unpacking; sent back in the ACK


  /* Unpack the message into the global pack buffer. */
  packBuffer = gumPackBuffer;		/* HWL */
  unpackSchedule(recvBuffer, &nelem, packBuffer);
  
  IF_PAR_DEBUG(fetch, // mpcomm,
	       debugBelch("--__ Rcvd Schedule (%d elems)\n", nelem));
  /*
  IF_PAR_DEBUG(packet,
	       PrintPacket(packBuffer));
    */
#if defined(PAR_TICKY)
      // Global statistics: received schedules
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.rec_schedule_mess++;
        totSCHEDULE++;
      }
#endif

  /*
   * For now, the graph is a closure to be sparked as an advisory
   * spark, but in future it may be a complete spark with
   * required/advisory status, priority etc.
   */

  /*
  space_required = packBuffer[0];
  if (SAVE_Hp + space_required >= SAVE_HpLim) {
    ReallyPerformThreadGC(space_required, rtsFalse);
    SAVE_Hp -= space_required;
    }*/
  
  // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!!

   /* Unpack the shipped graph into the heap, filling the LA->GA /
      GA->LA tables; gagamap records newly allocated GAs. */
   newGraph = UnpackGraph((rtsPackBuffer*) packBuffer, &gagamap, &nGAs, NoPort/* UNUSED in GUM-style comm */, cap); 
   ASSERT(newGraph != NULL);
   ASSERT(LOOKS_LIKE_CLOSURE_PTR(newGraph));
   IF_DEBUG(sanity,
	    checkGAGAMap(gagamap, nGAs));
   /* Offer the root as an advisory spark; may fail if it is not sparkable. */
   success = newImportedSpark(cap, newGraph);
   // GUM4: success = add_to_spark_queue(newGraph, &(cap->r.rSparks))
   
  /* TODO: dump an event here */

  IF_PAR_DEBUG(fetch,
	       if (success)
  	         debugBelch("--^^  added spark to unpacked graph %p (%s)\n", 
			    newGraph, info_type(newGraph) /*, spark_queue_len(&(MainRegTable.rSparks)), mytid*/);
	       else
                 debugBelch("--^^  received non-sparkable closure %p (%s); nothing added to spark pool\n", 
			    newGraph, info_type(newGraph)/*, spark_queue_len(&(MainRegTable.rSparks)), mytid*/));
  IF_PAR_DEBUG(packet,
	       debugBelch("*<    Unpacked graph with root at %p (%s):\n", 
			  newGraph, info_type(newGraph)));

  IF_PAR_DEBUG(tables,
  	       DebugPrintGAGAMap(gagamap, nGAs));
  /* Acknowledge, returning the newly allocated GAs to the sender. */
  sendAck(sender, nGAs, gagamap); 
  /* A SCHEDULE is the answer to one of our FISH messages, so one fewer
     fish is in flight now. */
  //fishing = rtsFalse;
  //ASSERT(outstandingFishes>0); TODO: add assertion again!!
  outstandingFishes--;
}

/*
 * processAck unpacks an ACK, and uses the GAGA map to convert RBH's
 * (which represent shared thunks that have been shipped) into fetch-mes
 * to remote GAs.
 */
// FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code
//@cindex processAck
void
processAck(Capability *cap , long/*rtsPackBuffer*/  *recvBuffer)
{
  nat nGAs;
  globalAddr *gaga;
  globalAddr *gagamap;

  /* Scratch buffer for the unpacked (oldGA, newGA) pairs.  Allocating a
     full pack-buffer's worth per ACK is wasteful but safe; see #warning
     below.  Freed at the end of this function (the original version
     leaked it on every ACK). */
#warning "wasteful handling of gagamap"
  gagamap = (globalAddr *) stgMallocBytes(RtsFlags.ParFlags.packBufferSize, "gagamap");
  unpackAck(recvBuffer,&nGAs, gagamap); 
  IF_PAR_DEBUG(fetch, //tables,
	       debugBelch(",,,, Rcvd Ack (%d pairs)\n", nGAs);
	       DebugPrintGAGAMap(gagamap, nGAs));

  IF_DEBUG(sanity,
	   checkGAGAMap(gagamap, nGAs));

#if defined(PAR_TICKY)
      // Global statistics: received acks
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.rec_ack_mess++;
	totACK++;
      }
#endif

  /*
   * For each (oldGA, newGA) pair, set the GA of the corresponding
   * thunk to the newGA, convert the thunk to a FetchMe, and return
   * the weight from the oldGA.
   */
  for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) {
    StgClosure *old_closure = GALAlookup(gaga);     // TAGGED
    StgClosure *new_closure = GALAlookup(gaga + 1); // TAGGED

    ASSERT(old_closure != NULL);
    if (new_closure == NULL) {
      /* We don't have this closure, so we make a fetchme for it */
      globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsFalse /* WAS: rtsTrue; HWL TODO: check whether rtsTrue is safe */);

      /* convertToFetchMe should be done unconditionally here.
	 Currently, we assign GAs to CONSTRs, too, (a bit of a hack),
	 so we have to check whether it is an RBH before converting

	 ASSERT(get_itbl(old_closure)==RBH);
      */
      if (get_itbl(UNTAG_CLOSURE(old_closure))->type==RBH)
	convertToFetchMe((StgRBH *)old_closure, ga);
    } else {
      /* 
       * Oops...we've got this one already; update the RBH to
       * point to the object we already know about, whatever it
       * happens to be.
       */
      /* NB: with assertions enabled the ASSERT fires before the belch
	 below can run; the warning only triggers in non-DEBUG builds. */
      ASSERT(old_closure != (StgClosure *)NULL && new_closure  != (StgClosure *)NULL);
      ASSERT(old_closure !=  new_closure);
      if (old_closure == new_closure) {
	belch("@@@@ WARNING: processAck: old_closure == new_closure: %s @ %p",
	      info_type(UNTAG_CLOSURE(old_closure)), old_closure);
      }
      
      ASSERT(IsBlackhole((UNTAG_CLOSURE(old_closure)))); // we can update only these closures
      SORT_OF_SAFE_UPD_IND(old_closure, new_closure);
      // WAS: CommonUp(old_closure, new_closure);
      
      /* 
       * Increase the weight of the object by the amount just
       * received in the second part of the ACK pair.
       */
      (void) addWeight(gaga + 1);
    } 
    (void) addWeight(gaga); 
  }

  /* BUG FIX: release the scratch map; nothing above retains a pointer
     into it (setRemoteGA copies the GA into the table -- TODO confirm),
     so the original code leaked packBufferSize bytes per ACK. */
  stgFree(gagamap);

#if 0
  /* check the sanity of the LAGA and GALA tables after mincing them */
  IF_DEBUG(sanity, checkLAGAtable(rtsFalse));
#endif
}

/* Last-resort handler for a message tag we do not recognise: report the
   offending packet and abort.  BUG FIX: the original printed &p (the
   stack address of the by-value parameter) instead of the packet value
   itself, which made the diagnostic useless. */
static void
processUnexpected(StgInt p) {
  debugBelch("PANIC: unexpected packet %ld\n", (long)p);
  exit(1); // ToDo: proper shutdown
}


/*
 * waitForTermination enters a loop ignoring spurious messages while
 * waiting for the termination sequence to be completed.  
 */
//@cindex waitForTermination
void
waitForTermination(void)
{
  /* Spin forever, draining (and complaining about) any stray packets
     until the termination sequence shuts us down. */
  for (;;) {
    int packet;
    IF_PAR_DEBUG(mpcomm, debugBelch("  waitForTermination... \n"));  
    packet = GetPacket();
    processUnexpected(packet);
  }
}

//@cindex sendFreeMessages
void
sendFreeMessages(void)
{
  nat pe;

  /* Flush every non-empty per-PE buffer of batched FREE notifications. */
  for (pe = 0; pe < nPEs; pe++) {
    if (freeMsgIndex[pe] == 0)
      continue;
    sendFree(pe, freeMsgIndex[pe], freeMsgBuffer[pe]); // allPEs is hidden in PVM/MPIComms.c
  }
}

/* Queue a "free" notification for the remote GA *ga into the per-PE
   batch buffer (flushed by sendFreeMessages); if the buffer for pe is
   full, send its contents immediately and start over.  The GA must no
   longer be in our GALA table.  In sanity builds the GA is poisoned
   afterwards so accidental reuse is caught. */
void
freeRemoteGA(int pe, globalAddr *ga)
{
  nat i;

  ASSERT(GALAlookup(ga) == NULL);

  if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) {
    IF_PAR_DEBUG(mpcomm,
		 debugBelch("!! Filled a free message buffer (sending remaining messages individually)"));
    /* NOTE(review): the destination is ga->pe here, but the buffer being
       flushed is the one for pe; these are presumably always equal --
       confirm, or use pe for both. */
    sendFree(ga->pe, i, freeMsgBuffer[pe]);
    i = 0;
  }

  /* append the (weight, slot) pair for this GA */
  freeMsgBuffer[pe][i++] = (rtsWeight) ga->weight;
  freeMsgBuffer[pe][i++] = (StgWord) ga->slot;
  freeMsgIndex[pe] = i;

  /* poison the GA so sanity checks spot any accidental reuse */
  IF_DEBUG(sanity,
	   ga->weight = 0xdead0add;
	   ga->pe = 0xbbbbbbbb;
	   ga->slot = 0xbbbbbbbb;);
}

/* Do we use sendMsg in GUM AAZ */
/* void sendMsg(OpCode tag,  */
/* 	     Port sendport, Port recvport,  */
/* 	     rtsPackBuffer* dataBuffer)  */
/* { fprintf(stderr,"Unimplemented HLComms function sendMsg\n"); exit(1); }; */

/*
 * Allocate space for message processing
 */

//@cindex gumPackBuffer
static rtsPackBuffer *gumPackBuffer;

/*
 * Allocate space for message processing
 */
//@cindex initMoreBuffers
rtsBool
initMoreBuffers(void)
{
  // Start value (for RR) and random-generator seed used to select
  // rFork/fish targets (lives in HLComms.c)
  //srand48(msTime()); // JB Hack: changed from time(NULL) * getpid() to msTime
  targetPE = (thisPE == nPEs) ? 1 : (thisPE + 1);

  // pack buffers for the parallel system always work in units of
  // StgWord. 
  // TODO: Packing/sending assumes sizeof(long) == sizeof(StgWord)
  gumPackBuffer = (rtsPackBuffer *)
    stgMallocBytes(sizeof(StgWord) * PACK_BUFFER_HDR_SIZEW + // in words
		   RtsFlags.ParFlags.packBufferSize, // given in bytes
		   "initMoreBuffers");
  return (gumPackBuffer != NULL);
}

#if 0
{
  if ((gumPackBuffer = (rtsPackBuffer *)stgMallocBytes(sizeof(StgWord)*RtsFlags.ParFlags.packBufferSize+1/*dbging*/, 
					     "initMoreBuffers")) == NULL)
    return rtsFalse;
  return rtsTrue;
}
#endif

# ifdef DEBUG
//@cindex DebugPrintGAGAMap
/* Dump the (oldGA -> newGA) pairs of a GAGA map to the debug log.
   FIX: cast nGAs for the comparison against the unsigned index -- the
   sibling checkGAGAMap already does this; a negative nGAs would
   otherwise be promoted to a huge unsigned bound. */
void DebugPrintGAGAMap(globalAddr *gagamap, int nGAs)
{
  nat i;
  
  for (i = 0; i < (nat)nGAs; ++i, gagamap += 2)
    debugBelch0("__ gagamap[%d] = ((%u, %d, %x)) -> ((%u, %d, %x))\n", i,
		gagamap[0].pe, gagamap[0].slot, gagamap[0].weight,
		gagamap[1].pe, gagamap[1].slot, gagamap[1].weight);
}

//@cindex checkGAGAMap
void
checkGAGAMap(globalAddr *gagamap, int nGAs)
{
  nat pair;

  /* crude upper bound: the map must fit in the pack buffer */
  ASSERT(nGAs<RtsFlags.ParFlags.packBufferSize/6);

  /* every entry is a (oldGA, newGA) pair; sanity-check both halves */
  for (pair = 0; pair < (nat)nGAs; pair++, gagamap += 2) {
    ASSERT(looks_like_ga(gagamap));
    ASSERT(looks_like_ga(gagamap+1));
  }
}
# endif

// -----------------------------------------------------------------------------
// ngoq ngo'
// DEAD CODE

#if 0
/* 
   We don't use the PendingFetches queue any more.
   Rather we pack and send a graph directly using packAndSendResume,
   called from Schedule.c::checkGhostTSOs 
*/

//@cindex processPendingFetches
void
processPendingFetches(Capability *cap) {
  StgTSO *tso, *next;
  StgClosure *closure;
  StgInfoTable *ip;
  globalAddr rga;
  static rtsPackBuffer *packBuffer;
  
/*   IF_PAR_DEBUG(fetch, */
/* 	       debugBelch("____ processPendingFetches: %d pending fetches (root @ %p)\n", */
/* 		     pending_fetches_len(), PendingFetches)); */
  
  for (tso = PendingFetches; 
       tso != END_TSO_QUEUE;
       tso=next) {
    /* the PendingFetches list contains only Ghost TSOs (these were BLOCKED_FETCH closures in GUM4) */
    ASSERT(get_itbl(tso)->type==TSO && 
	   isGhostTSO(tso));
    /* store link (we might overwrite it via blockFetch later on */
    next = tso->_link;

    /*
     * Find the target at the end of the indirection chain, and
     * process it in much the same fashion as the original target
     * of the fetch.  Though we hope to find graph here, we could
     * find a black hole (of any flavor) or even a FetchMe.
     */
    // TODO: check whether tso->block_info.closure is tagged
    closure = UNTAG_CLOSURE(tso->block_info.closure);
    /*
      We evacuate BQs and update the node fields where necessary in GC.c
      So, if we find an EVACUATED closure, something has gone Very Wrong
      (and therefore we let the RTS crash most ungracefully).
    */
    ASSERT(!IS_FORWARDING_PTR(closure));
    //  closure = ((StgEvacuated *)closure)->evacuee;
    ASSERT(UNTAG_CHECK(closure));
    closure = UNTAG_CLOSURE(UNWIND_IND(closure));
    //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }

    ip = get_itbl(closure);
    if (ip->type == FETCH_ME) {
      /* Forward the Fetch to someone else */
      debugBelch("processPendingFetches: abusing blocked_exceptions field of TSO for GA in GhostTSOs!");

      /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode */
      rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe;
      rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot;
      rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight;

      /* we are about to send off a FETCH message, so dump a FETCH event */
      /*DumpRawGranEvent(thisPE, 
			 rga.pe,
			 GR_FETCH, tso, tso->block_info.closure, 
	                 0, 0 spark_queue_len(cap));*/

      sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);

      // explicitly mark this Ghost TSO as garbage
#if 0
      // hardwired GC for Ghost TSOs; shouldn't be necessary!!!!
      tso->why_blocked==0x99;
      IF_PAR_DEBUG(paranoid,
		   debugBelch("__-> processPendingFetches: marking served Ghost TSO %d (%p) as 0x99 (GARBAGE)\n",
			 tso->id, tso));
#endif

#if defined(PAR_TICKY)
      // Global statistics: count no. of fetches
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_fetch_mess++;
      }
#endif

      IF_PAR_DEBUG(fetch,
		   debugBelch("__-> processPendingFetches: Forwarding fetch from %lx to %lx\n",
			 thisPE, rga.pe));

    } else if (IsBlackhole(closure)) {
      /* put it back on BH Q; shouldn't happen, since no graph-red btw checkBHs and here*/
      IF_PAR_DEBUG(fetch,
		   debugBelch("__++ processPendingFetches: trying to send a BLACK_HOLE on closure $p (%s) => putting Ghost TSO %d (%p) back on the global BHQ\n",
			 closure, info_type(closure), tso->id, tso));
      setTSOLink (cap, tso, ghost_TSO_queue);
      ghost_TSO_queue = tso;
      /* WAS, but MUST use the above fct now (see comment in includes/TSO.h
      tso->link = blackhole_queue;
      blackhole_queue = tso;
      */
    } else {
      /* We now have some local graph to send back */
      OpCode unused_tag;

      packBuffer = gumPackBuffer;
      IF_PAR_DEBUG(fetch,
		   debugBelch("__*> processPendingFetches: PackNearbyGraph of closure %p (%s)\n",
			 closure, info_type(closure)));

      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) {
	// Put current BF back on list	//bf->link = (StgBlockingQueueElement *)PendingFetches;
	//PendingFetches = (StgBlockedFetch *)bf;
	// ToDo: check that nothing more has to be done to prepare for GC!
	barf("PANIC: processPendingFetches: out of heap while packing graph; ToDo: call GC here\n");
	//GarbageCollect(GetRoots, rtsFalse); 
	//bf = PendingFetches;
	//PendingFetches = (StgBlockedFetch *)(bf->link);
	//closure = bf->node;
	//packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.pe);
	//ASSERT(packBuffer != (rtsPackBuffer *)NULL);
      }
      debugBelch("processPendingFetches: abusing blocked_exceptions field of TSO for GA in GhostTSOs!");

      /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode */
      rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe;
      rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot;
      rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight;

      sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer);

      // explicitly mark this Ghost TSO as garbage
      // tso->why_blocked==0x99;
      IF_PAR_DEBUG(fetch,
		   debugBelch("__-> processPendingFetches: marking served Ghost TSO %d (%p) as 0x99 (GARBAGE)\n",
			 tso->id, tso));

#if defined(PAR_TICKY)
      // Global statistics: count no. of fetches
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_resume_mess++;
      }
#endif
    }
  }
  PendingFetches = END_TSO_QUEUE;
}

/* 
   The list of pending fetches must be a root-list for GC.
   This routine is called from GC.c (same as marking GAs etc).
*/
void
markPendingFetches(evac_fn evac, rtsBool major_gc) {
  StgTSO **bf, *tso;

  /* We traverse the entire list, because we don't rely on the TSO scavenge
     code to go down the link field. */

  for (bf = &PendingFetches; 
       *bf != END_TSO_QUEUE;
       bf = &((*bf)->_link)) {
    // check whether it has been evacuated already?
    // if (!IS_FORWARDING_PTR((StgClosure*)bf))) { // (get_itbl(*bf)->type != EVACUATED) { // don't evac twice
    tso = *bf;

    ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
    evac((void*)NULL/*STG_UNUSED*/, bf); // evac(bf); // Updates PendingFetches
      
    IF_PAR_DEBUG(fetch,
		   debugBelch("@@@@ PendingFetches: evaced Ghost TSO %d (%s) from %p to %p (%s)\n",
			      tso->id, info_type(tso), tso, *bf, info_type(*bf)));
  }
}

//@cindex rebuildPendingFetches
void
rebuildPendingFetches(void) {
  StgTSO **bf, *tso;
  for (bf = &PendingFetches; 
       (*bf != END_TSO_QUEUE  ) ;
       ) {
    /* the PendingFetches list contains only Ghost TSOs */
    tso = (StgTSO *) isAlive((StgClosure *)*bf); //UNWIND_IND(((StgEvacuated *)bf)->evacuee);
    //fprintf(stderr,"   fixed EVACUATED closure ptr (%s)(%s) \n",info_type(*bf),info_type(tso) ) ; 


    if (tso != NULL) {
      IF_PAR_DEBUG(tables,
		   debugBelch("   Ghost TSO %d in PendingFetches list: %p (%s) (old ptr %p (%s))\n",
			   tso->id, tso, info_type(tso), *bf, info_type(*bf)));
      *bf = tso;
      bf = &((*bf)->_link);
      if (IS_FORWARDING_PTR((StgClosure*)tso->block_info.closure)) { // (get_itbl((StgClosure *)tso->block_info.closure)->type == EVACUATED) {
	StgClosure *c = tso->block_info.closure; //debugging
	tso->block_info.closure = (StgClosure *) isAlive(((StgClosure *)tso->block_info.closure));
	/* isAlive should perform the loop below:
	while (get_itbl(tso->block_info.closure)->type == EVACUATED) {
	  tso->block_info.closure = (StgClosure *) isAlive(((StgClosure *)tso->block_info.closure));
	}
	*/
	if (tso->block_info.closure == NULL) { // something is VERY wrong
	  IF_PAR_DEBUG(tables,
		       debugBelch("**** PANIC: rebuildPendingFetches: the closure on which Ghost TSO %d is blocked has become garbage (was %p (%s) before GC)\n",
				  tso->id, c, info_type(c)));
	} else {
	  ASSERT(!IS_FORWARDING_PTR((StgClosure*)tso->block_info.closure)); 
	  IF_PAR_DEBUG(tables,
		       debugBelch("   fixed EVACUATED closure ptr in Ghost TSO %d from %p (%s) to %p (%s)\n",
				  tso->id, c, info_type(c), tso->block_info.closure, info_type(tso->block_info.closure)));
	}
      }
    } else {
      *bf = (*bf)->_link;
    }
  }
}

//@cindex checkPendingFetches
void
checkPendingFetches(rtsBool in_gc) {
  StgTSO *bf, *next;
    
  for (bf = PendingFetches; 
       bf != END_TSO_QUEUE;
       bf=next) {
    /* the PendingFetches list contains only Ghost TSOs */
    if (in_gc) { 
      /* if we are in GC, check that all TSOs have been evacuated */
/*       ASSERT(get_itbl(bf)->type==EVACUATED && */
/* 	     bf->why_blocked==BlockedOnFetch); */
    } else {
      ASSERT(get_itbl(bf)->type==TSO && 
	     isGhostTSO(bf));
    }
    /* the blocked_exceptions field must be a valid GA */
    IF_DEBUG(sanity, ASSERT(LOOKS_LIKE_GA((globalAddr*)bf->blocked_exceptions)));
    /* the closure must not be a BH, otw the TSO would still be on the BHQ */
    ASSERT(!IsBlackhole(UNTAG_CLOSURE(bf->block_info.closure)));

    /* store link (we might overwrite it via blockFetch later on */
    next = bf->_link;
  }
}

#endif  /* 0 */

#if 0
// this should now be done in MP_sync (PVM/MPI specific) -- HWL GUM6
static void
processPeTids()
{ long newPE;
  nat i,sentPEs,currentPEs;
  
  currentPEs=nPEs;
  
  GetArgs(&sentPEs,1);
  ASSERT(sentPEs > currentPEs);
  ASSERT(sentPEs < MAX_PES); /* inforced by SysMan too*/  
  
  for (i = 0; i < sentPEs; i++) 
  { GetArgs(&newPE,1);
    if(i<currentPEs)
    { ASSERT(newPE == allPEs[i]);
    }
    else
    { 
/*       allPEs[i]=newPE;       */
      nPEs++;
      registerTask(newPE); 
    }
  }
}
#endif

#if 0
/*
 * processFetches constructs and sends resume messages for every
 * BlockedFetch which is ready to be awakened.
 * awaken_blocked_queue (in Schedule.c) is responsible for moving 
 * BlockedFetches from a blocking queue to the PendingFetches queue.
 */
void GetRoots(void);
// extern StgBlockedFetch *PendingFetches;

/*
 * pending_fetches_len (DISABLED code, kept for reference): return the
 * number of elements on the global PendingFetches list, asserting on
 * the way that every element is a TSO.  Debug/statistics helper only;
 * used by the IF_PAR_DEBUG trace in processFetches below.
 */
nat
pending_fetches_len(void)
{
  StgTSO *bf;
  nat n;

  /* walk the list via the (old-style) ->link field, counting nodes */
  for (n=0, bf=PendingFetches; bf != END_TSO_QUEUE; n++, bf = bf->link) {
    ASSERT(get_itbl((StgClosure*)bf)->type==TSO); //  || IS_FORWARDING_PTR((StgClosure*)bf)); 
  }
  return n;
}


// obsolete! see processPendingFetches -- HWL GUM6

//@cindex processFetches
/*
 * processFetches (DISABLED code, kept for reference): drain the global
 * PendingFetches list of BLOCKED_FETCH closures and, for each one,
 * decide how to answer the remote fetch request:
 *
 *   - target is a FETCH_ME       -> forward the fetch to the PE that
 *                                   actually owns the data (sendFetch);
 *   - target is a black hole     -> leave the BLOCKED_FETCH parked on
 *                                   the closure (blockFetch call is
 *                                   commented out here);
 *   - target is evaluable graph  -> pack the nearby graph and send a
 *                                   RESUME message back (sendResume).
 *
 * On exit the PendingFetches list is reset to END_BF_QUEUE.
 *
 * NOTE(review): the out-of-heap path below barfs before the
 * GarbageCollect retry code it precedes -- that retry sequence is
 * unreachable as written and was evidently never finished.
 */
void
processFetches(void) {
  StgBlockedFetch *bf, *next;
  StgClosure *closure;
  StgInfoTable *ip;
  globalAddr rga;
 

  static rtsPackBuffer *packBuffer;
 
  IF_PAR_DEBUG(fetch,
	       belch("____ processFetches: %d pending fetches (root @ %p)\n",
		     pending_fetches_len(), PendingFetches));
  
  for (bf = PendingFetches; 
       bf != END_BF_QUEUE;
       bf=next) {
    /* the PendingFetches list contains only BLOCKED_FETCH closures */
    ASSERT(get_itbl(bf)->type==BLOCKED_FETCH);
    /* store link (we might overwrite it via blockFetch later on) */
    next = (StgBlockedFetch *)(bf->link);

    /*
     * Find the target at the end of the indirection chain, and
     * process it in much the same fashion as the original target
     * of the fetch.  Though we hope to find graph here, we could
     * find a black hole (of any flavor) or even a FetchMe.
     */
    closure = bf->node;
    /*
      We evacuate BQs and update the node fields where necessary in GC.c
      So, if we find an EVACUATED closure, something has gone Very Wrong
      (and therefore we let the RTS crash most ungracefully).
    */
    ASSERT(get_itbl(closure)->type != EVACUATED);
      //  closure = ((StgEvacuated *)closure)->evacuee;
    ASSERT(UNTAG_CHECK(closure)); 
    closure = UNWIND_IND_TGS(closure);
    //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; }

    ip = get_itbl(closure);
    if (ip->type == FETCH_ME) {
      /* Forward the Fetch to someone else: the requester's GA (rga) is
	 taken from the BLOCKED_FETCH so the eventual owner can reply
	 directly to the original asker */
      rga.pe = bf->ga.pe;
      rga.slot = bf->ga.slot;
      rga.weight = bf->ga.weight;
      
      sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */);

      // Global statistics: count no. of fetches
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_fetch_mess++;
	totFETCH++;
      }

      IF_PAR_DEBUG(fetch,
		   belch("__-> processFetches: Forwarding fetch from %lx to %lx\n",
			 mytid, rga.pe));

    } else if (IS_BLACK_HOLE(closure)) {
      IF_PAR_DEBUG(fetch,
		   belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)\n",
			 closure, info_type(closure)));
      bf->node = closure;
      //blockFetch(bf, closure);
    } else {
      /* We now have some local graph to send back */
      nat size;

      packBuffer = gumPackBuffer;
      IF_PAR_DEBUG(fetch,
		   belch("__*> processFetches: PackNearbyGraph of closure %p (%s)\n",
			 closure, info_type(closure)));

      if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) {
	// Put current BF back on list
	// bf->link = (StgBlockingQueueElement *)PendingFetches;
	PendingFetches = (StgBlockedFetch *)bf;
	// ToDo: check that nothing more has to be done to prepare for GC!
	barf("processFetches: out of heap while packing graph; ToDo: call GC here");
	/* NB: everything from here to the end of this branch is dead --
	   barf() above never returns */
	GarbageCollect(GetRoots, rtsFalse); 
	bf = PendingFetches;
	PendingFetches = (StgBlockedFetch *)(bf->link);
	closure = bf->node;
	packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size);
	ASSERT(packBuffer != (rtsPackBuffer *)NULL);
      }
      rga.pe = bf->ga.pe;
      rga.slot = bf->ga.slot;
      rga.weight = bf->ga.weight;
      
      sendResume(&rga, size, packBuffer);

      // Global statistics: count no. of fetches
      if (RtsFlags.ParFlags.ParStats.Global &&
	  RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
	globalParStats.tot_resume_mess++;
      }
    }
  }
  PendingFetches = END_BF_QUEUE;
}
#endif /* 0 */

/*
  NB: blockFetch doesn't exist any more; it's inlined into processFetch
  Also, we now don't use BLOCKED_FETCH closures but dummy TSOs there -- HWL GUM6
  NB: blockThread doesn't exist any more; it's inlined into checkBlackHoles
  in Schedule.c and if necessary a sendFetch is called there -- HWL GUM6
 */

/* 
 * allocate a BLOCKED_FETCH closure and fill it with the relevant fields
 * of the ga argument; called from processFetch when the local closure is
 * under evaluation
 */
#if 0
// !defined(DUMMY_BLOCKED_FETCH)
/*
 * createBlockedFetch (DISABLED code, kept for reference): allocate a
 * BLOCKED_FETCH closure recording that the remote requester `rga` is
 * waiting for the local closure identified by `ga`.  Called from
 * processFetch when the local closure is under evaluation.
 *
 * ga  -- global address of the local closure being fetched; resolved
 *        to a heap pointer via GALAlookup and stored in bf->node.
 * rga -- global address of the remote requester; its pe/slot/weight
 *        triple is copied into the BLOCKED_FETCH so a reply can be
 *        sent later.
 *
 * Returns the freshly allocated BLOCKED_FETCH, cast to StgClosure*.
 *
 * NOTE(review): allocation failure currently barfs outright; the
 * commented-out lines sketch a GC-and-retry path that was never
 * finished.  The SET_INFO call is also commented out, so the header
 * of the returned closure is NOT initialized here -- confirm the
 * caller sets it if this code is ever revived.
 */
StgClosure *
createBlockedFetch (globalAddr ga, globalAddr rga)
{
  StgBlockedFetch *bf;
  StgClosure *closure;

  closure = GALAlookup(&ga);
  if ((bf = (StgBlockedFetch *)allocate(HEADERSIZE + sizeofW(StgBlockedFetch))) == NULL) {
    /* fixed typo in panic message: "BlocekdFetch" -> "BlockedFetch" */
    barf("createBlockedFetch: out of heap while allocating heap for a BlockedFetch; ToDo: call GC here");
    //GarbageCollect(GetRoots, rtsFalse); 
    //closure = GALAlookup(&ga);
    //bf = (StgBlockedFetch *)allocate(HEADERSIZE + sizeofW(StgBlockedFetch));
    // ToDo: check whether really guaranteed to succeed 2nd time around
  }

  ASSERT(bf != (StgBlockedFetch *)NULL);
  // SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info);
  // ToDo: check whether other header info is needed
  bf->node = closure;
  bf->ga.pe = rga.pe;
  bf->ga.slot = rga.slot;
  bf->ga.weight = rga.weight;
  // bf->link = NULL;  debugging

  /* IF_PAR_DEBUG(fetch,
               {  char str[MAX_GA_STR_LEN];
	          showGA(&(bf->ga),str);
		  debugBelch("%%%%// created BF: bf=%p (%s) of closure , GA: %s",
		  bf, info_type((StgClosure*)bf), str);};);*/
  return (StgClosure *)bf;
}
#endif



#endif /* PARALLEL_RTS -- whole file */

//@index
//* FETCH::  @cindex\s-+FETCH
//* sendFetch::  @cindex\s-+sendFetch
//* unpackFetch::  @cindex\s-+unpackFetch
//* RESUME::  @cindex\s-+RESUME
//* sendResume::  @cindex\s-+sendResume
//* unpackResume::  @cindex\s-+unpackResume
//* ACK::  @cindex\s-+ACK
//* sendAck::  @cindex\s-+sendAck
//* unpackAck::  @cindex\s-+unpackAck
//* FISH::  @cindex\s-+FISH
//* sendFish::  @cindex\s-+sendFish
//* unpackFish::  @cindex\s-+unpackFish
//* FREE::  @cindex\s-+FREE
//* sendFree::  @cindex\s-+sendFree
//* unpackFree::  @cindex\s-+unpackFree
//* SCHEDULE::  @cindex\s-+SCHEDULE
//* sendSchedule::  @cindex\s-+sendSchedule
//* unpackSchedule::  @cindex\s-+unpackSchedule
//* freeMsgBuffer::  @cindex\s-+freeMsgBuffer
//* freeMsgIndex::  @cindex\s-+freeMsgIndex
//* prepareFreeMsgBuffers::  @cindex\s-+prepareFreeMsgBuffers
//* processPendingFetches::  @cindex\s-+processPendingFetches
//* offLoadSpark::  @cindex\s-+offLoadSpark
//* processFetch::  @cindex\s-+processFetch
//* processFree::  @cindex\s-+processFree
//* processResume::  @cindex\s-+processResume
//* processSchedule::  @cindex\s-+processSchedule
//* processAck::  @cindex\s-+processAck
//* createBlockedFetch::  @cindex\s-+createBlockedFetch
//* waitForTermination::  @cindex\s-+waitForTermination
//* sendFreeMessages::  @cindex\s-+sendFreeMessages
//* gumPackBuffer::  @cindex\s-+gumPackBuffer
//* initMoreBuffers::  @cindex\s-+initMoreBuffers
//@end index

