/*
 * Generalised RTE for parallel Haskells.
 *
 * File: ghc/rts/parallel/MPIComm.c
 *
 * Purpose: map the generalised communication functions to MPI,
 *          abstracting from the concrete MP-System.
 */

#if defined(PARALLEL_RTS) && defined(USE_MPI) /* whole file */

#include "Rts.h"        // general Rts definitions
#include "MPSystem.h"   // general interface for message passing
#include "mpi.h"        // MPI standard include, appropriate include path needed
#include "RtsUtils.h"   // utilities for error msg., allocation, etc.
#include "PEOpCodes.h"  // message codes

// TODO: eliminate Bsend, use double buffer and Isend/Wait
#define MPIBUFFERS

/* Global conditions defined here. */

// main thread (PE 1 in logical numbering)
rtsBool IAmMainThread = rtsFalse; // set for the main thread

// nPEs, thisPE
nat nPEs   = 0; // number of PEs in system
nat thisPE = 0; // node's own ID

/* overall variables for MPI (for internal use only) */

// group size, own ID, buffers
int mpiWorldSize;
int mpiMyRank;

// requests for send operations, one for each comm. partner (dynamic!)
// UNUSED YET, see MP_send()
MPI_Request *commReq[MAX_PES];

#if defined(MPIBUFFERS)
// the MPI message buffer can hold maxMsgs messages to every PE
int   maxMsgs;
int   bufsize;
void *mpiMsgBuffer;
int   msgCount;  // detach/attach the buffer when maxMsgs is reached, to avoid overflow
#endif

// status for receive and probe operations:
MPI_Status status;
/* mpi.h sez: struct _status contains
 *   int MPI_SOURCE;
 *   int MPI_TAG;
 *   int MPI_ERROR;
 *   ... more, implementation-dependent
 */

/**************************************************************
 * Startup and Shutdown routines (used inside ParInit.c only)
 */

/* MP_start starts up the node:
 *   - connects to the MP-System used,
 *   - determines whether we are the main thread,
 *   - starts up other nodes in case we are first and
 *     the MP-System requires nodes to be spawned from here.
 * Parameters:
 *     IN argc - int*:   argument count (passed on to MPI_Init)
 *     IN argv - char**: program arguments
 * Sets:
 *     nPEs          - int:     no. of PEs to expect/start
 *     IAmMainThread - rtsBool: whether this node is the main PE
 * Returns: Bool: success or failure
 *
 * MPI Version:
 *   nodes are spawned by the startup script calling mpirun.
 *   This function only connects to MPI and determines the main PE.
 */
rtsBool MP_start(int* argc, char** argv) {

  IF_PAR_DEBUG(mpcomm,
               debugBelch("MPI_Init: starting MPI-Comm...\n"));
  MPI_Init(argc, &argv); // MPI sez: can modify args

  MPI_Comm_rank(MPI_COMM_WORLD, &mpiMyRank);
  IF_PAR_DEBUG(mpcomm,
               debugBelch("I am node %d.\n", mpiMyRank));

  if (!mpiMyRank) // we declare node 0 as main PE.
    IAmMainThread = rtsTrue;

  MPI_Comm_size(MPI_COMM_WORLD, &mpiWorldSize);

  // we should have a correct argument...
  ASSERT(argv && argv[1]);
  nPEs = atoi(argv[1]);

  if (nPEs) { // we have been given a size, so check it:
    IF_PAR_DEBUG(mpcomm,
                 debugBelch("Expecting to find %d processors, found %d.\n",
                            nPEs, mpiWorldSize));
    if ((int)nPEs > mpiWorldSize)
      IF_PAR_DEBUG(mpcomm,
                   debugBelch("WARNING: Too few processors started!\n"));
  } else { // otherwise, no size was given
    IF_PAR_DEBUG(mpcomm,
                 debugBelch("No size given; starting program on %d processors.\n",
                            mpiWorldSize));
  }
  nPEs = mpiWorldSize; // (re-)set size from MPI (in any case)

  return rtsTrue;
}

/* MP_sync synchronises all nodes in a parallel computation:
 * Sets:
 *     thisPE - GlobalTaskId: node's own task Id
 *              (logical node address for messages)
 * Returns: Bool: success (1) or failure (0)
 *
 * MPI Version:
 *   the number of nodes is checked by counting nodes in WORLD
 *   (could also be done by a sync message).
 *   Own ID is known before, but returned only here.
 */
rtsBool MP_sync(void) {

#if defined(MPIBUFFERS)
  // initialise counters/constants and allocate/attach the buffer
  int temp;

  // guessed: 2 messages to every PE is enough buffer space for pending messages
  maxMsgs = 2 * nPEs;

  // calculate maximum space for one packet
  MPI_Pack_size(DATASPACEWORDS, MPI_LONG, MPI_COMM_WORLD, &temp);
  // and the resulting buffer space
  bufsize = maxMsgs * (temp + MPI_BSEND_OVERHEAD);

  mpiMsgBuffer = (void*) stgMallocBytes(bufsize, "mpiMsgBuffer");
  MPI_Buffer_attach(mpiMsgBuffer, bufsize);
  msgCount = 0; // buffer will be flushed (detach/attach) when maxMsgs is reached
#endif

  thisPE = mpiMyRank + 1;

  IF_PAR_DEBUG(mpcomm,
               debugBelch("Node %d synchronising.\n", thisPE));

  MPI_Barrier(MPI_COMM_WORLD); // unnecessary...

  return rtsTrue;
}
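
/* Sketch for the "use a different communicator" TODO in MP_recv below:
 * a duplicate of MPI_COMM_WORLD reserved for system messages would let
 * MP_recv check for them with a single MPI_Iprobe on that communicator
 * instead of one MPI_Iprobe per OpCode. Illustrative only and not wired
 * in: `sysComm`, `MP_initSysComm` and `MP_probeSysMsg` are assumed
 * names, and senders of PP_FINISH etc. would have to use sysComm as
 * well. MPI_Comm_dup is collective, so every node would call the init,
 * e.g. from MP_sync before the barrier.
 */
#if 0 /* illustrative sketch only, not compiled */
static MPI_Comm sysComm; // communicator reserved for system messages

static void MP_initSysComm(void) {
  MPI_Comm_dup(MPI_COMM_WORLD, &sysComm); // collective over all nodes
}

static rtsBool MP_probeSysMsg(MPI_Status *st) {
  int flag = 0;
  // non-blocking probe restricted to the system-message communicator
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, sysComm, &flag, st);
  return (flag != 0);
}
#endif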
/* MP_quit disconnects the current node from the MP-System:
 * Parameters:
 *     IN isError - error number, 0 if normal exit
 * Returns: Bool: success (1) or failure (0)
 *
 * MPI Version: MPI requires that all sent messages must be received
 * before quitting. Receive or cancel all pending messages (using msg.
 * count), then quit from MPI.
 */
rtsBool MP_quit(int isError) {
  long data[2];

  if (IAmMainThread) {
    int i;

    IF_PAR_DEBUG(mpcomm,
                 debugBelch("Main PE stopping MPI system (exit code: %d)\n",
                            isError));

    // bcast FINISH to other PEs
    data[0] = PP_FINISH;
    data[1] = isError;
    for (i=2; i<=(int)nPEs; i++)
      // synchronous send operation in order 2..nPEs ... might slow down.
      MPI_Send(data, 2, MPI_LONG, i-1, PP_FINISH, MPI_COMM_WORLD);
  }

  IF_PAR_DEBUG(mpcomm,
               debugBelch("shutting down MPI now (exit code: %d)\n", isError));

  // TODO: receive or cancel all pending messages...
  /* ------------------------------------------------
   * q&d (quick-and-dirty) solution:
   *   receive anything retrievable by MPI_Probe,
   *   then get in sync,
   *   then again receive the remaining messages.
   *
   * (Since buffering is used, and buffers are detached to force messages
   * out, a PE might get stuck detaching its mpiMsgBuffer, and send
   * another message as soon as buffer space is available again.
   * The other PEs will not )
   * ---------------------------------------------- */
  {
    // allocate a fresh buffer to avoid overflow
    void* voidbuffer;
    voidbuffer = (void*) stgMallocBytes(DATASPACEWORDS * sizeof(long),
                                        "voidBuffer");

    // receive whatever is out there...
    while (MP_probe()) {
      MPI_Recv(voidbuffer, DATASPACEWORDS, MPI_LONG,
               MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
               MPI_STATUS_IGNORE);
    }

    MPI_Barrier(MPI_COMM_WORLD);

    // all in sync (no one sends further messages), receive the rest
    while (MP_probe()) {
      MPI_Recv(voidbuffer, DATASPACEWORDS, MPI_LONG,
               MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
               MPI_STATUS_IGNORE);
    }
    stgFree(voidbuffer);
  } // end of q&d

#ifdef MPIBUFFERS
  IF_PAR_DEBUG(mpcomm,
               debugBelch("detaching MPI buffer\n"));
  MPI_Buffer_detach(&mpiMsgBuffer, &bufsize);
  stgFree(mpiMsgBuffer);
#endif

  IF_PAR_DEBUG(mpcomm,
               debugBelch("Goodbye\n"));
  MPI_Finalize();

  return rtsTrue;
}

/* Dummies, TODO */
void MP_send(int node, OpCode tag, long *data, int length) {
  /* MPI normally uses blocking send operations (MPI_*send). When using
   * nonblocking operations (MPI_I*send), the dataspace must remain
   * untouched until the message has been delivered (MPI_Wait)!
   *
   * 1st solution: use the Bsend convenience functions (MPI manual sez "bad idea")
   *   => Use one dedicated MPI send buffer for all PEs.
   *      When in doubt of buffer overflow, force receive operations
   *      by MPI_Buffer_detach/attach (will not return until all delivered).
   *   Caveats: 1. buffering is additional to the buffering in LLComms.
   *            2. detach/attach forces *all* messages to be sent!
   *               No nice way to find out what has already been received.
   *
   * 2nd, better, more complex solution (see the sketch after this function):
   *   i.   a PE sends the same kind of msg (PACKET) to the same receiver (other PE k)
   *   ii.  data to send is always in packContext[k]->dataptr
   *   iii. by switching between 2 dataspaces for every packContext, we
   *        can achieve that the data remains untouched. Sending away will
   *        precede switching back.
   *   => Use prepared send operations, keep the message to PE n in mind using
   *      commReq[n] (the previous send must complete before the current one
   *      is started), switch packContext to point to free space.
   *   Caveats: 1. switching buffer space affects LLComms
   *            2. system messages (FINISH, FISH, REVAL) are exceptional
   */
  ASSERT(node > 0 && (nat)node <= nPEs);

  IF_PAR_DEBUG(mpcomm,
               debugBelch("MPI sending message to PE %d "
                          "(tag %d, datasize %d)\n",
                          node, tag, length));
  // adjust node no. (MPI ranks are 0-based)
  node--;

#if defined(MPIBUFFERS)
  if (msgCount > maxMsgs) {
    // could be too many pending messages, force delivery by detach/attach
    IF_PAR_DEBUG(mpcomm,
                 debugBelch("MPI flushing buffer\n"));
    MPI_Buffer_detach(&mpiMsgBuffer, &bufsize);
    MPI_Buffer_attach(mpiMsgBuffer, bufsize);
    msgCount = 0;
  }
  // send the message
  msgCount++;
  MPI_Bsend(data, length, MPI_LONG, node, tag, MPI_COMM_WORLD);
#else
  debugBelch("MP_send for MPI: implement me!!\n");
#endif
}
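
/* Sketch of the "2nd solution" described in MP_send above (and in the
 * file-header TODO): replace MPI_Bsend by MPI_Isend plus MPI_Wait, so
 * that at most one send per PE is in flight and its buffer stays
 * untouched until delivery. Illustrative only and not wired in: the
 * names sendBuf, sendReq, sendPending and MP_send_isend_sketch are
 * assumptions, and the real plan switches packContext buffers in
 * LLComms instead of copying here. (commReq above is declared as an
 * array of pointers, so this sketch uses its own request array.)
 */
#if 0 /* illustrative sketch only, not compiled */
static long        sendBuf[MAX_PES][DATASPACEWORDS]; // one send buffer per PE
static MPI_Request sendReq[MAX_PES];                 // pending request per PE
static int         sendPending[MAX_PES];             // is sendReq[rank] active?

static void MP_send_isend_sketch(int rank /* 0-based MPI rank */,
                                 OpCode tag, long *data, int length) {
  ASSERT(length <= DATASPACEWORDS);
  // the previous send to this PE must complete before its buffer is reused
  if (sendPending[rank])
    MPI_Wait(&sendReq[rank], MPI_STATUS_IGNORE);
  // copy into the per-PE buffer so the caller's data may be reused at once
  memcpy(sendBuf[rank], data, length * sizeof(long));
  MPI_Isend(sendBuf[rank], length, MPI_LONG, rank, tag,
            MPI_COMM_WORLD, &sendReq[rank]);
  sendPending[rank] = rtsTrue;
}
#endif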
/* - a blocking receive operation where system messages from the
 *   main node have priority!
 */
int MP_recv(int maxlength, long *destination, OpCode *retcode, nat *sender) {
  /* MPI: use MPI_Probe to get size, sender and code; check maxlength;
   * receive the required data size (must be known when receiving in MPI).
   * No special buffer is needed here, we can receive into *destination.
   */
  int source, size, code;
  int haveSysMsg = rtsFalse;
  code = MIN_PEOPS - 1;

  // IF_PAR_DEBUG(mpcomm, debugBelch("MP_recv for MPI.\n"));

  // priority for system messages, probed before accepting anything
  // TODO: use a different communicator (cf. the sysComm sketch after MP_sync)!
  while (!haveSysMsg && code < MAX_PEOPS) {
    code++;
    if (ISSYSCODE(code))
      // non-blocking probe for this particular system code
      MPI_Iprobe(MPI_ANY_SOURCE, code, MPI_COMM_WORLD, &haveSysMsg, &status);
  }

  // if the SysMsg flag is set: we receive a system message
  if (haveSysMsg) {
    // TODO: insert smart assertion
  } else {
    // blocking probe for any message, returns source and tag in status
    MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
  }
  if (status.MPI_ERROR != MPI_SUCCESS) {
    debugBelch("MPI: Error receiving message.\n");
    barf("PE %d aborting execution.\n", thisPE);
  }

  // get and check the msg. size
  // size = status.st_length;
  MPI_Get_count(&status, MPI_LONG, &size);
  if (maxlength < size)
    barf("wrong MPI message length (%d, too big)!!!", size);

  // really receive (exactly!) the message probed above:
  source = status.MPI_SOURCE;
  code   = status.MPI_TAG;
  MPI_Recv(destination, size, MPI_LONG, source, code,
           MPI_COMM_WORLD, &status);

  *retcode = status.MPI_TAG;
  *sender  = 1 + status.MPI_SOURCE;
  IF_PAR_DEBUG(mpcomm,
               debugBelch("MPI Message from PE %d with code %d.\n",
                          *sender, *retcode));
  ASSERT(*sender == (nat)source+1 && *retcode == code);

  return size;
}

/* - a non-blocking probe operation (unspecified sender)
 */
rtsBool MP_probe(void) {
  int flag = 0;
  // non-blocking probe: either flag is true and status is filled, or no
  // message is waiting to be received. Using ignore-status...
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag,
             MPI_STATUS_IGNORE);
  return (flag != 0);
}

#endif /* whole file */
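
/* Sketch of the expected call protocol for this module, as far as the
 * header comments above describe it (the routines are used inside
 * ParInit.c only). Illustrative only: the real ParInit.c code is not
 * reproduced here, and pe, code, sender, buffer and len are assumed
 * names.
 *
 *   MP_start(&argc, argv);   // connect to MPI, determine rank and main PE
 *   MP_sync();               // attach the Bsend buffer, barrier, set thisPE
 *   ...
 *   if (MP_probe())          // non-blocking check for a pending message
 *     len = MP_recv(DATASPACEWORDS, buffer, &code, &sender);
 *   MP_send(pe, code, buffer, len);   // logical PE numbers are 1-based
 *   ...
 *   MP_quit(0);              // drain pending messages, then MPI_Finalize
 */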