/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2000-2008
 *
 * Sparking support for PARALLEL_RTS and THREADED_RTS versions of the RTS.
 *
 * -------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
#include "RtsUtils.h"
#include "Trace.h"
#include "Prelude.h"
#include "Sparks.h"

#if defined(PAR_TICKY)
#include "ParallelRts.h" // globalParStats
#endif

#if defined(THREADED_RTS) || defined(PARALLEL_RTS)

// FIXME
// this is definitely the wrong way to do things;
// should be done via RtsFlag (includes/Flags.h and rts/RtsFlags.c);
// just to avoid total recompile for now
unsigned int MAX_LOCAL_SPARKS_EVIL_CONSTANT = 4096;

unsigned int maxLevel = 3;

void
initSparkPools( void )
{
    /* walk over the capabilities, allocating a spark pool for each one */
    /* should be the same in the parallel system */
    nat i;
    for (i = 0; i < n_capabilities; i++) {
        capabilities[i].sparks = newWSDeque(MAX_LOCAL_SPARKS_EVIL_CONSTANT);
        // should be: newWSDeque(RtsFlags.ParFlags.maxLocalSparks)
    }
}

void
freeSparkPool (SparkPool *pool)
{
    freeWSDeque(pool);
}

/* -----------------------------------------------------------------------------
 *
 * Turn a spark into a real thread
 *
 * -------------------------------------------------------------------------- */

#if defined(PARALLEL_RTS)
void
createSparkThread (Capability *cap, Task *task)
#else
void
createSparkThread (Capability *cap)
#endif
{
    StgTSO *tso;

    tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize,
                          &base_GHCziConc_runSparks_closure);

    traceEventCreateSparkThread(cap, tso->id);

#ifdef PARALLEL_RTS
    // tso->bound = task;
    IF_PAR_DEBUG(verbose,
                 debugBelch("--** thread %lu created\n",
                            (unsigned long)tso->id));
#endif

    appendToRunQueue(cap,tso);
}
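#if 0
/* Illustrative sketch only -- not part of this file's API.  It shows how a
 * scheduler might use createSparkThread: when a capability runs out of work
 * but sparks are available, turn the spark pool into a thread that runs
 * runSparks.  The helper name and the anySparks() test are assumptions
 * modelled on the stock GHC scheduler (Schedule.c); a PARALLEL_RTS build
 * would pass the current Task as well.
 */
static void
activateSpark (Capability *cap)
{
    if (anySparks()) {
        createSparkThread(cap);
        debugTrace(DEBUG_sched, "creating a spark thread");
    }
}
#endif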
/* --------------------------------------------------------------------------
 * newSpark: create a new spark, as a result of calling "par"
 * Called directly from STG.
 * -------------------------------------------------------------------------- */

/* StgInt */
/* newSpark (StgRegTable *reg, StgClosure *p) */

StgInt
newSpark (StgRegTable *reg, StgClosure *p)
{
    Capability *cap = regTableToCapability(reg);
    SparkPool *pool = cap->sparks;

    /* I am not sure whether this is the right thing to do.
     * Maybe it is better to exploit the tag information
     * instead of throwing it away?
     */
    StgSparkInfo *si = (StgSparkInfo *)
        stgMallocBytes(sizeof(StgSparkInfo), "newsparkdata");

    si->element = p;
    si->maxD = maxLevel;
    si->minD = 0;
    si->pars = 0;
    si->sz   = 0;

    if (closure_SHOULD_SPARK(si->element)) {
        //ASSERT(IsStgSparkInfo(si));
        pushWSDeque(pool,si);
    }

    cap->sparks_created++;

    return 0;
}

StgInt
parGlobal (StgRegTable *reg, StgClosure *p,
           StgInt namI, StgInt granI, StgInt szI, StgInt parI)
{
    Capability *cap = regTableToCapability(reg);
    SparkPool *pool = cap->sparks;

    StgSparkInfo *si = (StgSparkInfo *)
        stgMallocBytes(sizeof(StgSparkInfo), "newsparkdata");

    si->element = p;
    si->maxD = granI;
    si->minD = szI;
    si->pars = parI;
    si->sz   = namI;

    if (closure_SHOULD_SPARK(si->element)) {
        // ASSERT(IsStgSparkInfo(si));
        pushWSDeque(pool,si);
    }

    cap->sparks_created++;

    return 0;
}

StgInt
parDist (StgRegTable *reg, StgClosure *p, StgInt minD, StgInt maxD)
{
    Capability *cap = regTableToCapability(reg);
    SparkPool *pool = cap->sparks;

    //fprintf(stderr, " the total sparks of NamI = %d \n ", minD);
    //fprintf(stderr, " the total sparks of GranI = %d \n ", maxD);

    StgSparkInfo *si = (StgSparkInfo *)
        stgMallocBytes(sizeof(StgSparkInfo), "newsparkdata");
    // si->element = stgMallocBytes(4000, sizeof(StgClosurePtr),
    //                     "newStgSparkInfo: data space, parDist function");

    si->element = p;
    si->maxD = maxD;
    si->minD = minD;
    si->pars = 0;
    si->sz   = 0;

    if (closure_SHOULD_SPARK(si->element)) {
        //ASSERT(IsStgSparkInfo(si));
        pushWSDeque(pool,si);
    }

    cap->sparks_created++;

    return 0;
}

/* --------------------------------------------------------------------------
 * newImportedSpark: same as newSpark, but referring to a graph structure
 * that has been imported from another PE (GUM-style).
 * Called from within the RTS.
 * -------------------------------------------------------------------------- */

StgInt
newImportedSpark (Capability *cap, StgClosure *p)
{
    SparkPool *pool = cap->sparks;

    /* I am not sure whether this is the right thing to do.
     * Maybe it is better to exploit the tag information
     * instead of throwing it away?
     */
    StgSparkInfo *si = (StgSparkInfo *)
        stgMallocBytes(sizeof(StgSparkInfo), "newsparkdata");

    si->element = p;
    si->maxD = 1;
    si->minD = 0;
    si->pars = 0;
    si->sz   = 0;

    if (closure_SHOULD_SPARK(p)) {
        // ASSERT(IsStgSparkInfo(si));
        pushWSDeque(pool,si);
    }

#if defined(PAR_TICKY)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        totFREE++;
    }
#endif

    cap->sparks_created++;

    return 1;
}
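#if 0
/* Illustrative sketch only -- not used by this file.  newSpark, parGlobal,
 * parDist and newImportedSpark above all follow the same pattern: allocate
 * an StgSparkInfo, fill it in, and push it if the closure is still worth
 * sparking.  A helper along these lines could factor that out; the field
 * names are taken from the code above, the helper itself is hypothetical.
 * Note that, as in the functions above, the StgSparkInfo is not freed when
 * the closure turns out not to be sparkable.
 */
static void
pushSparkInfo (Capability *cap, StgClosure *p,
               StgInt minD, StgInt maxD, StgInt pars, StgInt sz)
{
    StgSparkInfo *si = (StgSparkInfo *)
        stgMallocBytes(sizeof(StgSparkInfo), "pushSparkInfo");

    si->element = p;
    si->minD    = minD;
    si->maxD    = maxD;
    si->pars    = pars;
    si->sz      = sz;

    if (closure_SHOULD_SPARK(p)) {
        pushWSDeque(cap->sparks, si);
    }
    cap->sparks_created++;
}

/* newSpark, for example, would then reduce to:
 *     pushSparkInfo(cap, p, 0, maxLevel, 0, 0);
 *     return 0;
 */
#endif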
/* -----------------------------------------------------------------------------
 *
 * tryStealSpark: try to steal a spark from a Capability.
 *
 * Returns a valid spark, or NULL if the pool was empty, and can
 * occasionally return NULL if there was a race with another thread
 * stealing from the same pool.  In this case, try again later.
 *
 * -------------------------------------------------------------------------- */

StgClosure *
tryStealSpark (Capability *cap, int pdist)
{
    SparkPool *pool = cap->sparks;
    StgSparkInfo *stolen = NULL;
    //StgClosure *stolen;

    do {
        stolen = stealWSDeque_(pool, pdist);
        // stolen = pop1WSDeque(pool, pdist);
        // use the no-loopy version, stealWSDeque_(), since if we get a
        // spurious NULL here the caller may want to try stealing from
        // other pools before trying again.
        //p = stolen->element
    } while (stolen != NULL && !closure_SHOULD_SPARK(stolen->element));

    // the pool holds StgSparkInfo entries; the entry is handed back to the
    // caller under the declared StgClosure* return type
    return (StgClosure *)stolen;
}
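#if 0
/* Illustrative sketch only: a typical consumer of tryStealSpark loops over
 * the other capabilities and simply moves on when the lock-free steal
 * returns a spurious NULL.  This caller is hypothetical; in the stock GHC
 * scheduler the corresponding logic lives in Capability.c / Schedule.c.
 */
static StgClosure *
findSparkAnywhere (Capability *cap, nat n_caps, Capability caps[], int pdist)
{
    StgClosure *spark;
    nat i;

    /* own pool first ... */
    spark = tryStealSpark(cap, pdist);
    if (spark != NULL) return spark;

    /* ... then everybody else's */
    for (i = 0; i < n_caps; i++) {
        if (&caps[i] == cap) continue;
        spark = tryStealSpark(&caps[i], pdist);
        if (spark != NULL) return spark;
    }
    return NULL;
}
#endif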
/* --------------------------------------------------------------------------
 * Remove all sparks from the spark queues which should not spark any
 * more.  Called after GC.  We assume exclusive access to the structure
 * and replace all sparks in the queue, see explanation below.  At exit,
 * the spark pool only contains sparkable closures.
 * -------------------------------------------------------------------------- */

void
pruneSparkQueue (evac_fn evac, void *user, Capability *cap)
{
    SparkPool *pool;
    StgClosurePtr spark, tmp, *elements;
    nat n, pruned_sparks; // stats only
    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
    const StgInfoTable *info;

    n = 0;
    pruned_sparks = 0;

    pool = cap->sparks;

    // it is possible that top > bottom, indicating an empty pool.  We
    // fix that here; this is only necessary because the loop below
    // assumes it.
    if (pool->top > pool->bottom)
        pool->top = pool->bottom;

    // Take this opportunity to reset top/bottom modulo the size of
    // the array, to avoid overflow.  This is only possible because no
    // stealing is happening during GC.
    pool->bottom  -= pool->top & ~pool->moduloSize;
    pool->top     &= pool->moduloSize;
    pool->topBound = pool->top;

    debugTrace(DEBUG_sparks,
               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);

    ASSERT_WSDEQUE_INVARIANTS(pool);

    elements = (StgClosurePtr *)pool->elements;

    /* We have exclusive access to the structure here, so we can reset
       bottom and top counters, and prune invalid sparks. Contents are
       copied in-place if they are valuable, otherwise discarded. The
       routine uses "real" indices t and b, starts by computing them
       as the modulus size of top and bottom,

       Copying:

       At the beginning, the pool structure can look like this:
       ( bottom % size >= top % size , no wrap-around)
                  t          b
       ___________***********_________________

       or like this ( bottom % size < top % size, wrap-around )
                  b         t
       ***********__________******************
       As we need to remove useless sparks anyway, we make one pass
       between t and b, moving valuable content to b and subsequent
       cells (wrapping around when the size is reached).

                     b      t
       ***********OOO_______XX_X__X?**********
                     ^____move?____/

       After this movement, botInd becomes the new bottom, and old
       bottom becomes the new top index, both as indices in the array
       size range.
    */
    // starting here
    currInd = (pool->top) & (pool->moduloSize); // mod

    // copies of evacuated closures go to space from botInd on
    // we keep oldBotInd to know when to stop
    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod

    // on entry to loop, we are within the bounds
    ASSERT( currInd < pool->size && botInd  < pool->size );

    while (currInd != oldBotInd ) {
        /* must use != here, wrap-around at size
           subtle: loop not entered if queue empty
         */

        /* check element at currInd. if valuable, evacuate and move to
           botInd, otherwise move on */
        spark = elements[currInd];

        // We have to be careful here: in the parallel GC, another
        // thread might evacuate this closure while we're looking at it,
        // so grab the info pointer just once.
        info = spark->header.info;
        if (IS_FORWARDING_PTR(info)) {
            tmp = (StgClosure*)UN_FORWARDING_PTR(info);
            /* if valuable work: shift inside the pool */
            if (closure_SHOULD_SPARK(tmp)) {
                elements[botInd] = tmp; // keep entry (new address)
                botInd++;
                n++;
            } else {
                pruned_sparks++; // discard spark
                cap->sparks_pruned++;
            }
        } else {
            if (!(closure_flags[INFO_PTR_TO_STRUCT(info)->type] & _NS)) {
                elements[botInd] = spark; // keep entry (new address)
                evac (user, &elements[botInd]);
                botInd++;
                n++;
            } else {
                pruned_sparks++; // discard spark
                cap->sparks_pruned++;
            }
        }

        currInd++;

        // in the loop, we may reach the bounds, and instantly wrap around
        ASSERT( currInd <= pool->size && botInd <= pool->size );
        if ( currInd == pool->size ) { currInd = 0; }
        if ( botInd == pool->size )  { botInd = 0;  }

    } // while-loop over spark pool elements

    ASSERT(currInd == oldBotInd);

    pool->top = oldBotInd; // where we started writing
    pool->topBound = pool->top;

    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
    // first free place we did not use (corrected by wraparound)

    debugTrace(DEBUG_sparks, "pruned %d sparks", pruned_sparks);

    debugTrace(DEBUG_sparks,
               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);

    ASSERT_WSDEQUE_INVARIANTS(pool);
}

/* GC for the spark pool, called inside Capability.c for all
   capabilities in turn.  Blindly "evac"s the complete spark pool. */
void
traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
{
    //StgClosure **sparkp;
    SparkPool *pool;
    StgSparkInfo **sparkp;
    StgWord top, bottom, modMask;

    pool = cap->sparks;

    ASSERT_WSDEQUE_INVARIANTS(pool);

    top = pool->top;
    bottom = pool->bottom;
    sparkp = (StgSparkInfo **)pool->elements;
    modMask = pool->moduloSize;

    while (top < bottom) {
        /* call evac for all closures in range (wrap-around via modulo)
         * In GHC-6.10, evac takes an additional 1st argument to hold a
         * GC-specific register, see rts/sm/GC.c::mark_root()
         */
        StgSparkInfo **p = sparkp + (top & modMask);
        evac( user, (StgClosure **)(*p) ); // effectively &((*p)->element)
        top++;
    }

#if defined(PAR_TICKY)
    if (RtsFlags.ParFlags.ParStats.Global &&
        RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
        globalParStats.tot_sp += sparkPoolSize(pool);
        globalParStats.cnt_sp++;
        if (sparkPoolSize(pool) > globalParStats.res_sp) {
            globalParStats.res_sp = sparkPoolSize(pool);
        }
        if (sparkPoolSize(pool) == 0) {
            globalParStats.emp_sp++;
        }
    }
#endif

    debugTrace(DEBUG_sparks,
               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);
}
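#if 0
/* Illustrative only: the (StgClosure **) casts in traverseSparkQueue above
 * and checkSparkQueueEvac below hand evac a pointer into the StgSparkInfo
 * itself, which is only correct if 'element' is its first field.  A
 * compile-time check along these lines (needs <stddef.h> for offsetof)
 * would make that assumption explicit:
 */
#include <stddef.h>
typedef char element_must_be_first_field_of_StgSparkInfo
    [ (offsetof(StgSparkInfo, element) == 0) ? 1 : -1 ];
#endif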
/* ----------------------------------------------------------------------------
 * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
 * capabilities) and its size.  Accesses all spark pools and equally
 * distributes the sparks among them.
 *
 * Could be called after GC, before Cap. release, from scheduler.
 * -------------------------------------------------------------------------- */
void
balanceSparkPoolsCaps(nat n_caps, Capability caps[])
   GNUC3_ATTRIBUTE(__noreturn__);

void
balanceSparkPoolsCaps(nat n_caps STG_UNUSED, Capability caps[] STG_UNUSED)
{
    barf("not implemented");
}

/*
 * Debugging, statistics and such
 */

/* the loop in the body is taken from traverseSparkQueue */
nat
spark_queue_len (Capability *cap)
{
    //StgClosure **sparkp;
    StgSparkInfo **sparkp;
    SparkPool *pool;
    StgWord top, bottom, modMask;
    nat n = 0;

    pool = cap->sparks;

    // ASSERT_WSDEQUE_INVARIANTS(pool);

    top = pool->top;
    bottom = pool->bottom;
    sparkp = (StgSparkInfo **)pool->elements;
    modMask = pool->moduloSize;

    while (top < bottom) {
        // evac( user, sparkp + (top & modMask) );
        top++;
        n++; // only count the number of sparks
    }

    return n;
}

/* check that all sparks have been evacuated (by traverseSparkQueue or
   pruneSparkQueue) */
void
checkSparkQueueEvac (evac_fn evac, void *user, Capability *cap)
{
    //StgClosure **sparkp;
    SparkPool *pool;
    StgSparkInfo **sparkp;
    StgWord top, bottom, modMask;

    pool = cap->sparks;

    ASSERT_WSDEQUE_INVARIANTS(pool);

    top = pool->top;
    bottom = pool->bottom;
    sparkp = (StgSparkInfo **)pool->elements;
    modMask = pool->moduloSize;

    while (top < bottom) {
        /* call evac for all closures in range (wrap-around via modulo)
         * In GHC-6.10, evac takes an additional 1st argument to hold a
         * GC-specific register, see rts/sm/GC.c::mark_root()
         */
        // ASSERT(IS_FORWARDING_PTR(sparkp + (top & modMask)));
        StgSparkInfo **p = sparkp + (top & modMask);
        if (!IS_FORWARDING_PTR((StgClosure **)(*p))) {
            evac( user, (StgClosure **)(*p) );
        }
        top++;
    }
}

#else

#warning "Spark pools are only enabled in THREADED_RTS or PARALLEL_RTS builds"

StgInt
newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
{
    /* nothing */
    return 1;
}

#endif /* THREADED_RTS || PARALLEL_RTS */