/*
 * RTTables.c: Runtime tables for Parallel Haskell
 * Author: 
 *  Jost Berthold, 
 *  Philipps-University of Marburg
 *
 * implements RTTables.h, see there for more descriptions...
 *
 * Review 2009: use hash tables for tso->process and tso->receiver
 * (avoid changing TSO structure)
 *
 ***********************************************/


#if defined(PARALLEL_RTS) // whole file

#include "Rts.h"
#include "parallel/RTTables.h"

#include "RtsUtils.h"  // allocates system space
#include "parallel/PEOpCodes.h" // sends message when closing inports
#include "sm/GC.h"   // updates table after GC: isAlive

#include "Trace.h"

#if defined(DEBUG)
#include "sm/Sanity.h"
#include "Threads.h"
#endif

#include "Hash.h"

// The runtime table, globally in scope:
// a singly linked list of ProcessData entries (set to NULL by initRTT()).
ProcessData* processtable;

// Two hash tables, both keyed by thread ID, take the place of the TSO
// fields TSO->par.process and TSO->par.receiver:
//   threadproctable: thread ID -> process ID (the ID itself is the value)
//   threadrecvtable: thread ID -> allocated Port (the thread's receiver)

HashTable *threadproctable, *threadrecvtable;

// ID factory; both IDs are StgWords, unlikely to
// be too small. Any ID 0 is reserved for the system, so these variables
// hold the _existing_ maximum IDs handed out so far, respectively.
StgWord procmax = 0; // obsolete in non-debugging setup!
StgWord inportmax = 0;


// Constant system ports:
Port RtsPort = (Port) {0, 0, 0}; // machine field set by initRTT()
Port NoPort  = (Port) {0, 0, 0}; // constant for error returns

// Build a Port on this machine (thisPE) from a process ID and a port ID.
#define localPort(p, id) ((Port) {thisPE, (p), (id)})

/* Forward declarations (if not included in header...) */
void CommCheckFailed(void);
void killProcess_(ProcessData* p);
Inport* addInport_(ProcessData *p, StgClosure *blackhole);
rtsBool updateTSOList(ProcessData *p);
void updateInports(ProcessData *p);

// Reaction to a failed 1:1 connection check. The problem is only
// reported, and only in DEBUG builds; the function never aborts, so
// execution continues in every configuration.
void CommCheckFailed(void) {
#if defined(DEBUG)
  // deliberately non-fatal: neither barf() nor stg_exit() is called
  errorBelch("1 to 1 connection check failed: inspect channel usage in program");
#endif
}

// Set up the runtime tables: create both thread-indexed hash tables,
// stamp the system port with this machine's ID and start with an empty
// process list.
// Declared in Parallel.h
void initRTT(void) {
  threadproctable = allocHashTable();
  threadrecvtable = allocHashTable();

  RtsPort.machine = thisPE;
  processtable    = NULL;
}

// free space allocated by runtime table: we expect it to be empty!
// Declared in Parallel.h
// Value destructor passed to freeHashTable() for threadrecvtable:
// releases one allocated Port.
void freePort(void* port);
void freePort(void* port) {
  stgFree(port);
}
// Tear down the runtime tables. The process list is NOT required to be
// empty here: child processes on the main PE might still have threads,
// and thus entries, on shutdown. Whatever is left is discarded.
void freeRTT(void) {
  ProcessData *proc;
  Inport *port;

  // Pop entries off the head of the list until none is left. This is
  // not killProcess_ (which is for normal running conditions): the
  // entry and its inports are just deallocated.
  while ((proc = processtable) != NULL) {
    processtable = proc->next;

    while ((port = proc->inports) != NULL) {
      proc->inports = port->next;
      stgFree(port);
    }
    stgFree(proc);
  }

  freeHashTable(threadproctable, NULL);     // do not free entries
  freeHashTable(threadrecvtable, freePort); // free allocated ports
}

// Port comparison: two ports are equal iff their machine, process and
// id fields all coincide.
rtsBool equalPorts(Port p, Port q) {
  if (p.machine != q.machine) { return rtsFalse; }
  if (p.process != q.process) { return rtsFalse; }
  if (p.id      != q.id)      { return rtsFalse; }
  return rtsTrue;
}

// processTable actions are implemented OO-style,
// all methods have a first argument processId
// newProcess is the constructor.

/* Process creation, on creating the process' first TSO.
 *   Allocates a ProcessData entry with a fresh ID, no inports and a
 *   thread count of one, records the thread -> process mapping in
 *   threadproctable and pushes the entry onto the front of processtable.
 */
void newProcess(StgTSO* firstTSO) {
  ProcessData* proc;

  ASSERT(firstTSO); // we have a 1st member thread
  IF_PAR_DEBUG(procs,
               debugBelch("New Process\n"));

  proc = (ProcessData*) stgMallocBytes(sizeof(ProcessData),
                                       "New Process");
  proc->id      = ++procmax;
  proc->tsos    = 1;     // firstTSO is the only member so far
  proc->inports = NULL;

  IF_PAR_DEBUG(procs,
               debugBelch("process %d(max.%d), first thread is %d\n",
                          (int) proc->id, (int) procmax,
                          (int) firstTSO->id));

  // thread ID -> process ID; the ID itself is stored as the value
  insertHashTable(threadproctable, firstTSO->id, (void*) proc->id);

  // link in at the head of the global list
  proc->next   = processtable;
  processtable = proc;

  // edentrace: new process, then assign the thread to it
  traceCreateProcess(proc->id);
  traceAssignThreadToProcessEvent(firstTSO->cap, firstTSO->id, proc->id);
}

// Find a process table entry by its ID. Used for virtually every
// action: messages (from other machines) only contain IDs, not memory
// addresses.
//
// Returns the entry, or NULL if the ID is larger than any ID handed out
// so far or if no entry with this ID is in the list. The caller is
// supposed to handle NULL accurately (fail, or ignore the operation).
STATIC_INLINE
ProcessData* findProcess(StgWord processId) {
  ProcessData* p;

  if ( processId > procmax ) {
    // impossible: this ID has never been handed out
    debugBelch("findProcess: Process %d (> max=%d) requested.\n",
               (int) processId, (int) procmax );
    return NULL;
  }

  for (p = processtable; p != NULL; p = p->next) {
    if (p->id == processId) {
      return p;
    }
  }

  debugBelch("findProcess: non-existent process %d\n",
             (int)processId);
  return NULL;
}

/* Process termination:
 *  only used once the process has no threads left (tsos == 0).
 *  Removes the entry from the linked list, closes all inports
 *  (sending PP_TERMINATE to every inport that has a sender connected),
 *  and deallocates the inports and the entry itself.
 *  p is invalid after the call.
 */

void killProcess_(ProcessData *p) {
  Inport *cur, *following;
  ProcessData* pred;
  rtsPackBuffer termMsgBuffer;

  // p is our process, and it has no TSOs left
  ASSERT(p!= NULL);
  ASSERT(p->tsos == 0);
  IF_PAR_DEBUG(procs,
               debugBelch("killing Process %d at %p.\n",
                          (int) p->id, p));

  // unlink p: either it is the list head, or some predecessor points to it
  ASSERT(NULL != processtable);
  if (processtable == p) {
    processtable = p->next;
  } else {
    pred = processtable;
    while (pred->next != p) {
      pred = pred->next;
    }
    pred->next = p->next;
  }

  // edentrace kill process
  traceKillProcess(p->id);

  // process might have no inports! Otherwise close them
  cur = p->inports;

  if (cur != NULL) {
    // fields shared by all messages; sender.id is filled in per inport
    termMsgBuffer.size   = 0;
    termMsgBuffer.id     = 0;
    termMsgBuffer.sender = localPort(p->id, 0);
  }

  while (cur != NULL) {
    IF_PAR_DEBUG(ports,
                 debugBelch("closing inport %d, sender (%d,%d,%d)\n",
                            (int) cur->id, cur->sender.machine,
                            (int) cur->sender.process,
                            (int) cur->sender.id));

    if (!isNoPort(cur->sender)) {
      IF_PAR_DEBUG(ports,
                   debugBelch("sending %s to sender\n",
                              getOpName(PP_TERMINATE)));
      termMsgBuffer.sender.id = cur->id;
      termMsgBuffer.receiver  = cur->sender;
      sendMsg(PP_TERMINATE, &termMsgBuffer);
      // TODO handle send failures. Since we cannot just retry, module should
      // buffer messages and send all at once, in a separate procedure.
    }

    following = cur->next;
    stgFree(cur);
    cur = following;
  }

  p->id = 0;
  stgFree(p);
}


/* modify a process:
 *   Event                       modification
 *  ---------------------------------------------------------
 *  make new inport     add fresh inport to process of caller
 *                       (uses: process field in calling TSO)
 *  receive data      update inport (no process modification)
 *  close inport               remove inport from linked list
 *  fork a thread        add created TSO to process of caller
 *  terminate a thread      remove TSO from process of caller 
 *                             (check no. of TSOs afterwards)
 */

// Create a new inport for process p and push it onto the front of p's
// inport list. The placeholder closure (a blackhole) must already
// exist. The inport gets a fresh ID and starts without a sender
// (NoPort). Returns the new inport.
Inport* addInport_(ProcessData *p, StgClosure *blackhole) {
  Inport* port;

  ASSERT(p != NULL);
  ASSERT(IsBlackhole(blackhole));

  port = (Inport*) stgMallocBytes(sizeof(Inport),
                                  "new inport");
  port->id      = ++inportmax;
  port->closure = blackhole;
  port->sender  = NoPort;

  // link in at the head of the process' inport list
  port->next = p->inports;
  p->inports = port;

  IF_PAR_DEBUG(ports,
               debugBelch("inport %d(%d) for process %d, closure %p\n",
                          (int) port->id, (int) inportmax,
                          (int) p->id, blackhole));

  return port;
}

// Variant of addInport_ for use in the primitive operation: the process
// is given by its ID and only the ID of the new inport is returned.
// Aborts via barf() if no process with that ID exists.
StgWord
addInport(StgWord processId, StgClosure* blackhole ) {
  ProcessData* owner = findProcess(processId);

  if (owner == NULL) {
    barf("addInport: no process found!");
  }
  IF_PAR_DEBUG(ports,
               debugBelch("new inport for process %d:\n",
                          (int) processId));

  return addInport_(owner, blackhole)->id;
}



// Locate an inport inside a process (called when receiving data).
// Returns NULL if the process does not exist or owns no inport with
// the given ID.
STATIC_INLINE
Inport* findInport(StgWord processId, StgWord id){
  ProcessData* owner = findProcess(processId);
  Inport* port;

  if (owner == NULL) {
    return NULL;
  }

  // leaves the loop with the match, or with NULL at the end of the list
  for (port = owner->inports; port != NULL; port = port->next) {
    if (port->id == id) {
      break;
    }
  }
  return port;
}

// Connect an inport to a sender. Reconnecting is not allowed: if the
// inport already has a sender, CommCheckFailed() is invoked, and the
// sender is overwritten afterwards nevertheless. A request for a
// nonexistent inport is ignored.
void connectInport(StgWord processId, StgWord id, Port sender) {
  Inport* target;

  IF_PAR_DEBUG(ports,
               debugBelch("connect inport %d for process %d to (%d,%d,%d)\n",
                          (int) id, (int) processId, sender.machine,
                          (int) sender.process, (int) sender.id));

  target = findInport(processId, id);

  if (target != NULL) {
    // inports should be assigned a sender only once!
    if (!isNoPort(target->sender)) { // 1:1 check failed!
      CommCheckFailed();
    }
    target->sender = sender;
  } else {
    IF_PAR_DEBUG(ports,
                 debugBelch("connection request for nonexistent port (.,%d,%d), "
                            "ignoring it.\n", (int) processId, (int) id));
  }
}

// Set the receiver (pe, proc, id) for a TSO. Called by primop
// "connectToPort#" directly. Reconnecting is allowed (but problems
// might arise when reconnection is intended: the thread may have been
// terminated on the previous inport).
//
// The receiver is kept as an allocated Port in threadrecvtable, keyed by
// the thread's ID; a previously registered Port is removed and freed.
void setReceiver(StgTSO* tso, nat pe, StgWord proc, StgWord id) {
  Port *newPort, *oldPort;

  ASSERT(get_itbl(tso)->type == TSO);

  IF_PAR_DEBUG(ports,
               debugBelch("connect TSO %d to inport (%d,%d,%d)\n",
                          (int) tso->id,
                          (int) pe, (int) proc, (int) id ));

  ASSERT(pe != 0 && proc != 0 && id != 0);

  // Reconnection allowed: drop the old entry. Unlink it from the table
  // BEFORE releasing it (same order as in removeTSO), so that no pointer
  // to freed memory is ever handed to removeHashTable or left in the table.
  oldPort = lookupHashTable(threadrecvtable, tso->id);
  if (oldPort != NULL) {
    removeHashTable(threadrecvtable, tso->id, oldPort);
    stgFree(oldPort);
  }

  // we have to allocate a port which we put in the hash table
  newPort = (Port*) stgMallocBytes(sizeof(Port), "setReceiver");
  newPort->machine = pe;
  newPort->process = proc;
  newPort->id      = id;

  insertHashTable(threadrecvtable, tso->id, newPort);
}


// Remove an inport from a process (and deallocate it).
// Does NOT send a PP_TERMINATE message.
// Unknown processes and unknown inports are ignored (with a message
// when port debugging is enabled).
void removeInport(StgWord processId, StgWord inportId) {
  ProcessData *p;
  Inport **link, *victim;

  IF_PAR_DEBUG(ports,
               debugBelch("remove inport %d from process %d\n",
                          (int) inportId, (int) processId));

  p = findProcess(processId);
  if (p == NULL) {
    IF_PAR_DEBUG(ports,
                 debugBelch("removeInport: non-existent process, ignoring.\n"));
    return; // shrug...
  }

  // INV: link addresses the pointer that references the current
  // candidate (either p->inports or the next field of its predecessor)
  link = &(p->inports);
  while (*link != NULL && (*link)->id != inportId) {
    link = &((*link)->next);
  }
  victim = *link;

  if (victim == NULL) { // not found!
    IF_PAR_DEBUG(ports,
                 debugBelch("Inport %d: not found in process %d.\n",
                            (nat) inportId, (nat) processId));
    return;
  }

  // found: bypass it in the list, then free it
  *link = victim->next;
  stgFree(victim);

  IF_PAR_DEBUG(ports,
               debugBelch("inport %d removed (process %d)\n",
                          (int) inportId, (int) p->id));
}

// Add a thread to an existing process: records thread ID -> process ID
// in threadproctable and increments the process' thread count.
// Requests for process 0 (reserved for the system) are skipped.
// Aborts via barf() if the process does not exist, as addInport does;
// this holds in non-DEBUG builds too, where ASSERTs are compiled out.
void addTSO(StgWord processId, StgTSO* tso) {
  ProcessData *p;

  if (processId == 0) {
    IF_PAR_DEBUG(procs,
                 debugBelch("addTSO for process 0, skipping.\n"));
    return;
  }

  ASSERT(tso != NULL); // checked before tso is dereferenced below

  p = findProcess(processId);

  IF_PAR_DEBUG(procs,
               debugBelch("add thread %d to process %d\n",
                          (int) tso->id, (int) processId));

  if (p == NULL) {
    // do not register the thread for (nor dereference) a missing process
    barf("addTSO: no process found!");
  }

  insertHashTable(threadproctable, tso->id, (void*) processId);
  p->tsos += 1;

  // newThreadEvent emitted only here (thread is created before), we
  // do not track tsos internal to the RTS (finalizers etc.)

  // edentrace: new thread (now in schedule.c)
  // edentrace: assign thread to (existing) process
  traceAssignThreadToProcessEvent(tso->cap, tso->id, processId);
}

// Find a TSO in a process: needed for external termination.
// To avoid touching the TSO struct, the thread lists of all GC
// generations are traversed. See Threads.c::printAllThreads.
//
// Returns NULL if the process is unknown or if threadproctable does not
// map thread `id` to `processId`; otherwise the TSO with that ID.
StgTSO* findTSO(StgWord processId, StgWord id) {
  nat g;
  StgTSO *t;
  StgWord proc;
  ProcessData *p = findProcess(processId);

  // NB: StgWord values are cast explicitly wherever they are passed to
  // debugBelch, so that the varargs match the %ld / %d conversions.
  if (p == NULL ) {
    IF_PAR_DEBUG(procs,
                 debugBelch("findTSO %ld: Unknown process %ld.\n",
                            (long) id, (long) processId));
    return NULL;
  }

  IF_PAR_DEBUG(procs,
      debugBelch("Searching thread %ld in process %ld (%d threads).\n",
                 (long) id, (long) processId, (int) p->tsos);
      printAllThreads());

  proc = (StgWord) lookupHashTable(threadproctable, id);
  if (processId != proc) {
    IF_PAR_DEBUG(procs,
                 debugBelch("findTSO: wrong process ID %ld. "
                            "Thread %ld claims it belongs to process %ld.\n",
                            (long) processId, (long) id, (long) proc));
    return NULL;
  }

  // Traverse the thread list of every generation (see
  // Threads.c::printAllThreads). A separate pass over the capabilities'
  // run queues is not done: this loop looks at non-blocked threads too.
  for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
    for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
      if (t->id == id) {
        return t;
      }
    }
  }

  // If we reach here, something is very wrong(tm): the thread does not
  // exist, but we found it earlier in the threadproctable (otherwise
  // proc would be 0 -> early exit). Complain (in debug mode)!
  ASSERT(rtsFalse);
  return NULL;
}

// Remove a thread from its process (does NOT terminate the thread!).
// Drops the thread's entries from both hash tables, freeing a
// registered receiver Port, and decrements the process' thread count.
// When the last thread is gone, the whole process is killed.
// Threads without a registered process are silently ignored.
void removeTSO(StgWord id) {
  ProcessData *owner;
  Port *receiver;
  StgWord proc;

  IF_PAR_DEBUG(procs,
               debugBelch("remove thread %d from its process\n",
                          (int) id));

  proc = (StgWord) lookupHashTable(threadproctable, id);
  if (proc == (StgWord) NULL) {
    // virtually "removed", shrug (might be system thread, e.g. finalizer)
    IF_PAR_DEBUG(procs,
                 debugBelch("removeTSO: no process registered\n"));
    return;
  }

  owner = findProcess(proc);
  if (owner == NULL) {
    // virtually "removed", shrug (might be system thread, e.g. finalizer)
    IF_PAR_DEBUG(procs,
                 debugBelch("removeTSO: nonexistent process\n"));
    return;
  }

  ASSERT(owner->tsos != 0); // we have a process with >= 1 thread

  removeHashTable(threadproctable, id, (void*) proc);

  // remove (+ deallocate) potential entry in threadrecvtable
  receiver = lookupHashTable(threadrecvtable, id);
  if (receiver != NULL) {
    removeHashTable(threadrecvtable, id, receiver);
    stgFree(receiver);
  }

  // edentrace: emit killthread event (now in schedule.c)

  owner->tsos -= 1;

  // if last tso: kill the whole process!
  if (owner->tsos == 0) {
    killProcess_(owner);
  }
}

/* mark all threads reachable from the process table, unless they are
 * killed or completed.
 *
 * These threads are additional roots, since they produce output for
 * other processes. Not found if they are blocked on blackholes with
 * inport! OTOH, if we evacuated all blackholes with inport, we do not
 * find out whether an input is actually needed.
 * => evacuate all "live" threads  
 *
 * Our logic of evacuation is reverse to that of the sequential system:
 *   Sequential:
 *   evacuate heads of runnable + ccalling + sleeping + "blocked" queue
 *            => scavenge them (evacuate stack + link field = next in q)
 *            => evacuate some blackholes
 *            => evacuate TSOs blocked on *these* blackholes
 *                       (found on the blackhole_queue)
 *            => scavenge... (see above)
 *      non-evacuated threads (killed/complete/blocked-on-dead-BH) are
 *      evacuated later, to be "resurrected", so the scheduler cleanly
 *      removes them (!!)
 *
 * Parallel:
 *           evacuate *all* tsos *except* Thread[Complete|Killed]
 *             + if BlockedOnBlackHole: evacuate block_info (the blackhole)
 *            => scavenge these (stack + (unnecessarily:) link)
 *            => ...(see above)
 * 
 * The evacuation described is now done inside MarkWeak.c, where the
 * blackhole queue is traversed. Code here removed.
 */



// Helper, called after garbage collection: traverse the inport list of
// process p. For every inport, the closure field is replaced by the
// result of isAlive(); if that result is NULL the inport is garbage: it
// is unlinked and freed, after sending PP_TERMINATE to its sender
// (if a sender is connected).
void updateInports(ProcessData *p) {
  Inport *cur, *following;
  Inport **link;

  rtsPackBuffer termMsgBuffer;

  ASSERT( p != NULL);

  // could be the process does not have any inports...
  if (p->inports == NULL)
    return;

  // fields shared by all messages; sender.id is filled in per inport
  termMsgBuffer.size=0;
  termMsgBuffer.id=0;
  termMsgBuffer.sender.machine=thisPE;
  termMsgBuffer.sender.process=p->id;

  // INV: link addresses the pointer that references cur
  link = &(p->inports);
  cur  = p->inports;

  while (cur != NULL) {
    ASSERT(*link == cur);

    following = cur->next; // fetched first: cur may be freed below

    IF_PAR_DEBUG(ports,
                 debugBelch("updating inport %d\n", (nat)cur->id));

    cur->closure = isAlive(cur->closure); // updated to new BH
    if (cur->closure == NULL) { // a garbage inport, remove it

      IF_PAR_DEBUG(ports,
                   debugBelch("\t ...inport is garbage, removing it\n"));

      if (!isNoPort(cur->sender)) { // known sender?
        // explicit casts: varargs must match the %d / %ld conversions
        IF_PAR_DEBUG(ports,
                     debugBelch("sending terminate to sender (%d,%ld,%ld)\n",
                                (int)  cur->sender.machine,
                                (long) cur->sender.process,
                                (long) cur->sender.id));
        termMsgBuffer.sender.id = cur->id;
        termMsgBuffer.receiver  = cur->sender;
        sendMsg(PP_TERMINATE, &termMsgBuffer);
        // TODO handle send failure (buffer messages)
      }
      *link = following; // bypass cur in the list
      stgFree(cur);      // and remove the inport
    } else {
      // inport is alive, BH field already updated, nothing more to do
      link = &(cur->next);
    }

    cur = following;
  }
}

// Update the runtime table after garbage collection: processes without
// threads are killed, all others get their inports updated (see
// updateInports).
void updateRTT(void) {
  ProcessData *p, *next;

  IF_PAR_DEBUG(procs,
               debugBelch("updateRTTable: processtable %p\n",
                          processtable));

  if (processtable == NULL) {
    IF_PAR_DEBUG(procs,
                 debugBelch("No Processes, table is NULL!\n"));
    return;
  }

  for (p = processtable; p != NULL; p = next) {
    // Fetch the successor up front: killProcess_ unlinks and frees p
    // but leaves all other entries in place, so `next` stays valid.
    // (Locating the resume point by comparing IDs is fragile: entries
    // are prepended with increasing IDs, so the list is in descending
    // ID order and a scan for "id < killed ID" starting at the head
    // either runs off the end or restarts at the head.)
    next = p->next;

    IF_PAR_DEBUG(procs,
                 debugBelch("updating process %d (table @ %p)\n",
                            (int) p->id, p));

    if (p->tsos == 0) {
      killProcess_(p); // invalidates p, may update processtable!
    } else {
      updateInports(p);
    }
  }

  IF_PAR_DEBUG(procs,
               debugBelch("UpdateRTTable done\n"));
}

// EXTERNAL INTERFACE:
// using the Port type instead:
// Port-based front end to findInport: uses the process and id fields of
// p (the machine field is not consulted).
Inport* findInportByP(Port p) {
  Inport* found = findInport(p.process, p.id);
  return found;
}
// Port-based front end to connectInport; called from HLComms and
// Schedule.c. Uses the process and id fields of p.
void connectInportByP(Port p, Port sender) {
  StgWord process = p.process;
  StgWord inport  = p.id;

  connectInport(process, inport, sender);
}
// Port-based front end to removeInport: uses the process and id fields
// of p.
void removeInportByP(Port p) {
  StgWord process = p.process;
  StgWord inport  = p.id;

  removeInport(process, inport);
}
// Port-based front end to findTSO. A Port used for outport actions
// contains the thread's ID (tso->id) in its id field.
StgTSO* findTSOByP(Port p) {
  StgTSO* found = findTSO(p.process, p.id);
  return found;
}

// Process ID registered for a thread in threadproctable. The table
// stores the ID directly as the value, so a failed lookup (NULL)
// yields 0.
StgWord MyProcess(StgTSO* tso) {
  void* entry = lookupHashTable(threadproctable, tso->id);
  return (StgWord) entry;
}

// Receiver port registered for a thread in threadrecvtable, or a
// pointer to the global NoPort if the thread has none (never NULL).
Port* MyReceiver(StgTSO* tso) {
  Port* receiver = (Port*) lookupHashTable(threadrecvtable, tso->id);

  return (receiver != NULL) ? receiver : &NoPort; // or just null?
}

#endif // PARALLEL_RTS, whole file

 
