/* -------------------------------------------------------------------------- Time-stamp: Low Level Communications Header (LLC.h) Contains the definitions used by the Low-level Communications module of the GUM Haskell runtime environment. Based on the Graph for PVM implementation. Phil Trinder, Glasgow University, 13th Dec 1994 Adapted for the 4.xx RTS H-W. Loidl, Heriot-Watt, November 1999 Restructuring, generic parallel RTE (JB 2004): New purpose of LLC/LLComms is to introduce a buffering layer above the real communication (the latter abstracted in MPSystem.h). Collect packets and save real messages by sending them together. The interface of LLComms remains almost identical to the former. ----------------------------------------------------------------------- */ #ifndef __LLC_H #define __LLC_H #ifdef PARALLEL_RTS /* whole file */ //@node Low Level Communications Header, , , //@section Low Level Communications Header //@menu //* Includes:: //* Macros and Constants:: //* PVM macros:: //* Externs:: //@end menu //@node Includes, Macros and Constants, Low Level Communications Header, Low Level Communications Header //@subsection Includes // Rts.h should come before this file... #include "Rts.h" // absolutely required for correct operation: #include "PEOpCodes.h" #ifdef HAVE_STRING_H # include // for memory manipulation #else # error string.h missing. Buffering will not work, compiler will not build! #endif // #include "Parallel.h" // it's dead, Jim #define ANY_TASK (-1) /* receive messages from any task */ #define ANY_GLOBAL_TASK ANY_TASK #define ANY_OPCODE (-1) /* receive any opcode */ #define ALL_GROUP (-1) /* wait for barrier from every group member */ #define PEGROUP "PE" #define MGRGROUP "MGR" #define SYSGROUP "SYS" #define PETASK "PE" // magic markers to be used in packets (message separators) #define NEXT_MSG 0x1111feed #define END_OF_PACKET 0x1111dead // Buffer type definition (suitable for sending and receiving): typedef struct MessageBuffer_ { rtsTime age; // when sent / started ? nat pe; // to / from where ? struct MessageBuffer_ *next;// link for ring of all buffers nat active; // 0 == inactive, otherwise should be OpCode of actual packet (or dummy) long counter; // message counter (statistics) when sending / data size when receiving long* position; // actual unpacking position in buffer long* PackData; // data in the buffer (pointer into a separate data space, unusual in this code) } MessageBuffer; #define sync(gp,op) do { \ broadcast(gp,op); \ pvm_barrier(gp,ALL_GROUP); \ } while(0) #define broadcast(gp,op) do { \ pvm_initsend(PvmDataDefault); \ pvm_bcast(gp,op); \ } while(0) #define checkComms(c,s) do { \ if ((c)<0) { \ pvm_perror(s); \ stg_exit(EXIT_FAILURE); \ }} while(0) #define _my_gtid pvm_mytid() #define GetPacketMACRO() pvm_recv(ANY_TASK,ANY_OPCODE) #define PacketsWaitingMACRO() (pvm_probe(ANY_TASK,ANY_OPCODE) != 0) #define SPARK_THREAD_DESCRIPTOR 1 #define GLOBAL_THREAD_DESCRIPTOR 2 #define _extract_jump_field(v) (v) #define MAX_DATA_WORDS_IN_PACKET 1024 // loads next packet into buffer, sets ReceiveBuffer.active=Opcode; or delivers data int GetPacket(void); // checks if there are packets to process (does not load them) rtsBool PacketsWaiting(void); // packing data into buffer: // refers to a global packing context which must be set before (internal) // attention: unprotected, can provoque buffer overflows without warning! #define PutArg1(a) *(packContext->position++) = (long )(a) #define PutArg2(a) *(packContext->position++) = (long )(a) #define PutArgN(n,a) *(packContext->position++) = (long )(a) #define PutArgs(b,n) do {\ memcpy(packContext->position, (long *)b,(n)*sizeof(long)); \ packContext->position+=n;} \ while(0) #define PutLit(l) { int a = l; PutArgN(?,a); } /* basic PVM unpacking */ // refers to a global unpacking context (must be set before) extern MessageBuffer *unpackContext; #define GetArg1(a) a=*(unpackContext->position++) #define GetArg2(a) a=*(unpackContext->position++) #define GetArgN(n,a) a=*(unpackContext->position++) #define GetArgs(b,n) do {\ memcpy((long *)b, unpackContext->position, (n)*sizeof(long)); \ unpackContext->position+=n;} \ while(0) // buffer setup: void initMessageBuffers(nat pe); // init send buffer for a new PE void initCommBuffers(void); // init send buffers for all PEs (nPEs) and receive buffer void insertNewCommBuffer(nat pe);// stub, for open systems where PEs can join void freeCommBuffers(void); // free the allocated space // administrative functions for sending messages: void sendImmediately(nat toThisPE); // request immediate message sending (for urgent messages) void sendOldBuffers(void); // check all buffers for age and send old ones (Scheduler) // statistics output: void showPacketStats(void); //@node Externs, , PVM macros, Low Level Communications Header //@subsection Externs /* basic message passing routines */ extern void sendOp (OpCode,nat), sendOp1 (OpCode,nat,StgWord), sendOp2 (OpCode,nat,StgWord,StgWord), sendOpV (OpCode,nat,int,...), sendOpN (OpCode,nat,int,StgPtr), sendOpNV (OpCode,nat,int,StgPtr,int,...); extern void broadcastOpN(OpCode op, char *group, int n, StgPtr args); /* extracting header of a message */ void getOpcodeAndSender (OpCode *popcode, nat *psender_id); #endif /*PARALLEL_RTS */ #endif /*defined __LLC_H */