/* -------------------------------------------------------------------------- Initialising the parallel RTS An extension based on Kevin Hammond's GRAPH for PVM version P. Trinder, January 17th 1995. Adapted for the new RTS P. Trinder, July 1997. H-W. Loidl, November 1999. rewrite for Eden-6.x, Jost Berthold, 2006 adapted to Eden-6.11, August 2009 ------------------------------------------------------------------------ */ //@menu //* Includes:: //* Variables:: //* Main functions:: //@end menu //@node Includes, Variables //@section Includes #include "Rts.h" #include "RtsUtils.h" #include "GetTime.h" #include "LLC.h" #include "HLC.h" #include "MPSystem.h" /* wraps middleware usage */ #include "Trace.h" #include #include #include "rts/storage/GC.h" #ifndef PARALLEL_RTS /* provide constants nPE and thisPe for foreign import */ nat nPEs = 1; nat thisPE = 1; #endif #ifdef PARALLEL_RTS /* whole rest of the file */ //@node Variables, Main functions, Includes //@section Variables #ifdef TRACING StgWord64 startupTicks; char *argvsave; struct timeval startupTime; struct timezone startupTimeZone; #endif //TRACING /* For flag handling see RtsFlags.h */ // global statistics on no. of sparks, messages etc //@cindex globalParStats #if defined(PAR_TICKY) GlobalParStats globalParStats; nat sparksIgnored = 0, sparksCreated = 0,sparksImported = 0, threadsIgnored = 0, threadsCreated = 0,threadsImported = 0 ; nat totFISH = 0, totSCHEDULE = 0, totACK = 0, totSPARK = 0, totFREE = 0, totFETCH = 0; nat totPackets = 0, totUnpacked = 0; nat censusCnt = 0, censusTSOs = 0; #endif //rtsBool fishing = rtsFalse; /* We have no fish out in the stream *// nat advisory_thread_count = 0; /* number of threads (for par) created */ rtsTime next_fish_to_send_at; /* expected time of next fish */ rtsTime last_fish_arrived_at = 0 ; /* Time of arrival of most recent fish*/ nat outstandingFishes = 0; /* Number of active fishes */ //@cindex PendingFetches /* A list of fetch reply messages not yet processed; this list is filled by awaken_blocked_queue and processed by processFetches */ StgTSO *PendingFetches = END_TSO_QUEUE; FILE *par_log_file = NULL; //@node Main functions, , Variables //@section Main functions /* For flag handling see RtsFlags.h */ void shutdownParallelSystem(StgInt n) { /* use the file specified via -S */ FILE *sf = RtsFlags.GcFlags.statsFile; IF_PAR_DEBUG(verbose, if (n==0) debugBelch("==== entered shutdownParallelSystem ...\n"); else debugBelch("==== entered shutdownParallelSystem (ERROR %d)...\n", (int) n); ); /* fprintf(stderr, "\n The total time spaned execute Haskell code is %11.2fs\n", TICK_TO_DBL((double)globalParStats.time_active)); if (RtsFlags.ParFlags.ParStats.Global) { fprintf(stderr, "PE %d: Sparks: %u created, %u imported, %u ignored; Threads: %u created, %u imported, %u ignored\n", thisPE, sparksCreated,sparksImported,sparksIgnored,threadsCreated, threadsImported, threadsIgnored); fprintf(stderr, "PE %d: Total messages: FISH: %u, SCHEDULE: %u, ACK: %u, SPARK: %u, FREE: %u, FETCH: %u\n", thisPE,totFISH,totSCHEDULE,totACK , totSPARK, totFREE,totFETCH); fprintf(stderr, "PE %d: Total Packets: %u; total unpacked size: %u\n", thisPE, totPackets, totUnpacked); if (censusCnt>0) { fprintf(stderr, "PE %d: TSO census: seen %f TSOs in avg (on %u censi)\n", thisPE, ((1.0)*censusTSOs)/((1.0)*censusCnt), censusCnt); } } */ // JB 11/2006: write stop event, close trace file. Done here to // avoid a race condition if trace files merged by main node // automatically. //edentrace: traceKillMachine traceKillMachine(thisPE); MP_quit(n); // free allocated space (send/receive buffers) freePackBuffer(); freeRecvBuffer(); // freeMoreBuffxers(); // and runtime tables freeRTT(); } /* * SynchroniseSystem synchronises the reduction task with the system * manager, and initialises global structures: receive buffer for * communication, process table, and in GUM the Global address tables * (LAGA & GALA) */ //@cindex synchroniseSystem void synchroniseSystem(void) { MP_sync(); // all kinds of initialisation we can do now... // Don't buffer standard channels... setbuf(stdout,NULL); setbuf(stderr,NULL); // initialise runtime tables (Eden only) initRTT(); // pack buffer in Pack.c, used by Pack.c::packNearbyGraph // InitPackBuffer(); no, done on demand! // HWL TODO: check whether needed // init ring of send buffers (1 for each PE); and one receive buffer if (!initCommBuffers()) barf("initCommBuffers"); // unpack buffer in HLComms.c, passed to Pack.c::unpackGraph // TODO: merge this routine with initCommBuffers if (!initMoreBuffers()) barf("initMoreBuffers"); /* No: done in Schedule.c if (!initSparkPools()) barf("initSparkPools"); */ // initialise GALA and LAGA tables (GUM only) if (!initGAtables()) barf("initGAtables"); /* init log file */ { char par_log_filename[STATS_FILENAME_MAXLEN]; sprintf(par_log_filename, "par_log_%d", thisPE); // argv[0]); par_log_file = fopen(par_log_filename,"w"); if (par_log_file == NULL) { par_log_file = stderr; // errorBelch("Can't open par_log_file %s\n", par_log_filename); } } #if defined(PAR_TICKY) par_ticky_Par_start (); /* init of globalStats struct */ #endif } #ifdef TRACING void emitStartupEvents(void){ //edentrace: traceCreateMachine //startupTicks was fetched earlier, CreateMachine has //to be the first Event writen to keep the order of the //timestamps in the buffers valid traceCreateMachine(thisPE,((startupTime.tv_sec) * 100000000 + (startupTime.tv_usec) * 100),startupTicks); //edentrace: traceVersion traceVersion(ProjectVersion); //edentrace: traceProgramInvocation traceProgramInvocation(argvsave); } #endif /* Do the startup stuff (middleware-dependencies wrapped in MPSystem.h Global vars held in MPSystem: IAmMainThread, thisPE, nPEs Called at the beginning of RtsStartup.startupHaskell */ void startupParallelSystem(int* argc, char **argv[]) { // getStartTime(); // init start time (in RtsUtils.*) // write Event for machine startup here, before // communication is set up (might take a while) // JB 11/2006: thisPE is still 0 at this moment, we cannot name the // trace file here => startup time is in reality sync time. //MD/TH 03/2010: workaround: store timestamp here and use it in synchroniseSystem #ifdef TRACING startupTicks = stat_getElapsedTime() * (1000000000LL/TICKS_PER_SECOND); gettimeofday(&startupTime,&startupTimeZone); //MD: copy argument list to string for traceProgramInvocation int len = 0; int i=0; while (i < *argc){ len+=strlen((*argv)[i])+1; i++; } argvsave = (char *)calloc(len + 1, sizeof(char)); i=0; while (i < *argc){ strcat(argvsave,(*argv)[i]); strcat(argvsave," "); i++; } #endif //TRACING // possibly starts other PEs (first argv is number) // sets IAmMainThread, nPEs MP_start(argc, *argv); (*argv)[1] = (*argv)[0]; /* ignore the nPEs argument */ (*argv)++; (*argc)--; /* Only in debug mode? */ fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs); setKeepCAFs(); } #endif /* PARALLEL_RTS -- whole file */