/* ---------------------------------------------------------------------------- * Time-stamp: * * High Level Communications Routines (HLComms.c) * * Contains the high-level routines (i.e. communication * subsystem independent) used by GUM * * GUM 0.2x: Phil Trinder, Glasgow University, 12 December 1994 * GUM 3.xx: Phil Trinder, Simon Marlow July 1998 * GUM 4.xx: H-W. Loidl, Heriot-Watt University, November 1999 - * * GenPar 6.x: J.Berthold, Philipps-Universität Marburg 2006 * this file contains routines for *data* communication only * ------------------------------------------------------------------------- */ /* ------------------------------------------------------- This version is the inital GUM-Eden merge developed just aftyer the Hackathon, St Andrews, Dec 2009 ------------------------------------------------------- */ #ifdef PARALLEL_RTS /* whole file */ #include "Rts.h" #include "RtsUtils.h" #include "RtsFlags.h" #include "sm/Storage.h" // allocation etc. #include "Updates.h" // UPD_IND #include "HLC.h" #include "RTTables.h" #include "PEOpCodes.h" #include "Sparks.h" // for spark pool management #include "Capability.h" // for sparkPoolSizeCap #include "Capability.h" //#include "StgMiscClosures.h" // END_TSO_QUEUE etc //#include "DerivedConstants.h" // STDR_HDR_SIZE #include "ParallelRts.h" // #include "RtsTypes.h" #if !defined(DUMMY_BLOCKED_FETCH) # error "PANIC: DUMMY_BLOCKED_FETCH is off, but GUM6 does not support BLOCKED_FETCH nodes any more (replaced by Ghost TSOs)" #endif // #include "Parallel.h" #include "ParallelDebug.h" #include "Sparks.h" // #include "FetchMe.h" // for BLOCKED_FETCH_info etc // JB HACK // #include "SchedAPI.h" // createThread for GhostTSOs //@cindex freeMsgBuffer static StgWord **freeMsgBuffer = NULL; //@cindex freeMsgIndex static nat *freeMsgIndex = NULL; nat choosePE(void); /* debugging */ extern rtsBool check_IND_cycle(StgClosure *pT); /* dummy settings */ // HWL NO: this is now in ParallelRts.h // extern Port RtsPort, NoPort; extern StgTSO *blackhole_queue; extern StgTSO *ghost_TSO_queue; // Global.c // void setFetchedLA (StgTSO *t); // gumPackBuffer: for receiving messages static rtsPackBuffer *gumPackBuffer; // variable for round-robin distribution, will start at thisPE + 1 // (set by initMoreBuffers) // static // JB: conflict with Parallel.h // defined in DataComms.c // nat targetPE = 0; /* * ChoosePE selects a PE number between 1 and nPEs, either 'at * random' or round-robin, starting with thisPE+1 */ nat choosePE(void) { nat temp; IF_PAR_DEBUG(paranoia, debugBelch("choosePE (current = %d)\n", targetPE)); if (RtsFlags.ParFlags.placement & 1) // random temp = 1 + (nat) (lrand48() % nPEs); else {// round-robin temp = targetPE; targetPE = (targetPE >= nPEs) ? 1 : (targetPE + 1); } if ((RtsFlags.ParFlags.placement & 2) // no local placement && (temp == thisPE)) { temp = (temp == nPEs) ? 1 : (temp + 1); } IF_PAR_DEBUG(paranoia, debugBelch("chosen: %d, new targetPE == %d\n", temp,targetPE)); return temp; } #if 0 StgTSO * createGhostTSO(Capability *cap, StgClosure *closure, globalAddr rga) { StgTSO *t; globalAddr *new_rga; /* This includes (seq) BHs and RBHs */ // closure = GALAlookup(&ga); t = createThread (cap, 0/*stack_size*/); /* create a Ghost TSO; GUM-6x -- HWL */ t->block_info.closure = closure; /* ptr to local closure; GUM-6x -- HWL */ t->what_next = ThreadNeverDoAnything; // this marks a ghost TSO t->why_blocked = BlockedOnRemoteFetch; //MKA BlockedOnFetchMe /*BlockedOnRemoteFetch*/; /* characterises Ghost TSO; GUM-6x -- HWL */ /* we create a GALA entry, which we can point to from the Ghost TSO */ new_rga = setRemoteGA(NULL, &rga, rtsTrue); // create GALA entry with this given rga // ASSERT: GALA entry is at the head of outPtrs list // setFetchedLA(t); // HACK: the la in the liveRemoteGAs list points to the Ghost TSO we created; // UNACCEPTABLE HWL HACK: we abuse a TSO field of the TSO to hold a ptr to the GA t->blocked_exceptions = (struct StgTSO_ *)new_rga; // bad choice of field // t->blocked_exceptions = (struct StgTSO_ *)new_rga; // LAGAlookup(closure); // ASSERT(LOOKS_LIKE_GA((globalAddr*)t->block_info.tso)); /* copy the GA of the demander of this value t->ga.pe = rga.pe; t->ga.slot = rga.slot; t->ga.weight = rga.weight; */ // bf->link = NULL; debugging IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; showGA(&rga, str1); showGA(new_rga, str2); debugBelch("%%%%++ Created Ghost TSO %d (%p) for closure %p (%s) with GA %s (new_rga %s) @ %p\n", t->id, t, closure, info_type(closure), str1, str2, new_rga);};); /* Can we assert something on the remote GA? */ // ASSERT(GALAlookup(&rga) == NULL); return t; } #endif #ifdef DEBUG // JB need forward declarations for my gcc (4.0) void DebugPrintGAGAMap(globalAddr *gagamap, int nGAs); #endif /* * SendFetch packs the two global addresses and load info a message + * sends it. //@cindex FETCH Structure of a FETCH message: | GA 1 | GA 2 | +------------------------------------+------+ | gtid | slot | weight | gtid | slot | load | +------------------------------------+------+ */ //@cindex sendFetch void sendFetch(globalAddr *rga, globalAddr *lga, int load) { ASSERT(LOOKS_LIKE_GA(lga)); ASSERT(LOOKS_LIKE_GA(rga)); ASSERT(rga->weight >= (rtsWeight)MIN_GA_WEIGHT); // TODO: is the weight of the local GA always 0? ASSERT(lga->weight == (rtsWeight)0); IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; showGA(rga, str1); showGA(lga, str2); debugBelch("~^** Sending Fetch for %s; locally %s, load = %d\n", str1, str2, load);};); sendOpV(PP_FETCH, rga->pe, 6, // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 (StgWord) rga->pe, (StgWord) rga->slot, (StgWord) lga->weight, (StgWord) lga->pe, (StgWord) lga->slot, (StgWord) load); } /* * unpackFetch unpacks a FETCH message into two Global addresses and a load * figure. */ //@cindex unpackFetch static void unpackFetch(long/*rtsPackBuffer*/ *buf, globalAddr *lga, globalAddr *rga, int *load) { // NB: low-level header size is 2: OpCode, target PE buf++; buf++; lga->weight = (rtsWeight)1; // dummy lga->pe = (nat) buf[0]; lga->slot = (nat) buf[1]; ASSERT(LOOKS_LIKE_GA(lga)); ASSERT(lga->pe==thisPE); rga->weight = (rtsWeight) buf[2]; rga->pe = (nat) buf[3]; rga->slot = (nat) buf[4]; ASSERT(LOOKS_LIKE_GA(lga)); ASSERT(LOOKS_LIKE_PE(rga->pe)); *load = (int) buf[5]; ASSERT(*load == 0); // load unused at the moment; drop this assertion once it is used IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; showGA(lga, str1); showGA(rga, str2); debugBelch("~^** Unpacking Fetch for %s, to be stored in %s, load = %d", str1, str2);};); // TODO: can we assert something like this? ASSERT(rga->weight >= (rtsWeight)MIN_GA_WEIGHT); } /* * SendResume packs the remote blocking queue's GA and data into a message * and sends it. //@cindex RESUME Structure of a RESUME message: ------------------------------- | weight | slot | n | data ... ------------------------------- data is a packed graph represented as an rtsPackBuffer n is the size of the graph (as returned by PackNearbyGraph) + packet hdr size */ //@cindex sendResume void sendResume(globalAddr *rga, int nelem, rtsPackBuffer *packBuffer) { IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; showGA(rga, str1); debugBelch("^^[] Sending Resume to %s to [%u] (packet <<%d>> with %d elems)\n", str1, rga->pe, packBuffer->id, nelem);};); /* IF_PAR_DEBUG(packet, PrintPacket(packBuffer)); */ ASSERT(nelem==packBuffer->size); /* check for magic end-of-buffer word */ //IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER)); //fprintf(stderr,"\n\n op = %x \n task = %d \n arg2 = %d \n", PP_RESUME, rga->pe,(nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM) ); sendOpNV(PP_RESUME, rga->pe, // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM, (StgPtr)packBuffer, 3, (StgWord)rga->weight, (StgWord)rga->pe, (StgWord)rga->slot); // 2, (rtsWeight) rga->weight, (StgWord) rga->slot); } /* * unpackResume unpacks a Resume message into two Global addresses and * a data array. */ //@cindex unpackResume static void unpackResume(long/*rtsPackBuffer*/ *buf, globalAddr *lga, int *nelem, rtsPackBuffer *packBuffer) { /* RESUME event is written in awaken_blocked_queue DumpRawGranEvent(thisPE, taskIDtoPE(lga->pe), GR_RESUME, END_TSO_QUEUE, (StgClosure *)NULL, 0, 0); */ // NB: low-level header size is 2: OpCode, target PE buf++; buf++; /* GA of the closure, whose evaluation we resume */ lga->weight = (rtsWeight) buf[0]; lga->pe = buf[1]; lga->slot = (int) buf[2]; ASSERT(lga->pe==thisPE); ASSERT(LOOKS_LIKE_GA(lga)); *nelem = (int) buf[3] - PACK_BUFFER_HDR_SIZEW - DEBUG_HEADROOM; //GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM); memcpy(packBuffer,buf+4,(*nelem+ PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM)*sizeof(long)); IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; showGA(lga, str1); debugBelch("~^[] Unpacking Resume (packet <<%d>> with %d elems) for %s\n", // ((%u, %d, %x))\n", packBuffer->id, *nelem, str1);};); // thisPE, (int) buf[2], (unsigned) buf[1])); /* check for magic end-of-buffer word */ // TODO: RE-ENABLE: IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER)); } /* * SendAck packs the global address being acknowledged, together with * an array of global addresses for any closures shipped and sends them. //@cindex ACK Structure of an ACK message: | GA 1 | GA 2 | +---------------------------------------------+------- | weight | PE | slot | weight | PE | slot | ..... ngas times + --------------------------------------------+------- */ //@cindex sendAck void sendAck(nat pe, int ngas, globalAddr *gagamap) { static long *buffer; long *p; int i; ASSERT(LOOKS_LIKE_PE(pe)); if(ngas==0) return; //don't send unnecessary messages!! buffer = (long *) gumPackBuffer; for(i = 0, p = buffer; i < ngas; i++, p += 6) { // ASSERT((int)gagamap[1].weight >= MIN_GA_WEIGHT ); ASSERT(LOOKS_LIKE_PE(gagamap->pe)); ASSERT(LOOKS_LIKE_GA(gagamap)); p[0] = (long) gagamap->weight; p[1] = (long) gagamap->pe; p[2] = (long) gagamap->slot; gagamap++; ASSERT(LOOKS_LIKE_PE(gagamap->pe)); ASSERT(LOOKS_LIKE_GA(gagamap)); p[3] = (long) gagamap->weight; p[4] = (long) gagamap->pe; p[5] = (long) gagamap->slot; ASSERT(p[4] == thisPE); // the new GA must be on this PE! gagamap++; } IF_PAR_DEBUG(mpcomm, debugBelch("~^,, Sending Ack (%d pairs) to [%u]\n", ngas, pe)); sendOpN(PP_ACK, pe, p - buffer, (StgPtr)buffer); // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 } /* * unpackAck unpacks an Acknowledgement message into a Global address, * a count of the number of global addresses following and a map of * Global addresses */ //@cindex unpackAck static void unpackAck(long/*rtsPackBuffer*/ *buf, int *ngas, globalAddr *gagamap) { // TODO: nuke GetArgs and read directly from buf long GAarraysize; //long buf[6]; // NB: low-level header size is 2: OpCode, target PE buf++; buf++; //GetArgs(&GAarraysize, 1); GAarraysize = buf[0] ; ASSERT(GAarraysize <= RtsFlags.ParFlags.packBufferSize); *ngas = GAarraysize / 6; /* TODO: nuke MAGIC constant */ IF_PAR_DEBUG(mpcomm, debugBelch("~^,, Unpacking Ack (%d pairs) on [%x]\n", *ngas, thisPE)); // skip size field buf++; while (GAarraysize > 0) { // GetArgs(buf, 6); gagamap->weight = (rtsWeight) buf[0]; gagamap->pe = (nat) buf[1]; gagamap->slot = (nat) buf[2]; // @@@@@@@@@@@@@@@@@@@@@@ TODO: ASSERT(buf[1] == thisPE); // ENABLE the old GA must be on this PE! if (buf[1] != thisPE) { char str1[MAX_GA_STR_LEN]; showGA(gagamap, str1); belch("@@@@ WARNING: unpackAck: old GA is not on this PE [%u], but on [%u]: %s", thisPE, buf[1], str1); } ASSERT(LOOKS_LIKE_PE(gagamap->pe)); ASSERT(LOOKS_LIKE_GA(gagamap)); gagamap++; gagamap->weight = (rtsWeight) buf[3]; gagamap->pe = (nat) buf[4]; gagamap->slot = (nat) buf[5]; ASSERT(LOOKS_LIKE_PE(gagamap->pe)); ASSERT(LOOKS_LIKE_GA(gagamap)); ASSERT(gagamap->weight >= (rtsWeight)MIN_GA_WEIGHT); gagamap++; GAarraysize -= 6; buf+=6; } } /* * SendFish packs the global address being acknowledged, together with * an array of global addresses for any closures shipped and sends them. //@cindex FISH Structure of a FISH message: +----------------------------------+ | orig PE | age | history | hunger | +----------------------------------+ */ //@cindex sendFish void sendFish(nat destPE, nat origPE, int age, int history, int hunger) { IF_PAR_DEBUG(fish, debugBelch("$$>> Sending Fish from [%u] to [%u] (%d outstanding fishes)\n", origPE, destPE, outstandingFishes)); ASSERT(LOOKS_LIKE_PE(destPE)); ASSERT(LOOKS_LIKE_PE(origPE)); ASSERT(hunger==RtsFlags.ParFlags.fishHunger); // sanity check sendOpV(PP_FISH, destPE,(int) 4, // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 (StgWord) origPE, (StgWord) age, (StgWord) history, (StgWord) hunger); if (origPE == thisPE) { //fishing = rtsTrue; outstandingFishes++; } } /* * unpackFish unpacks a FISH message into the global task id of the * originating PE and 3 data fields: the age, history and hunger of the * fish. The history + hunger are not currently used. */ //@cindex unpackFish static void unpackFish(long/*rtsPackBuffer*/ *buf, nat *origPE, int *age, int *history, int *hunger) { // NB: low-level header size is 2: OpCode, target PE buf++; buf++; IF_PAR_DEBUG(fish, debugBelch("$$__ Unpacking Fish from [%u] (age=%d, hunger=%d); time=%ld; spark pool size=??\n", buf[0], (int) buf[1],(int) buf[2], msTime()/*CURRENT_TIME*/ /*,spark_queue_len(&(MainRegTable.rSparks))*/)); // *opcode = (OpCode) buf[0]; *origPE = (nat) buf[0]; *age = (int) buf[1]; *history = (int) buf[2]; *hunger = (int) buf[3]; ASSERT(*hunger==RtsFlags.ParFlags.fishHunger); // sanity check ASSERT(LOOKS_LIKE_PE(*origPE)); } /* * SendFree sends (weight, slot) pairs for GAs that we no longer need * references to. //@cindex FREE Structure of a FREE message: +----------------------------- | n | weight_1 | slot_1 | ... +----------------------------- */ //@cindex sendFree void sendFree(nat pe, int nelem, StgPtr data) { ASSERT(LOOKS_LIKE_PE(pe)); #if !defined(DUMMY_FREE) IF_PAR_DEBUG(mpcomm, debugBelch("~^!! Sending Free (%d GAs) to [%u]", nelem/2, pe)); #if defined(PAR_TICKY) if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { totFREE++;} #endif sendOpN(PP_FREE, pe, nelem, data); // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 #else IF_PAR_DEBUG(mpcomm, debugBelch("~^!! HACK HACK HACK HACK HACK : refusing to send Free message (%d GAs) to [%u]\n", nelem/2, pe)); // HWL HACK: don't send any free messages at all #endif } /* * unpackFree unpacks a FREE message into the amount of data shipped and * a data block. */ //@cindex unpackFree static void unpackFree(long/*rtsPackBuffer*/ *buf, int *nelem, StgWord *data) { *nelem = (int) buf[0]; IF_PAR_DEBUG(mpcomm, debugBelch("~^!! Unpacking Free (%d GAs)", *nelem/2)); // GetArgs(data, *nelem); memcpy(data,buf+2,(*nelem)*sizeof(long)); } /* * SendSchedule sends a closure to be evaluated in response to a Fish * message. The message is directed to the PE that originated the Fish * (origPE), and includes the packed closure (data) along with its size * (nelem). //@cindex SCHEDULE Structure of a SCHEDULE message: +------------------------------------ | PE | n | pack buffer of a graph ... +------------------------------------ */ //@cindex sendSchedule void sendSchedule(nat origPE, int nelem, rtsPackBuffer *packBuffer) { IF_PAR_DEBUG(mpcomm, debugBelch("~^-- Sending Schedule (packet <<%d>> with %d elems) to [%u]\n", packBuffer->id, nelem, origPE)); ASSERT(LOOKS_LIKE_PE(origPE)); /* IF_PAR_DEBUG(packet, PrintPacket(packBuffer)); */ // TODO: re-enable this assertion; currently FAILS!! // maybe we need to add the PACK_BUFFER_HDR_SIZEW here too? -- HWL /* check for magic end-of-buffer word */ //IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+nelem) == END_OF_BUFFER_MARKER)); sendOpN(PP_SCHEDULE, origPE, // FIXME: PEs numbered 1..n but sendOp expects 0..n-1 nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM, (StgPtr)packBuffer); } /* * unpackSchedule unpacks a SCHEDULE message into the Global address of * the closure shipped, the amount of data shipped (nelem) and the data * block (data). */ //@cindex unpackSchedule static void unpackSchedule(long/*rtsPackBuffer*/ *buf, int *nelem, rtsPackBuffer *packBuffer) { // long buf[1]; /* first, just unpack 1 word containing the total size (including header) */ //GetArgs(buf, 1); /* no. of elems, not counting the header of the pack buffer */ *nelem = (int) buf[2]- PACK_BUFFER_HDR_SIZEW - DEBUG_HEADROOM; // GetArgs(packBuffer, *nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM); memcpy(packBuffer,buf+3,(*nelem + PACK_BUFFER_HDR_SIZEW + DEBUG_HEADROOM)*sizeof(long)); /* automatic cast of flat pvm-data to rtsPackBuffer */ IF_PAR_DEBUG(pack, debugBelch("~^-- Unpacking Schedule (packet <<%d>> with %d elems) on [%u]\n", packBuffer->id, *nelem, thisPE)); // ASSERT(*nelem==packBuffer->size); /* check for magic end-of-buffer word */ // IF_DEBUG(sanity, ASSERT(*(packBuffer->buffer+*nelem) == END_OF_BUFFER_MARKER)); } //@node Message-Processing Functions, GUM Message Processor, GUM Message Sending and Unpacking Functions, High Level Communications Routines //@subsection Message-Processing Functions /* * Message-Processing Functions * * The following routines process incoming GUM messages. Often reissuing * messages in response. * * processFish unpacks a fish message, reissuing it if it's our own, * sending work if we have it or sending it onwards otherwise. */ //@cindex prepareFreeMsgBuffers void prepareFreeMsgBuffers(void) { nat i; /* Allocate the freeMsg buffers just once and then hang onto them. */ if (freeMsgIndex == NULL) { freeMsgIndex = (nat *) stgMallocBytes(nPEs * sizeof(nat), "prepareFreeMsgBuffers (Index)"); freeMsgBuffer = (StgWord **) stgMallocBytes(nPEs * sizeof(long *), "prepareFreeMsgBuffers (Buffer)"); for(i = 0; i < nPEs; i++) if (i != (thisPE-1)){ freeMsgBuffer[i] = (StgPtr) stgMallocBytes(sizeof(StgWord)*RtsFlags.ParFlags.packBufferSize, "prepareFreeMsgBuffers (Buffer #i)"); } else { freeMsgBuffer[i] = 0; } } /* Initialize the freeMsg buffer pointers to point to the start of their buffers */ for (i = 0; i < nPEs; i++) freeMsgIndex[i] = 0; } void packAndSendResume(StgClosure *closure, GALA *bf) { StgTSO *next; StgInfoTable *ip; globalAddr rga; static rtsPackBuffer *packBuffer; OpCode unused_tag; ASSERT(UNTAG_CHECK(closure)); packBuffer = gumPackBuffer; if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) { // Put current BF back on list //bf->link = (StgBlockingQueueElement *)PendingFetches; //PendingFetches = (StgBlockedFetch *)bf; // ToDo: check that nothing more has to be done to prepare for GC! barf("PANIC: packAndSendResume: out of heap while packing graph; ToDo: call GC here\n"); //GarbageCollect(GetRoots, rtsFalse); //bf = PendingFetches; //PendingFetches = (StgBlockedFetch *)(bf->link); //closure = bf->node; //packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.pe); //ASSERT(packBuffer != (rtsPackBuffer *)NULL); } rga = bf->ga; /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe; rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot; rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight; */ ASSERT(LOOKS_LIKE_PE(rga.pe)); ASSERT(LOOKS_LIKE_GA(&rga)); IF_PAR_DEBUG(fetch, //mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; globalAddr *root = LAGAlookup(closure); showGA(root, str1); showGA(&rga, str2); debugBelch("__*> packAndSendResume: sending closure %p (%s) with GA %s to GA %s\n", closure, info_type(closure), str1, str2);};); sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer); /* explicitly mark this Ghost TSO as garbage tso->what_next = 55; // UNACCEPTABLE HWL HACK: constant 55 to identify GARBAGE ghost TSO IF_PAR_DEBUG(fetch, // mpcomm, debugBelch("__-> packAndSendResume: marking served Ghost TSO %d (%p) as GARBAGE (what_next=55)\n", tso->id, tso)); */ #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_resume_mess++; } #endif } // Contents has been unpacked into recvBuffer by MP_recv already void processFish(Capability *cap, OpCode opcode, nat origPE, long *recvBuffer) { nat ope; int age, history, hunger; StgClosure *spark = NULL; static rtsPackBuffer *packBuffer; #if defined(PAR_TICKY) // Global statistics: received fishes if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.rec_fish_mess++; totFISH++; } #endif unpackFish(recvBuffer, &origPE, &age, &history, &hunger); /* inlined version IF_PAR_DEBUG(fish, debugBelch("$$__ Fish from [%u] (age=%d); time=%ld; spark pool size=??\n", origPE, (int) recvBuffer[1], msTime())); ope = (nat) recvBuffer[0]; // redundant age = (int) recvBuffer[1]; history = (int) recvBuffer[2]; hunger = (int) recvBuffer[3]; */ ASSERT(LOOKS_LIKE_PE(thisPE)); ASSERT(LOOKS_LIKE_PE(origPE)); if (origPE == thisPE) { //fishing = rtsFalse; // fish has come home outstandingFishes--; last_fish_arrived_at = msTime()/*CURRENT_TIME*/; // remember time (see schedule fct) return; // that's all } ASSERT(origPE != thisPE); int mus =1 ; // GUM4: while ((spark = findSpark(cap /*, rtsTrue (for_export)??*/)) != NULL) { //IF_PAR_DEBUG(verbose, fprintf(stderr,"\n\n createSparkThread: estim. spark pool size = %d \n\n", sparkPoolSizeCap(cap))); // we only try to get a spark, if the spark pool size is above the low watermark if ((sparkPoolSizeCap(cap) >= RtsFlags.ParFlags.lowWater ) && ((spark = tryStealSpark(cap)) != NULL)){ OpCode unused_tag; // StgClosure *graph; // fprintf(stderr," found spark: remaining spark pool size = %d \n",sparkPoolSizeCap(cap) ); IF_PAR_DEBUG(verbose, debugBelch(" found spark: remaining spark pool size = %d \n",sparkPoolSizeCap(cap) )); packBuffer = gumPackBuffer; // TODO: check and enable this assertion; tryStealSpark should only return something worth sparking // ASSERT(closure_SHOULD_SPARK((StgClosure *)spark)); // add event to profile //if (RtsFlags.ParFlags.ParStats.Sparks) /*DumpRawGranEvent(thisPE, origPE, SP_EXPORTED, ((StgTSO *)NULL), spark, 0, spark_queue_len(cap));*/ if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &unused_tag)) == NULL) { IF_PAR_DEBUG(fish, debugBelch("$$ GC while trying to satisfy FISH via PackNearbyGraph of node %p\n", (StgClosure *)spark)); // TODO: enable GC code here barf("processFish: out of heap while packing graph; ToDo: call GC here\n"); // GarbageCollect(GetRoots, rtsFalse); /* Now go back and try again */ } else { IF_PAR_DEBUG(verbose, if (RtsFlags.ParFlags.ParStats.Sparks) debugBelch("==== STEALING spark %x; sending to %x\n", spark, origPE)); IF_PAR_DEBUG(fish, debugBelch("$$-- Replying to FISH from %x by sending graph @ %p (%s)\n", origPE, (StgClosure *)spark, info_type((StgClosure *)spark))); sendSchedule(origPE,(int) PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer); // disposeSpark(spark); // not needed in GUM6, I think -- HWL GUM6 #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_schedule_mess++; } #endif } } if (spark == (StgClosure *)NULL) { // either no spark or below LWM IF_PAR_DEBUG(fish, debugBelch("$$^^ No sparks available for FISH from %x\n", origPE)); /* We have no sparks to give */ if (age < RtsFlags.ParFlags.fishLifeExpectancy) { // FISH_LIFE_EXPECTANCY) { /* if the fish is still young, send it to another PE to look for work */ // TODO: implement RtsFlags.ParFlags.fishForwardDelay IF_PAR_DEBUG(verbose, if (RtsFlags.ParFlags.fishForwardDelay>0) debugBelch("**** WARNING: RtsFlags.ParFlags.fishForwardDelay not implemented, but delay of %lu requested\n", RtsFlags.ParFlags.fishForwardDelay)); sendFish(choosePE(), origPE, (age + 1), NEW_FISH_HISTORY, RtsFlags.ParFlags.fishHunger); // NEW_FISH_HUNGER); #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_fish_mess++; } #endif } else { /* otherwise, send it home to die */ sendFish(origPE, origPE, (age + 1), NEW_FISH_HISTORY, RtsFlags.ParFlags.fishHunger) ; // NEW_FISH_HUNGER); #if 0 && defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_fish_mess++; } #endif } } } /* processFish */ // ----------------------------------------------------------------------------- /* * offLoadSpark is very similar to processFish. Difference is that we * randomly pick a PE to offload this spark to. */ //@cindex offLoadSpark void offLoadSpark(Capability *cap) { nat toPE; StgClosure *spark; static rtsPackBuffer *packBuffer; toPE = choosePE(); ASSERT(LOOKS_LIKE_PE(toPE)); /* search for a spark to off-load */ // GUM4: if ((spark = findSpark(cap /*, rtsTrue for_export??*/)) != NULL) { if ((spark = tryStealSpark(cap)) != NULL) { nat size; OpCode unused_tag; packBuffer = gumPackBuffer; ASSERT(closure_SHOULD_SPARK((StgClosure *)spark)); // add event to profile /* if (RtsFlags.ParFlags.ParStats.Sparks) DumpRawGranEvent(thisPE, toPE, SP_EXPORTED, ((StgTSO *)NULL), spark, 0, /*spark_queue_len(cap));*/ if ((packBuffer = PackNearbyGraph(spark, END_TSO_QUEUE, &unused_tag)) == NULL) { IF_PAR_DEBUG(fish, debugBelch("$$ GC while trying to offload spark via PackNearbyGraph of node %p\n", (StgClosure *)spark)); barf("offLoadSpark: out of heap while packing graph; ToDo: call GC here\n"); // GarbageCollect(GetRoots, rtsFalse); /* Now go back and try again */ } else { IF_PAR_DEBUG(verbose, if (RtsFlags.ParFlags.ParStats.Sparks) debugBelch("==== STEALING spark [%u]; sending to %u\n", spark, toPE)); IF_PAR_DEBUG(fish, debugBelch("$$-- Offloading Spark to [%u] by sending graph @ %p (%s) [sparkq_len=??]\n", toPE, (StgClosure *)spark, info_type((StgClosure *)spark) /*,spark_queue_len(&(MainRegTable.rSparks))*/)); sendSchedule(toPE, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer); // disposeSpark(spark); -- NOOOOO; findSpark automatically removes spark from the pool #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_schedule_mess++; } #endif } } } /* * processFetch either returns the requested data (if available) * or blocks the remote blocking queue on a black hole (if not). * ga ... the closure being fetched * rga ... the FM requesting the data */ // FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code //@cindex processFetch void processFetch (long *recvBuffer , Capability *cap ) { globalAddr ga, rga; int load; StgClosure *closure; StgInfoTable *ip; StgTSO *t; #if defined(PAR_TICKY) // Global statistics: received fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.rec_fetch_mess++; totFETCH++; } #endif unpackFetch(recvBuffer,&ga, &rga, &load); ASSERT(LOOKS_LIKE_GA(&ga)); ASSERT(LOOKS_LIKE_GA(&rga)); ASSERT(ga.pe==thisPE); IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; showGA(&ga, str1); showGA(&rga, str2); debugBelch("%%__ Received Fetch for %s, Resume %s (load %d) from %u\n", str1, str2, load, rga.pe);};); closure = UNTAG_CLOSURE(GALAlookup(&ga)); ASSERT(closure != (StgClosure *)NULL); ASSERT(UNTAG_CHECK(closure)); ip = get_itbl(closure); if (ip->type == FETCH_ME && rga.pe!=thisPE) { // HWL OPT: could do this shorten FM chains: IF_PAR_DEBUG(fetch, // mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; char str3[MAX_GA_STR_LEN]; showGA(&ga, str1); showGA(&rga, str2); showGA(((StgFetchMe *)closure)->ga, str3); debugBelch("%%__ Target of a Fetch for %s is FETCH_ME for %s (demanded by %s); forwarding the Fetch to %u\n", str1, str3, str2, ((StgFetchMe *)closure)->ga->pe);};); /* Forward the Fetch to someone else */ sendFetch(((StgFetchMe *)closure)->ga, &rga, load); #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_fetch_mess++; } #endif } else if (rga.pe == thisPE) { // HWL OPT: if demander is local, update the FM with an IND to the local closure /* Our own FETCH forwarded back around to us */ #if defined(DEBUG) IF_PAR_DEBUG(mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; char str3[MAX_GA_STR_LEN]; showGA(&ga, str1); showGA(&rga, str2); debugBelch("%%%%== Fetch cycle: Fetch for %s (demanded by %s) returned to sending PE [%u]; closure=%p (%s)\n", str1, str2, thisPE, closure, info_type(closure));};); #endif StgClosure * fm_closure = UNTAG_CLOSURE(GALAlookup(&rga)); // local address of the orig FM // HWL TODO: ASSERT(get_itbl(fm_closure)->type==FETCH_ME) if (!get_itbl(fm_closure)->type==FETCH_ME) { // || get_itbl(fm_closure)->type==IND); IF_PAR_DEBUG(mpcomm, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; char str3[MAX_GA_STR_LEN]; showGA(&ga, str1); showGA(&rga, str2); debugBelch("**** WARNING: Fetch cycle: demanding closure %p (%s) (known as %s) is NOT a FETCH_ME on PE [%u]; closure=%p (%s)\n", fm_closure, info_type(fm_closure), str2, thisPE, closure, info_type(closure));};); } // HWL CHECK: is this guaranteed never to create a loop? if (fm_closure==closure) { // UNACCEPTABLE HWL HACK: closure must be different!!!!! debugBelch("**** WARNING: Trying to update a closure with itself as result of Fetch cycle: fm_closure=%p (%s), closure=%p (%s)\n", fm_closure, info_type(fm_closure), closure, info_type(closure)); } else { SORT_OF_SAFE_UPD_IND(fm_closure, closure); // update it with an IND to the local data } // HWL CHECK: are the tags ok? // HWL CHECK: is the code below still necessary? /* We may have already discovered that the fetch target is our own. if ((StgClosure *)fmbq != closure) CommonUp((StgClosure *)fmbq, closure); (void) addWeight(&rga); */ } else if (IsBlackhole(closure)) { // fprintf(stderr, "\n F: BH \n "); #if defined(DUMMY_BLOCKED_FETCH) GALA *bf; // t = createGhostTSO(cap, closure, rga); bf = createBlockedFetch(cap, closure, rga); bf->next = blockedFetches; blockedFetches = bf; IF_PAR_DEBUG(fetch, { char str1[MAX_GA_STR_LEN]; char str2[MAX_GA_STR_LEN]; char str3[MAX_GA_STR_LEN]; showGA(&ga, str1); showGA(&rga, str2); showGA(&(bf->ga), str3); debugBelch("%%%%++ Blocking Fetch on %s @ %p (%s) , requested by %s; BF is %s @ %p", str1, closure, info_type(closure), str2, str3, bf);};); /* If we're hitting a BH or RBH or FMBQ we have to put a special form of TSO closure into the BQ in order to denote that when updating this node the result should be sent to the originator of this fetch message. In GUM4 we used a special BLOCKED_FETCH closure for this. */ // put it on the global blackhole_queue // ASSUMES: sched_mutex /* WAS: setTSOLink (cap, t, ghost_TSO_queue); ghost_TSO_queue = t; t->link = blackhole_queue; blackhole_queue = t; */ #else # error "should compile the PARALLEL_RTS with DUMMY_BLOCKED_FETCH" // ngoq ngo'!! /* This includes RBH's and FMBQ's */ StgBlockedFetch *bf = (StgBlockedFetch *)createBlockedFetch(ga, rga); IF_PAR_DEBUG(mpcomm, debugBelch("%%%%++ Blocking Fetch ((%u, %d, %x)) on %p (%s)", rga.pe, rga.slot, rga.weight, closure, info_type(closure))); /* Can we assert something on the remote GA? */ // ASSERT(GALAlookup(&rga) == NULL); /* If we're hitting a BH or RBH or FMBQ we have to put a BLOCKED_FETCH closure into the BQ in order to denote that when updating this node the result should be sent to the originator of this fetch message. */ IF_PAR_DEBUG(fetch, debugBelch("%%%%++ Blocking Fetch ((%u, %d, %x)) on %p (%s)", rga.pe, rga.slot, rga.weight, closure, info_type(closure))); // put it on the global blackhole_queue // ASSUMES: sched_mutex bf->link = blackhole_queue; blackhole_queue = bf; #endif /* IF_PAR_DEBUG(mpcomm, */ /* debugBelch("##++ processFetch: after block the global BHQ is:"); */ /* print_bq()); */ } else { /* The target of the FetchMe is some local graph */ nat size; OpCode unused_tag; // StgClosure *graph; rtsPackBuffer *buffer = (rtsPackBuffer *)NULL; //fprintf(stderr, "\n F: main case \n "); //fprintf(stderr,"%%%%== Fetch returned to sending PE; closure=%p (%s)\n", // closure, info_type(closure)); #if defined(DEBUG) if (rga.pe == thisPE) { // TODO: this should be handled by the previous branch (disabled for now) IF_PAR_DEBUG(mpcomm, debugBelch("%%%%== Fetch returned to sending PE; closure=%p (%s)\n", closure, info_type(closure))); debugBelch("WARNING: cycle in fetching remote work: closure %p (%s) is a FETCH_ME to its own PE [%u]\n", closure, info_type(closure), thisPE); // for now, send a resume anyway } #endif if ((buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) { barf("processFetch: out of heap while packing graph; ToDo: call GC here"); // GarbageCollect(GetRoots, rtsFalse); // closure = GALAlookup(&ga); // buffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, rga.pe); // ASSERT(buffer != (rtsPackBuffer *)NULL); } sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(buffer), buffer); #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_resume_mess++; } #endif } } /* * processFree unpacks a FREE message and adds the weights to our GAs. */ // FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code //@cindex processFree void processFree(long/*rtsPackBuffer*/ *recvBuffer, Capability *cap) { int nelem; static StgWord *buffer; int i; globalAddr ga; buffer = (StgWord *)gumPackBuffer; unpackFree(recvBuffer ,&nelem, buffer); IF_PAR_DEBUG(mpcomm, debugBelch("!!__ Rcvd Free (%d GAs)\n", nelem / 2)); ga.pe = thisPE; for (i = 0; i < nelem;) { ga.weight = (rtsWeight) buffer[i++]; ga.slot = (int) buffer[i++]; /*IF_PAR_DEBUG(mpcomm, { char str[MAX_GA_STR_LEN]; showGA(&ga, str); debugBelch("!!-- Processing free for %s\n", str);};);*/ (void) addWeight(&ga); } } /* * processResume unpacks a RESUME message into the graph, filling in * the LA -> GA, and GA -> LA tables. Threads blocked on the original * FetchMe (now a blocking queue) are awakened, and the blocking queue * is converted into an indirection. Finally it sends an ACK in response * which contains any newly allocated GAs. */ // FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code //@cindex processResume void processResume(Capability *cap, nat sender, long/*rtsPackBuffer*/ *recvBuffer) { int nelem; nat nGAs; static long/*rtsPackBuffer*/ *packBuffer; StgClosure *newGraphT, *old; globalAddr lga; globalAddr *gagamap; #if defined(DEBUG) StgInfoTable *info; nat size, ptrs, nonptrs, vhs; #endif ASSERT(LOOKS_LIKE_PE(sender)); packBuffer = (long/*rtsPackBuffer*/ *)gumPackBuffer; unpackResume(recvBuffer,&lga, &nelem, packBuffer); ASSERT(LOOKS_LIKE_GA(&lga)); IF_PAR_DEBUG(fetch, { char str[MAX_GA_STR_LEN]; showGA(&lga, str); debugBelch("[]__ Rcvd Resume for %s \n", str);};); #if defined(PAR_TICKY) // Global statistics: recieved resumes if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.rec_resume_mess++; } #endif /* IF_PAR_DEBUG(packet, PrintPacket((rtsPackBuffer *)packBuffer)); */ /* * We always unpack the incoming graph, even if we've received the * requested node in some other data packet (and already awakened * the blocking queue). if (SAVE_Hp + packBuffer[0] >= SAVE_HpLim) { ReallyPerformThreadGC(packBuffer[0], rtsFalse); SAVE_Hp -= packBuffer[0]; } */ // ToDo: Check for GC here !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! /* Do this *after* GC; we don't want to release the object early! */ if (lga.weight > 0) // TODO: check if this should be: > MIN_GA_WEIGHT (void) addWeight(&lga); old = UNTAG_CLOSURE(GALAlookup(&lga)); #if defined(DEBUG) /* only needed for sanity checks below */ info = get_closure_info(old, &size, &ptrs, &nonptrs, &vhs); if (!(get_itbl(old)->type == FETCH_ME || get_itbl(old)->type == RBH)) { /* ToDo: Check whether this can be made an assertion */ IF_DEBUG(sanity, { char str[MAX_GA_STR_LEN]; showGA(&lga, str); debugBelch("[]** non-FETCH_ME-ish closure when receiving resume: %p (%s) GA %s\n", old, info_type_by_ip(info), str);};); } #endif if (RtsFlags.ParFlags.ParStats.Full) { // StgBlockingQueueElement *bqe, *last_bqe; IF_PAR_DEBUG(fetch, debugBelch("[]-- Resume is REPLY to closure %p\n", old)); /* NB: now that all blocked TSOs are on the BHQ (rather than on a Q rooted at the FM closure) we can't dump a reply event here, since we don't don't know the TSOs blocked on it directly. Instead we dump an event when traversing the BHQ, finding a TSO that's BlockedOnFetch and now has data in the closure_info field -- HWL GUM6 */ // HACK: reply for a dummy TSO; best change .gr protocol and drop replies /* if (RtsFlags.ParFlags.ParStats.Full) DumpRawGranEvent(thisPE, sender, GR_REPLY, 0, old, 0, spark_queue_len(cap));*/ /* Write REPLY events to the log file, indicating that the remote data has arrived NB: we emit a REPLY only for the *last* elem in the queue; this is the one that triggered the fetch message; all other entries have just added themselves to the queue, waiting for the data they know that has been requested (see entry code for FETCH_ME_BQ) if ((get_itbl(old)->type == FETCH_ME_BQ || get_itbl(old)->type == RBH)) { for (bqe = ((StgFetchMeBlockingQueue *)old)->blocking_queue, last_bqe = END_BQ_QUEUE; get_itbl(bqe)->type==TSO || get_itbl(bqe)->type==BLOCKED_FETCH; last_bqe = bqe, bqe = bqe->link) { nothing } ASSERT(last_bqe==END_BQ_QUEUE || get_itbl((StgClosure *)last_bqe)->type == TSO); // last_bqe now points to the TSO that triggered the FETCH if (get_itbl((StgClosure *)last_bqe)->type == TSO) DumpRawGranEvent(thisPE, taskIDtoPE(sender), GR_REPLY, ((StgTSO *)last_bqe), ((StgTSO *)last_bqe)->block_info.closure, 0, spark_queue_len(&(MainRegTable.rSparks))); } */ } newGraphT = UnpackGraph(packBuffer, &gagamap, &nGAs, NoPort/* UNUSED in GUM-style comm */, cap); // tagged! ASSERT(newGraphT != NULL); ASSERT(LOOKS_LIKE_CLOSURE_PTR(UNTAG_CLOSURE(newGraphT))); /* * Sometimes, unpacking will common up the resumee with the * incoming graph, but if it hasn't, we'd better do so now. */ if (get_itbl(old)->type == FETCH_ME) // UnpackGraph may have turned it into an IND already SORT_OF_SAFE_UPD_IND(old, UNTAG_CLOSURE(newGraphT)); // HWL TODO: add tag to the indirection IF_PAR_DEBUG(fetch, { if (get_itbl(old)->type == IND_OLDGEN) { debugBelch("[]** Warning: found IND_OLDGEN in old closure @ %p, pointing to unpacked graph @ %p\n", old, UNTAG_CLOSURE(newGraphT)); } }); IF_PAR_DEBUG(fetch, debugBelch("[]-- Ready to resume unpacked graph at %p (%s)\n", UNTAG_CLOSURE(newGraphT), info_type(UNTAG_CLOSURE(newGraphT)))); IF_PAR_DEBUG(pack, IF_PAR_DEBUG(packet, debugBelch("** PrintGraph of %p is:", UNTAG_CLOSURE(newGraphT)); PrintGraph(UNTAG_CLOSURE(newGraphT),0))); IF_DEBUG(sanity, check_IND_cycle(UNTAG_CLOSURE(newGraphT))); IF_PAR_DEBUG(tables, DebugPrintGAGAMap(gagamap, nGAs)); #if 0 IF_DEBUG(sanity, { StgTSO *tso; for (tso = blackhole_queue; tso != END_TSO_QUEUE; tso = tso->_link) { if (old==tso->block_info.closure) { debugBelch("%%%% TSO %d was blocked on OLD closure %p; expect it to be re-awoken", tso->id, old); if (get_itbl(UNTAG(tso->block_info.closure))->type==FETCH_ME) { char str[MAX_GA_STR_LEN]; showGA(((StgFetchMe *)(UNTAG(tso->block_info.closure)))->ga, str); debugBelch("%%%% was blocked on FETCH_ME to GA %s ;", str); } } } };); #endif sendAck(sender, nGAs, gagamap); } /* * processSchedule unpacks a SCHEDULE message into the graph, filling * in the LA -> GA, and GA -> LA tables. The root of the graph is added to * the local spark queue. Finally it sends an ACK in response * which contains any newly allocated GAs. */ // FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code //@cindex processSchedule void processSchedule(Capability *cap, nat sender, long/*rtsPackBuffer*/ *recvBuffer) { nat nelem, nGAs; StgInt success; static rtsPackBuffer *packBuffer; StgClosure *newGraph; globalAddr *gagamap; packBuffer = gumPackBuffer; /* HWL */ unpackSchedule(recvBuffer, &nelem, packBuffer); IF_PAR_DEBUG(fetch, // mpcomm, debugBelch("--__ Rcvd Schedule (%d elems)\n", nelem)); /* IF_PAR_DEBUG(packet, PrintPacket(packBuffer)); */ #if defined(PAR_TICKY) // Global statistics: received schedules if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.rec_schedule_mess++; totSCHEDULE++; } #endif /* * For now, the graph is a closure to be sparked as an advisory * spark, but in future it may be a complete spark with * required/advisory status, priority etc. */ /* space_required = packBuffer[0]; if (SAVE_Hp + space_required >= SAVE_HpLim) { ReallyPerformThreadGC(space_required, rtsFalse); SAVE_Hp -= space_required; }*/ // ToDo: check whether GC is necessary !!!!!!!!!!!!!!!!!!!!! newGraph = UnpackGraph((rtsPackBuffer*) packBuffer, &gagamap, &nGAs, NoPort/* UNUSED in GUM-style comm */, cap); ASSERT(newGraph != NULL); ASSERT(LOOKS_LIKE_CLOSURE_PTR(newGraph)); IF_DEBUG(sanity, checkGAGAMap(gagamap, nGAs)); success = newImportedSpark(cap, newGraph); // GUM4: success = add_to_spark_queue(newGraph, &(cap->r.rSparks)); /* TODO: dump an event here */ IF_PAR_DEBUG(fetch, if (success) debugBelch("--^^ added spark to unpacked graph %p (%s)\n", newGraph, info_type(newGraph) /*, spark_queue_len(&(MainRegTable.rSparks)), mytid*/); else debugBelch("--^^ received non-sparkable closure %p (%s); nothing added to spark pool\n", newGraph, info_type(newGraph)/*, spark_queue_len(&(MainRegTable.rSparks)), mytid*/)); IF_PAR_DEBUG(packet, debugBelch("*< Unpacked graph with root at %p (%s):\n", newGraph, info_type(newGraph))); IF_PAR_DEBUG(tables, DebugPrintGAGAMap(gagamap, nGAs)); sendAck(sender, nGAs, gagamap); //fishing = rtsFalse; //ASSERT(outstandingFishes>0); TODO: add assertion again!! outstandingFishes--; } /* * processAck unpacks an ACK, and uses the GAGA map to convert RBH's * (which represent shared thunks that have been shipped) into fetch-mes * to remote GAs. */ // FIXME: this needs a recvBuffer argument, supplied by MP_recv (see call in Schedule.c and processFish for an pattern of process* code //@cindex processAck void processAck(Capability *cap , long/*rtsPackBuffer*/ *recvBuffer) { nat nGAs; globalAddr *gaga; globalAddr *gagamap; // HWL CHECK: where is this buffer freed again? #warning "wasteful handling of gagamap" gagamap = (globalAddr *) stgMallocBytes(RtsFlags.ParFlags.packBufferSize, "gagamap"); // HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK HACK unpackAck(recvBuffer,&nGAs, gagamap); IF_PAR_DEBUG(fetch, //tables, debugBelch(",,,, Rcvd Ack (%d pairs)\n", nGAs); DebugPrintGAGAMap(gagamap, nGAs)); IF_DEBUG(sanity, checkGAGAMap(gagamap, nGAs)); #if defined(PAR_TICKY) // Global statistics: received acks if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.rec_ack_mess++; totACK++; } #endif /* * For each (oldGA, newGA) pair, set the GA of the corresponding * thunk to the newGA, convert the thunk to a FetchMe, and return * the weight from the oldGA. */ for (gaga = gagamap; gaga < gagamap + nGAs * 2; gaga += 2) { StgClosure *old_closure = GALAlookup(gaga); // TAGGED StgClosure *new_closure = GALAlookup(gaga + 1); // TAGGED ASSERT(old_closure != NULL); if (new_closure == NULL) { /* We don't have this closure, so we make a fetchme for it */ /*IF_PAR_DEBUG(weight, { char str[MAX_GA_STR_LEN]; showGA(gaga+1, str); debugBelch(" Ack: setting remote GA of closure %p (%s) to %s\n", old_closure, info_type(old_closure), str);};);*/ globalAddr *ga = setRemoteGA(old_closure, gaga + 1, rtsFalse /* WAS: rtsTrue; HWL TODO: check whether rtsTrue is safe */); /* convertToFetchMe should be done unconditionally here. Currently, we assign GAs to CONSTRs, too, (a bit of a hack), so we have to check whether it is an RBH before converting ASSERT(get_itbl(old_closure)==RBH); */ if (get_itbl(UNTAG_CLOSURE(old_closure))->type==RBH) convertToFetchMe((StgRBH *)old_closure, ga); } else { /* * Oops...we've got this one already; update the RBH to * point to the object we already know about, whatever it * happens to be. */ ASSERT(old_closure != (StgClosure *)NULL && new_closure != (StgClosure *)NULL); ASSERT(old_closure != new_closure); if (old_closure == new_closure) { belch("@@@@ WARNING: processAck: old_closure == new_closure: %s @ %p", info_type(UNTAG_CLOSURE(old_closure)), old_closure); } ASSERT(IsBlackhole((UNTAG_CLOSURE(old_closure)))); // we can update only these closures SORT_OF_SAFE_UPD_IND(old_closure, new_closure); // WAS: CommonUp(old_closure, new_closure); /* * Increase the weight of the object by the amount just * received in the second part of the ACK pair. */ (void) addWeight(gaga + 1); } (void) addWeight(gaga); } #if 0 /* check the sanity of the LAGA and GALA tables after mincing them */ IF_DEBUG(sanity, checkLAGAtable(rtsFalse)); #endif } static void processUnexpected(StgInt p) { debugBelch("PANIC: unexpected packet at %p\n", &p); exit(1); // ToDo: proper shutdown } /* * waitForTermination enters a loop ignoring spurious messages while * waiting for the termination sequence to be completed. */ //@cindex waitForTermination void waitForTermination(void) { do { IF_PAR_DEBUG(mpcomm, debugBelch(" waitForTermination... \n")); int p = GetPacket(); processUnexpected(p); } while (rtsTrue); } //@cindex sendFreeMessages void sendFreeMessages(void) { nat i; for (i = 0; i < nPEs; i++) if (freeMsgIndex[i] > 0) sendFree(i, freeMsgIndex[i], freeMsgBuffer[i]); // allPEs is hidden in PVM/MPIComms.c } void freeRemoteGA(int pe, globalAddr *ga) { nat i; ASSERT(GALAlookup(ga) == NULL); if ((i = freeMsgIndex[pe]) + 2 >= RtsFlags.ParFlags.packBufferSize) { IF_PAR_DEBUG(mpcomm, debugBelch("!! Filled a free message buffer (sending remaining messages indivisually)")); sendFree(ga->pe, i, freeMsgBuffer[pe]); i = 0; } freeMsgBuffer[pe][i++] = (rtsWeight) ga->weight; freeMsgBuffer[pe][i++] = (StgWord) ga->slot; freeMsgIndex[pe] = i; IF_DEBUG(sanity, ga->weight = 0xdead0add; ga->pe = 0xbbbbbbbb; ga->slot = 0xbbbbbbbb;); } /* Do we use sendMsg in GUM AAZ */ /* void sendMsg(OpCode tag, */ /* Port sendport, Port recvport, */ /* rtsPackBuffer* dataBuffer) */ /* { fprintf(stderr,"Unimplemented HLComms function sendMsg\n"); exit(1); }; */ /* * Allocate space for message processing */ //@cindex gumPackBuffer static rtsPackBuffer *gumPackBuffer; /* * Allocate space for message processing */ //@cindex initMoreBuffers rtsBool initMoreBuffers(void) { // Start value (for RR) and random-generator seed used to select // rFork/fish targets (lives in HLComms.c) //srand48(msTime()); // JB Hack: changed from time(NULL) * getpid() to msTime targetPE = (thisPE == nPEs) ? 1 : (thisPE + 1); // pack buffers for the parallel system always work in units of // StgWord. // TODO: Packing/sending assumes sizeof(long) == sizeof(StgWord) gumPackBuffer = (rtsPackBuffer *) stgMallocBytes(sizeof(StgWord) * PACK_BUFFER_HDR_SIZEW + // in words RtsFlags.ParFlags.packBufferSize, // given in bytes "initMoreBuffers"); return (gumPackBuffer != NULL); } #if 0 { if ((gumPackBuffer = (rtsPackBuffer *)stgMallocBytes(sizeof(StgWord)*RtsFlags.ParFlags.packBufferSize+1/*dbging*/, "initMoreBuffers")) == NULL) return rtsFalse; return rtsTrue; } #endif # ifdef DEBUG //@cindex DebugPrintGAGAMap void DebugPrintGAGAMap(globalAddr *gagamap, int nGAs) { nat i; for (i = 0; i < nGAs; ++i, gagamap += 2) debugBelch0("__ gagamap[%d] = ((%u, %d, %x)) -> ((%u, %d, %x))\n", i, gagamap[0].pe, gagamap[0].slot, gagamap[0].weight, gagamap[1].pe, gagamap[1].slot, gagamap[1].weight); } //@cindex checkGAGAMap void checkGAGAMap(globalAddr *gagamap, int nGAs) { nat i; ASSERT(nGAstype==TSO && isGhostTSO(tso)); /* store link (we might overwrite it via blockFetch later on */ next = tso->_link; /* * Find the target at the end of the indirection chain, and * process it in much the same fashion as the original target * of the fetch. Though we hope to find graph here, we could * find a black hole (of any flavor) or even a FetchMe. */ // TODO: check whether tso->block_info.closure is tagged closure = UNTAG_CLOSURE(tso->block_info.closure); /* We evacuate BQs and update the node fields where necessary in GC.c So, if we find an EVACUATED closure, something has gone Very Wrong (and therefore we let the RTS crash most ungracefully). */ ASSERT(!IS_FORWARDING_PTR(closure)); // closure = ((StgEvacuated *)closure)->evacuee; ASSERT(UNTAG_CHECK(closure)); closure = UNTAG_CLOSURE(UNWIND_IND(closure)); //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; } ip = get_itbl(closure); if (ip->type == FETCH_ME) { /* Forward the Fetch to someone else */ debugBelch("processPendingFetches: abusing blocked_exceptions field of TSO for GA in GhostTSOs!"); /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode */ rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe; rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot; rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight; /* we are about to send off a FETCH message, so dump a FETCH event */ /*DumpRawGranEvent(thisPE, rga.pe, GR_FETCH, tso, tso->block_info.closure, 0, 0 spark_queue_len(cap));*/ sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */); // explicitly mark this Ghost TSO as garbage #if 0 // hardwired GC for Ghost TSOs; shouldn't be necessary!!!! tso->why_blocked==0x99; IF_PAR_DEBUG(paranoid, debugBelch("__-> processPendingFetches: marking served Ghost TSO %d (%p) as 0x99 (GARBAGE)\n", tso->id, tso)); #endif #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_fetch_mess++; } #endif IF_PAR_DEBUG(fetch, debugBelch("__-> processPendingFetches: Forwarding fetch from %lx to %lx\n", thisPE, rga.pe)); } else if (IsBlackhole(closure)) { /* put it back on BH Q; shouldn't happen, since no graph-red btw checkBHs and here*/ IF_PAR_DEBUG(fetch, debugBelch("__++ processPendingFetches: trying to send a BLACK_HOLE on closure $p (%s) => putting Ghost TSO %d (%p) back on the global BHQ\n", closure, info_type(closure), tso->id, tso)); setTSOLink (cap, tso, ghost_TSO_queue); ghost_TSO_queue = tso; /* WAS, but MUST use the above fct now (see comment in includes/TSO.h tso->link = blackhole_queue; blackhole_queue = tso; */ } else { /* We now have some local graph to send back */ OpCode unused_tag; packBuffer = gumPackBuffer; IF_PAR_DEBUG(fetch, debugBelch("__*> processPendingFetches: PackNearbyGraph of closure %p (%s)\n", closure, info_type(closure))); if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &unused_tag)) == NULL) { // Put current BF back on list //bf->link = (StgBlockingQueueElement *)PendingFetches; //PendingFetches = (StgBlockedFetch *)bf; // ToDo: check that nothing more has to be done to prepare for GC! barf("PANIC: processPendingFetches: out of heap while packing graph; ToDo: call GC here\n"); //GarbageCollect(GetRoots, rtsFalse); //bf = PendingFetches; //PendingFetches = (StgBlockedFetch *)(bf->link); //closure = bf->node; //packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size, bf->ga.pe); //ASSERT(packBuffer != (rtsPackBuffer *)NULL); } debugBelch("processPendingFetches: abusing blocked_exceptions field of TSO for GA in GhostTSOs!"); /* THIS IS UNACCEPTABLE, since it abuses fields in a TSO; TODO: recode */ rga.pe = ((globalAddr*)tso->blocked_exceptions)->pe; rga.slot = ((globalAddr*)tso->blocked_exceptions)->slot; rga.weight = ((globalAddr*)tso->blocked_exceptions)->weight; sendResume(&rga, PACK_BUFFER_PAYLOAD_SIZE(packBuffer), packBuffer); // explicitly mark this Ghost TSO as garbage // tso->why_blocked==0x99; IF_PAR_DEBUG(fetch, debugBelch("__-> processPendingFetches: marking served Ghost TSO %d (%p) as 0x99 (GARBAGE)\n", tso->id, tso)); #if defined(PAR_TICKY) // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_resume_mess++; } #endif } } PendingFetches = END_TSO_QUEUE; } /* The list of pending fetches must be a root-list for GC. This routine is called from GC.c (same as marking GAs etc). */ void markPendingFetches(evac_fn evac, rtsBool major_gc) { StgTSO **bf, *tso; /* We traverse the entire list, because we don't rely on the TSO scavenge code to go down the link field. */ for (bf = &PendingFetches; *bf != END_TSO_QUEUE; bf = &((*bf)->_link)) { // check whether it has been evacuated already? // if (!IS_FORWARDING_PTR((StgClosure*)bf))) { // (get_itbl(*bf)->type != EVACUATED) { // don't evac twice tso = *bf; ASSERT(get_itbl((StgClosure *)tso)->type == TSO); evac((void*)NULL/*STG_UNUSED*/, bf); // evac(bf); // Updates PendingFetches IF_PAR_DEBUG(fetch, debugBelch("@@@@ PendingFetches: evaced Ghost TSO %d (%s) from %p to %p (%s)\n", tso->id, info_type(tso), tso, *bf, info_type(*bf))); } } //@cindex rebuildPendingFetches void rebuildPendingFetches(void) { StgTSO **bf, *tso; for (bf = &PendingFetches; (*bf != END_TSO_QUEUE ) ; ) { /* the PendingFetches list contains only Ghost TSOs */ tso = (StgTSO *) isAlive((StgClosure *)*bf); //UNWIND_IND(((StgEvacuated *)bf)->evacuee); //fprintf(stderr," fixed EVACUATED closure ptr (%s)(%s) \n",info_type(*bf),info_type(tso) ) ; if (tso != NULL) { IF_PAR_DEBUG(tables, debugBelch(" Ghost TSO %d in PendingFetches list: %p (%s) (old ptr %p (%s))\n", tso->id, tso, info_type(tso), *bf, info_type(*bf))); *bf = tso; bf = &((*bf)->_link); if (IS_FORWARDING_PTR((StgClosure*)tso->block_info.closure)) { // (get_itbl((StgClosure *)tso->block_info.closure)->type == EVACUATED) { StgClosure *c = tso->block_info.closure; //debugging tso->block_info.closure = (StgClosure *) isAlive(((StgClosure *)tso->block_info.closure)); /* isAlive should perform the loop below: while (get_itbl(tso->block_info.closure)->type == EVACUATED) { tso->block_info.closure = (StgClosure *) isAlive(((StgClosure *)tso->block_info.closure)); } */ if (tso->block_info.closure == NULL) { // something is VERY wrong IF_PAR_DEBUG(tables, debugBelch("**** PANIC: rebuildPendingFetches: the closure on which Ghost TSO %d is blocked has become garbage (was %p (%s) before GC)\n", tso->id, c, info_type(c))); } else { ASSERT(!IS_FORWARDING_PTR((StgClosure*)tso->block_info.closure)); IF_PAR_DEBUG(tables, debugBelch(" fixed EVACUATED closure ptr in Ghost TSO %d from %p (%s) to %p (%s)\n", tso->id, c, info_type(c), tso->block_info.closure, info_type(tso->block_info.closure))); } } } else { *bf = (*bf)->_link; } } } //@cindex checkPendingFetches void checkPendingFetches(rtsBool in_gc) { StgTSO *bf, *next; for (bf = PendingFetches; bf != END_TSO_QUEUE; bf=next) { /* the PendingFetches list contains only Ghost TSOs */ if (in_gc) { /* if we are in GC, check that all TSOs have been evacuated */ /* ASSERT(get_itbl(bf)->type==EVACUATED && */ /* bf->why_blocked==BlockedOnFetch); */ } else { ASSERT(get_itbl(bf)->type==TSO && isGhostTSO(bf)); } /* the blocked_exceptions field must be a valid GA */ IF_DEBUG(sanity, ASSERT(LOOKS_LIKE_GA((globalAddr*)bf->blocked_exceptions))); /* the closure must not be a BH, otw the TSO would still be on the BHQ */ ASSERT(!IsBlackhole(UNTAG_CLOSURE(bf->block_info.closure))); /* store link (we might overwrite it via blockFetch later on */ next = bf->_link; } } #endif /* 0 */ #if 0 // this should now be done in MP_sync (PVM/MPI specific) -- HWL GUM6 static void processPeTids() { long newPE; nat i,sentPEs,currentPEs; currentPEs=nPEs; GetArgs(&sentPEs,1); ASSERT(sentPEs > currentPEs); ASSERT(sentPEs < MAX_PES); /* inforced by SysMan too*/ for (i = 0; i < sentPEs; i++) { GetArgs(&newPE,1); if(ilink) { ASSERT(get_itbl((StgClosure*)bf)->type==TSO); // || IS_FORWARDING_PTR((StgClosure*)bf)); } return n; } // obsolete! see processPendingFetches -- HWL GUM6 //@cindex processFetches void processFetches(void) { StgBlockedFetch *bf, *next; StgClosure *closure; StgInfoTable *ip; globalAddr rga; static rtsPackBuffer *packBuffer; IF_PAR_DEBUG(fetch, belch("____ processFetches: %d pending fetches (root @ %p)\n", pending_fetches_len(), PendingFetches)); for (bf = PendingFetches; bf != END_BF_QUEUE; bf=next) { /* the PendingFetches list contains only BLOCKED_FETCH closures */ ASSERT(get_itbl(bf)->type==BLOCKED_FETCH); /* store link (we might overwrite it via blockFetch later on */ next = (StgBlockedFetch *)(bf->link); /* * Find the target at the end of the indirection chain, and * process it in much the same fashion as the original target * of the fetch. Though we hope to find graph here, we could * find a black hole (of any flavor) or even a FetchMe. */ closure = bf->node; /* We evacuate BQs and update the node fields where necessary in GC.c So, if we find an EVACUATED closure, something has gone Very Wrong (and therefore we let the RTS crash most ungracefully). */ ASSERT(get_itbl(closure)->type != EVACUATED); // closure = ((StgEvacuated *)closure)->evacuee; ASSERT(UNTAG_CHECK(closure)); closure = UNWIND_IND_TGS(closure); //while ((ind = IS_INDIRECTION(closure)) != NULL) { closure = ind; } ip = get_itbl(closure); if (ip->type == FETCH_ME) { /* Forward the Fetch to someone else */ rga.pe = bf->ga.pe; rga.slot = bf->ga.slot; rga.weight = bf->ga.weight; sendFetch(((StgFetchMe *)closure)->ga, &rga, 0 /* load */); // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_fetch_mess++; totFETCH++; } IF_PAR_DEBUG(fetch, belch("__-> processFetches: Forwarding fetch from %lx to %lx\n", mytid, rga.pe)); } else if (IS_BLACK_HOLE(closure)) { IF_PAR_DEBUG(fetch, belch("__++ processFetches: trying to send a BLACK_HOLE => doing a blockFetch on closure %p (%s)\n", closure, info_type(closure))); bf->node = closure; //blockFetch(bf, closure); } else { /* We now have some local graph to send back */ nat size; packBuffer = gumPackBuffer; IF_PAR_DEBUG(fetch, belch("__*> processFetches: PackNearbyGraph of closure %p (%s)\n", closure, info_type(closure))); if ((packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size)) == NULL) { // Put current BF back on list // bf->link = (StgBlockingQueueElement *)PendingFetches; PendingFetches = (StgBlockedFetch *)bf; // ToDo: check that nothing more has to be done to prepare for GC! barf("processFetches: out of heap while packing graph; ToDo: call GC here"); GarbageCollect(GetRoots, rtsFalse); bf = PendingFetches; PendingFetches = (StgBlockedFetch *)(bf->link); closure = bf->node; packBuffer = PackNearbyGraph(closure, END_TSO_QUEUE, &size); ASSERT(packBuffer != (rtsPackBuffer *)NULL); } rga.pe = bf->ga.pe; rga.slot = bf->ga.slot; rga.weight = bf->ga.weight; sendResume(&rga, size, packBuffer); // Global statistics: count no. of fetches if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { globalParStats.tot_resume_mess++; } } } PendingFetches = END_BF_QUEUE; } #endif /* 0 */ /* NB: blockFetch doesn't exist any more; it's inlined into processFetch Also, we now don't use BLOCKED_FETCH closures but dummy TSOs there -- HWL GUM6 NB: blockThread doesn't exist any more; it's inlined into checkBlackHoles in Schedule.c and if necessary a sendFetch is called there -- HWL GUM6 */ /* * allocate a BLOCKED_FETCH closure and fill it with the relevant fields * of the ga argument; called from processFetch when the local closure is * under evaluation */ #if 0 // !defined(DUMMY_BLOCKED_FETCH) StgClosure * createBlockedFetch (globalAddr ga, globalAddr rga) { StgBlockedFetch *bf; StgClosure *closure; closure = GALAlookup(&ga); if ((bf = (StgBlockedFetch *)allocate(HEADERSIZE + sizeofW(StgBlockedFetch))) == NULL) { barf("createBlockedFetch: out of heap while allocating heap for a BlocekdFetch; ToDo: call GC here"); //GarbageCollect(GetRoots, rtsFalse); //closure = GALAlookup(&ga); //bf = (StgBlockedFetch *)allocate(HEADERSIZE + sizeofW(StgBlockedFetch)); // ToDo: check whether really guaranteed to succeed 2nd time around } ASSERT(bf != (StgBlockedFetch *)NULL); // SET_INFO((StgClosure *)bf, &stg_BLOCKED_FETCH_info); // ToDo: check whether other header info is needed bf->node = closure; bf->ga.pe = rga.pe; bf->ga.slot = rga.slot; bf->ga.weight = rga.weight; // bf->link = NULL; debugging /* IF_PAR_DEBUG(fetch, { char str[MAX_GA_STR_LEN]; showGA(&(bf->ga),str); debugBelch("%%%%// created BF: bf=%p (%s) of closure , GA: %s", bf, info_type((StgClosure*)bf), str);};);*/ return (StgClosure *)bf; } #endif #endif /* PARALLEL_RTS -- whole file */ //@index //* FETCH:: @cindex\s-+FETCH //* sendFetch:: @cindex\s-+sendFetch //* unpackFetch:: @cindex\s-+unpackFetch //* RESUME:: @cindex\s-+RESUME //* sendResume:: @cindex\s-+sendResume //* unpackResume:: @cindex\s-+unpackResume //* ACK:: @cindex\s-+ACK //* sendAck:: @cindex\s-+sendAck //* unpackAck:: @cindex\s-+unpackAck //* FISH:: @cindex\s-+FISH //* sendFish:: @cindex\s-+sendFish //* unpackFish:: @cindex\s-+unpackFish //* FREE:: @cindex\s-+FREE //* sendFree:: @cindex\s-+sendFree //* unpackFree:: @cindex\s-+unpackFree //* SCHEDULE:: @cindex\s-+SCHEDULE //* sendSchedule:: @cindex\s-+sendSchedule //* unpackSchedule:: @cindex\s-+unpackSchedule //* freeMsgBuffer:: @cindex\s-+freeMsgBuffer //* freeMsgIndex:: @cindex\s-+freeMsgIndex //* prepareFreeMsgBuffers:: @cindex\s-+prepareFreeMsgBuffers //* processPendingFetches:: @cindex\s-+processPendingFetches //* offLoadSpark:: @cindex\s-+offLoadSpark //* processFetch:: @cindex\s-+processFetch //* processFree:: @cindex\s-+processFree //* processResume:: @cindex\s-+processResume //* processSchedule:: @cindex\s-+processSchedule //* processAck:: @cindex\s-+processAck //* createBlockedFetch:: @cindex\s-+createBlockedFetch //* waitForTermination:: @cindex\s-+waitForTermination //* sendFreeMessages:: @cindex\s-+sendFreeMessages //* gumPackBuffer:: @cindex\s-+gumPackBuffer //* initMoreBuffers:: @cindex\s-+initMoreBuffers //@end index