/* --------------------------------------------------------------------------
   Initialising the parallel RTS

   An extension based on Kevin Hammond's GRAPH for PVM version
   P. Trinder, January 17th 1995.
   Adapted for the new RTS
   P. Trinder, July 1997.
   H-W. Loidl, November 1999.

   rewrite for Eden-6.x, Jost Berthold, 2006
   adapted to Eden-6.11, August 2009

   ------------------------------------------------------------------------ */

#include "Rts.h"
#include "RtsUtils.h"

#include "LLC.h"
#include "HLC.h"
#include "Monitoring.h"
#include "MPSystem.h" /* wraps middleware usage */

#ifdef PARALLEL_RTS /* whole rest of the file */

/* provide constants nPEs and thisPE for foreign import */
/* now in PVMComms.c or MPIComms.c */
#if 0
nat nPEs   = 1;
nat thisPe = 1;
#endif

// global statistics on no. of sparks, messages etc
//@cindex globalParStats
#if defined(PAR_TICKY)
GlobalParStats globalParStats;
nat sparksIgnored = 0, sparksCreated = 0, sparksImported = 0,
    threadsIgnored = 0, threadsCreated = 0, threadsImported = 0;
nat totFISH = 0, totSCHEDULE = 0, totACK = 0, totSPARK = 0,
    totFREE = 0, totFETCH = 0, totSPK = 0;
nat totPackets = 0, totUnpacked = 0;
nat censusCnt = 0, censusTSOs = 0;
#endif

// rtsBool fishing = rtsFalse;  /* We have no fish out in the stream */
/* extern int totConv_SRK; */
nat advisory_thread_count = 0; /* number of threads (for par) created */
nat totM = 0;
rtsTime next_fish_to_send_at;      /* expected time of next fish */
rtsTime last_fish_arrived_at = 0;  /* Time of arrival of most recent fish */
nat outstandingFishes = 0;         /* Number of active fishes */

//@cindex PendingFetches
/* A list of fetch reply messages not yet processed; this list is filled
   by awaken_blocked_queue and processed by processFetches */
StgTSO *PendingFetches = END_TSO_QUEUE;

int sparksported = 0;
PESStatic *PEsStatic = NULL;
PESDynamic *PEsDynamic = NULL;
ComMap *comMap = NULL;
StgInt total_sparks_gum_converted = 0;

/* Program name: set from argv[0] in startupParallelSystem and handed to
   distPEsStatic in synchroniseSystem. */
char *name;

/* For flag handling see RtsFlags.h */

/*
 * shutdownParallelSystem: leave the parallel system and release the
 * buffers and tables owned by this module.
 *
 * n is forwarded to MP_quit. With verbose parallel debugging enabled,
 * n == 0 is logged as a plain shutdown and any other value is logged
 * as an error code.
 *
 * Order of operations: MP_quit(n) first, then freePackBuffer,
 * freeRecvBuffer and freeRTT.
 */
void
shutdownParallelSystem(StgInt n)
{
  IF_PAR_DEBUG(verbose,
               if (n==0)
                 debugBelch("==== entered shutdownParallelSystem ...\n");
               else
                 debugBelch("==== entered shutdownParallelSystem (ERROR %d)...\n", (int) n);
               );

  /*
  fprintf(stderr, "\n The total time spaned execute Haskell code is %11.2fs\n",
          TICK_TO_DBL((double)globalParStats.time_active));
  if (RtsFlags.ParFlags.ParStats.Global) {
    fprintf(stderr, "PE %d: Sparks: %u created, %u imported, %u ignored; Threads: %u created, %u imported, %u ignored\n",
            thisPE, sparksCreated,sparksImported,sparksIgnored,threadsCreated, threadsImported, threadsIgnored);
    fprintf(stderr, "PE %d: Total messages: FISH: %u, SCHEDULE: %u, ACK: %u, SPARK: %u, FREE: %u, FETCH: %u\n",
            thisPE,totFISH,totSCHEDULE,totACK , totSPARK, totFREE,totFETCH);
    fprintf(stderr, "PE %d: Total Packets: %u; total unpacked size: %u\n",
            thisPE, totPackets, totUnpacked);
    if (censusCnt>0) {
      fprintf(stderr, "PE %d: TSO census: seen %f TSOs in avg (on %u censi)\n",
              thisPE, ((1.0)*censusTSOs)/((1.0)*censusCnt), censusCnt);
    }
  }
  */

  // JB 11/2006: write stop event, close trace file. Done here to
  // avoid a race condition if trace files merged by main node
  // automatically.
  /*
  if (EDENTRACE) {
    traceEvent( FETE_END_MACHINE | RECORD_TRACE, 0, 0 );
    endEdenTracing();
  }
  */

  MP_quit(n);

  // free allocated space (send/receive buffers)
  freePackBuffer();
  freeRecvBuffer();
  // freeMoreBuffxers();

  // and runtime tables
  freeRTT();
}

/*
 * SynchroniseSystem synchronises the reduction task with the system
 * manager, and initialises global structures: receive buffer for
 * communication, process table, and in GUM the Global address tables
 * (LAGA & GALA)
 *
 * Failure of initMoreBuffers or initGAtables (a zero/false return) is
 * fatal: barf is called with the name of the failing routine.
 */
//@cindex synchroniseSystem
void
synchroniseSystem(void)
{
  MP_sync();
  // static table information mka19

  // JB 11/2006: now that we know our number (thisPE), we start
  // tracing. Too late, see startupParallelSystem below. And the
  // EVENTLOG solution, up to now, is not even ready at this point.
  /*
  if (EDENTRACE) {
    initEdenTracing(thisPE);
    // TODO: include real start time in this event, see below
    // => changes in EdenTV tool(s) needed!
    traceEvent( FETE_START_MACHINE | RECORD_TRACE, 0, 0 );
  }
  */

  // all kinds of initialisation we can do now...

  // Don't buffer standard channels...
  setbuf(stdout, NULL);
  setbuf(stderr, NULL);

  // initialise runtime tables (Eden only)
  initRTT();

  // pack buffer in Pack.c, used by Pack.c::packNearbyGraph
  // InitPackBuffer(); no, done on demand!

  // init ring of send buffers (1 for each PE); and one receive buffer
  initCommBuffers();

  // PE monitoring and static PE information are only set up when
  // there is more than one PE.
  if (nPEs > 1) {
    startPEMonitoring(thisPE);
    distPEsStatic(thisPE, name);
    // clusterPEs();
    //PEsStaticprint();
  }
  // int n = pvm_barrier(name , nPEs );
  // fprintf(stderr,"\n before the barrier %s \n ",name);
  // int n = pvm_barrier(name , nPEs );
  //lxparafprintf(stderr,"\n the program pass the barrier %s \n ",name);

  // init unpack buffer in HLComms.c, passed to Pack.c::unpackGraph
  // TODO: merge this routine with initCommBuffers
  if (!initMoreBuffers()) {
    barf("initMoreBuffers");
  }

  /* No: done in Schedule.c
  if (!initSparkPools())
    barf("initSparkPools");
  */

  // initialise GALA and LAGA tables (GUM only)
  if (!initGAtables()) {
    barf("initGAtables");
  }

#if defined(PAR_TICKY)
  par_ticky_Par_start(); /* init of globalStats struct */
#endif
}

/*
 * Do the startup stuff (middleware-dependencies wrapped in MPSystem.h)
 * Global vars held in MPSystem: IAmMainThread, thisPE, nPEs
 * Called at the beginning of RtsStartup.startupHaskell
 *
 * argc, argv: pointers to the caller's argument count and vector. Both
 * are modified in place: after MP_start returns, the argument in slot 1
 * (the number of PEs) is dropped by copying the program name over it
 * and advancing the vector by one, so the caller sees the program name
 * followed by the remaining arguments.
 */
void
startupParallelSystem(int* argc, char **argv[])
{
  // remember the program name; used later by synchroniseSystem
  name = (*argv)[0];

  // getStartTime(); // init start time (in RtsUtils.*)

  // write Event for machine startup here, before
  // communication is set up (might take a while)

  // JB 11/2006: thisPE is still 0 at this moment, we cannot name the
  // trace file here => startup time is in reality sync time.
  // TODO: store start time HERE, in a variable, write event later.
  // drawback: changes the trace file format!

  // possibly starts other PEs (first argv is number)
  // sets IAmMainThread, nPEs
  MP_start(argc, *argv);

  // NOTE(review): the shift below assumes *argc >= 2 (an nPEs argument
  // is present). With *argc == 1 it would leave *argc == 0 -- confirm
  // that MP_start rejects that case.
  (*argv)[1] = (*argv)[0];   /* ignore the nPEs argument */
  (*argv)++;
  (*argc)--;

  /* Only in debug mode? */
  // nPEs is declared as nat elsewhere (see the disabled definition
  // above); cast so the argument matches the %d conversion.
  fprintf(stderr, "==== Starting parallel execution on %d processors ...\n",
          (int) nPEs);
}

#endif /* PARALLEL_RTS -- whole file */