package testprocs;

import simulator.*;
import scripting.*;
import modules.color.*;
import modules.hierarchy.*;
import modules.neighborset.*;
import modules.gradient.*;
import modules.node.*;
import utils.*;
import java.awt.*;
import java.util.*;
import java.io.*;

/**
 * Atomicity experiment for a persistent node.
 *
 * <p>This class plays three roles:
 * <ul>
 *   <li>{@link #main} parses the command line, opens the log files, builds the
 *       simulator and runs it;</li>
 *   <li>each instance is a processor module that, on clock events, occasionally
 *       starts a read or write on the test node and records it in
 *       {@link #opList};</li>
 *   <li>{@link #endexperiment} walks the recorded operations, assigns each
 *       usable one a serialized time, checks the result, writes the reports
 *       and exits the JVM.</li>
 * </ul>
 */
public class PNAtomExp implements ProcessorModule {
  static final String script = "modules.color.ColorModule\nmodules.gradient.ActiveGradient\nmodules.neighborset.NeighborSet\nmodules.broadcast.Broadcast\nmodules.region.RegionManager\nmodules.node.PersistentNodeB\ntestprocs.PNAtomExp";

  
  public static double killrate = 0.01; // prob. of killing particle each round
  public static int maxrunlength = 1000; // length to run sim
  public static int nodesize = 4; // size of the test node
  public static double oprate = 0.001; // frequency of operation initiation
  public static double readbias = 0.5; // probability op is a read
  public static String filestem = "pnatomtest"; // log files: <stem>.out, <stem>.db

  /**
   * Entry point.  Recognized options (each takes one value):
   * <pre>
   *   -k killrate   -l maxrunlength   -n nodesize
   *   -r oprate     -b readbias       -f filestem
   * </pre>
   * Unrecognized arguments are ignored.  If an option is missing its value,
   * or the log files cannot be opened, a "failure: ..." line is printed and
   * the method returns without starting the simulation.
   *
   * @param argv command-line arguments
   * @throws NumberFormatException if a numeric option value cannot be parsed
   */
  public static void main (String argv[]) {

    for(int i=0;i<argv.length;i++) {
      String flag = argv[i];
      boolean known =
        flag.equals("-k") || flag.equals("-l") || flag.equals("-n") ||
        flag.equals("-r") || flag.equals("-b") || flag.equals("-f");
      if(!known) continue; // anything else is silently skipped
      if(i+1>=argv.length) {
        // a trailing flag used to index past the end of argv
        System.out.println("failure: missing value for option "+flag);
        return;
      }
      String val = argv[++i];
      if(flag.equals("-k")) killrate=Double.parseDouble(val);
      else if(flag.equals("-l")) maxrunlength=Integer.parseInt(val);
      else if(flag.equals("-n")) nodesize=Integer.parseInt(val);
      else if(flag.equals("-r")) oprate=Double.parseDouble(val);
      else if(flag.equals("-b")) readbias=Double.parseDouble(val);
      else filestem=val; // "-f"
    }

    try {
      out = new PrintWriter(new FileWriter(filestem+".out"),true);
      out.println("Atomicity log for PN");
      outdb = new PrintWriter(new FileWriter(filestem+".db"),true);
    } catch(IOException e) {
      System.out.println("failure: "+e);
      // without both logs nothing below can work; stop here instead of
      // running into a NullPointerException on the first println
      if(out!=null) out.close();
      return;
    }

    out.println("Parameters:");
    out.println("Kill Rate = "+killrate);
    out.println("Run Length = "+maxrunlength);
    out.println("Node Size = "+nodesize);
    out.println("Op Rate = "+oprate);
    out.println("Read Bias = "+readbias);
    
    CommunicationsModel comm = new PerfectCommunicationsModel();
    ProcessorFactory pf = new ScriptProcFactory(script);
    sim = new Simulator(comm,pf,2000,0.04);

    sim.addObserver(new PNAtomExpCritic(sim));
        
    // and... run!
    sim.setPaused(false);
    startTime=System.currentTimeMillis();
    sim.run();

    System.out.println("All Systems Go");
  }
  static Simulator sim;

  // wall-clock time (ms) at which the simulation was started
  static long startTime=0;

  /**
   * Ends the experiment: pauses the simulator, attempts to produce an atomic
   * (serialized) trace of the logged operations, writes the verdict and the
   * serialized trace to {@code out}, dumps every logged operation to
   * {@code outdb}, and terminates the JVM with exit status 0.
   *
   * <p>An operation is "usable" if it completed successfully, or if it is a
   * failed write whose successor in {@link #opList} carries an equal first
   * key component (the versioning tag).  Usable operations are given
   * strictly increasing serialized times, each no earlier than the
   * operation's own start time.  The trace is declared atomic when every
   * read returns the value of the preceding write and every serialized time
   * lies within the operation's [start, end] interval.
   */
  public static void endexperiment() {
    sim.setPaused(true);

    // add in some notes on event-processing rates:
    int n_evt = sim.numEvents();
    double t_elapsed = System.currentTimeMillis()-startTime;
    System.out.println
      ("Simulation processed "+n_evt+
       " events over the course of "+t_elapsed/1000+
       " seconds at a rate of "+(1000.*n_evt/t_elapsed)+" events per second.");

    System.out.print("\nAnalyzing "+opList.size()+" operations");

    boolean good=true;
    // walk the list, attempting to assign serialized times
    Iterator i=opList.keySet().iterator();
    double timeFinger = 0; double epsilon=1e-8;
    int count=0;
    while(i.hasNext()) {
      count++; if(count%100==1) System.out.print("."); // progress indicator
      // weed out failed reads and writes
      // the serialization must be between start and end times
      Object key = i.next(); 
      Operation op = (Operation)opList.get(key);
      if(op==null) {
        // diagnostic for a key that maps to nothing; skip it rather than
        // dereference null on the next line
        System.out.println(key+" "+opList);
        continue;
      }
      boolean usable=op.successful;
      if(!usable && !op.isRead && i.hasNext()) { //write maybe usable anyway...
        // tailMap(key) starts at key itself, so the second element is the
        // key that follows this one
        SortedMap tail = opList.tailMap(key);
        Iterator j = tail.keySet().iterator(); j.next();
        Tuple nextkey = (Tuple)j.next(); // get key following this one
        if(nextkey.first().equals(((Tuple)key).first())) usable=true;
      }
      if(usable) {
        // earliest time that is after everything serialized so far and not
        // before the operation started
        op.serializedTime = Math.max(op.startTime,timeFinger+epsilon);
        timeFinger=op.serializedTime;
        sortList.put(new Double(op.serializedTime),op);
      }
    }
    // certify that all reads return the preceding write & all serialization 
    // is between start and end
    i=sortList.keySet().iterator();
    Object value=null; // value of the most recent serialized write
    while(i.hasNext()) {
      Double stime = (Double)i.next();
      Operation op = (Operation)sortList.get(stime);
      if(op.isRead) {
        // null-safe comparison: a non-null read before any write (or after a
        // null write) is a mismatch, not a NullPointerException
        boolean matches =
          (value==null) ? (op.value==null) : value.equals(op.value);
        if(!matches) {
          good=false;
          out.println("FAIL: Read/Write mismatch at "+op.serializedTime);
        }
      } else {
        value=op.value;
      }
      // both bounds: the original tested the lower bound twice and never
      // checked the upper one
      if(op.serializedTime<op.startTime || op.serializedTime>op.endTime) {
        good=false;
        out.println("FAIL: Out of bounds at "+op.serializedTime);
      }
    }

    out.println("\n\n");
    out.println("Number of operations: "+opList.size());
    // announce whether it succeeded
    if(good) out.println("SUCCESS: Sequence is atomic.");
    else out.println("FAILURE: Sequence is not atomic.");

    // print the serialized event list
    out.println("Serialized\tOpType\tValue\tStart\tEnd");
    i=sortList.keySet().iterator();
    while(i.hasNext()) {
      Double stime = (Double)i.next();
      Operation op = (Operation)sortList.get(stime);
      String ops="Write"; if(op.isRead) ops="Read";
      out.println(stime+"\t"+ops+"\t"+op.value+"\t"+op.startTime+
                  "\t"+op.endTime);
    }
    

    // last of all, print the operations database for post-run analysis
    outdb.println("RW, start, end, serial, tag_v, tag_q, value, success");
    i=opList.keySet().iterator();
    while(i.hasNext()) {
      Object key = i.next(); 
      Operation op = (Operation)opList.get(key);
      if(op==null) continue; // same guard as in the serialization pass
      String s = "";
      if(op.isRead) s+="1, "; else s+="0, ";
      s += op.startTime+", "+op.endTime+", "+op.serializedTime+", ";
      if(op.versioning!=null)
        s += op.versioning.getVersion()+", "+op.versioning.getQuality()+", ";
      else s+=", , ";
      s += op.value+", ";
      if(op.successful) s+="1"; else s+="0";
      outdb.println(s);
    }

    // release the log files explicitly before leaving
    out.close();
    outdb.close();

    System.exit(0);
  }
  // every logged operation, keyed by poRank(op)
  static TreeMap opList = new TreeMap();
  // usable operations, keyed by their serialized time (Double)
  static TreeMap sortList = new TreeMap();

  /**
   * Record of one operation carried out by a PN.  Declared static so that
   * records kept in the static {@link #opList} do not hold on to the module
   * instance that created them.
   */
  static class Operation {
    boolean isRead;
    double startTime;
    double endTime;
    double serializedTime=Double.NaN; // NaN until assigned by endexperiment
    VersionedData versioning=null;
    Object value=null;
    boolean successful=false;
    Operation(boolean r,double s,double e) { isRead=r;startTime=s;endTime=e; }
    public String toString() {
      String vname="null"; if(value!=null) vname=value.toString();
      return "("+isRead+", "+startTime+", "+endTime+", "+serializedTime+", "+
        versioning+", "+vname+", "+successful+")";
    }
  }

  /** @return the symbol "PNAtomExp" naming this module */
  public Symbol getName() { return Symbol.GetSymbol("PNAtomExp"); }

  /**
   * Attaches this module to its processor: registers for clock events and
   * looks up the "PersistentNode" and "ActiveGradient" modules it drives.
   *
   * @param p the processor hosting this module
   */
  public void link(PortedProcessor p) {
    pp = p;
    pp.addClockListener(this);
    pn = (PersistentNodeB)p.getModule("PersistentNode");
    ag = (ActiveGradient)p.getModule("ActiveGradient");
  }

  PortedProcessor pp; PersistentNodeB pn; ActiveGradient ag;
  static Symbol nid = Symbol.GetSymbol("test"); // id of the test node

  // UID of the processor that created the node; NaN until one has done so
  static double initiator = Double.NaN;

  /**
   * Creates the test node.  Only the first module instance to be
   * initialized does so (tracked through the static {@code initiator});
   * it also writes the event-log column header.
   */
  public void init() {
    //System.out.print("i");
    if(Double.isNaN(initiator)) {
      initiator=pp.getUID(); System.out.println("Starting...");
      out.println("Time\tUID\tEvent");
      pn.initiateNode(nid,nodesize,3.0);
    }
  }
  static PrintWriter out;   // event log and final report (<filestem>.out)
  static PrintWriter outdb; // per-operation database (<filestem>.db)

  // tasks for this process:
  // 1. occasionally initiate operations
  // 2. if an op is completed, log the result
  Operation curOp = null; boolean dead=false;
  Counter startup = new Counter(5);
  static int topvers = 0; // highest gradient version logged so far

  /**
   * Handles a processor event.  Only clock events are acted upon:
   * <ol>
   *   <li>logs "Node Moved" when the test node's gradient reports a version
   *       above {@code topvers};</li>
   *   <li>once the startup counter has expired and no operation is pending,
   *       starts a read or write with probability {@code oprate} if this
   *       processor is in the node;</li>
   *   <li>if an operation is pending and the clock has reached its deadline
   *       (start tick + 3*nodesize), marks it successful, stamps its end
   *       time and records it in {@link #opList}.</li>
   * </ol>
   *
   * @param name event name; compared against {@code PortedProcessor.kClock}
   * @param data for clock events, an {@code Integer} tick count
   * @return always {@code false}
   */
  synchronized public boolean signalEvent(Symbol name,Object data) {
    if(dead) return false;
    //System.out.print("u");
    if(name==PortedProcessor.kClock) {
      int t = ((Integer)data).intValue();
      startup.tick(); // countdown to viability... this gives time for spread

      VersionedGradient g = (VersionedGradient)ag.get(nid);
      if(g!=null && g.getVersion()>topvers) {
        topvers=g.getVersion();
        out.println(sim.getTime()+"\tNode Moved\t"+topvers);
      }

      if(startup.expired()) {
        if(curOp==null) {
          if(pn.inNode(nid) && Math.random()<oprate) {
            boolean read = (Math.random()<readbias);
            // endTime initially holds the deadline in clock ticks; it is
            // overwritten with the simulator time when the op finishes
            curOp = new Operation(read,sim.getTime(),t+3*nodesize);
            if(read) {
              curOp.value=pn.getData(nid);
              curOp.versioning=pn.debugit2(nid,true);
              out.println(sim.getTime()+"\t"+pp.getUID()+"\t start read");
              //if(curOp.value==null) System.out.println("Null read: "+curOp+"\n"+pp);
            } else { 
              curOp.value=new Double(Math.random()); 
              pn.setData(nid,curOp.value); 
              curOp.versioning=pn.debugit2(nid,false);
              out.println(sim.getTime()+"\t"+pp.getUID()+"\t start write "+
                          curOp.value);
            }
          }
        } else if(t>=curOp.endTime) {
          curOp.successful=true;
          curOp.endTime = sim.getTime();
          if(curOp.isRead) {
            out.println(sim.getTime()+"\t"+pp.getUID()+"\t end read "+
                        curOp.value);
          } else {
            out.println(sim.getTime()+"\t"+pp.getUID()+"\t end write");
          }
          opList.put(poRank(curOp),curOp);
          curOp=null;
        }
      }
    }
    return false;
  }

  /**
   * Builds the {@link #opList} key for an operation: a tuple of its
   * versioning tag and its start time.
   * NOTE(review): the resulting order depends on how Tuple compares its
   * components -- presumably first by versioning, then by start time; confirm.
   */
  Tuple poRank(Operation op) {
    return new Tuple(op.versioning,new Double(op.startTime));
  }


  // when a proc is killed, this is notified to let it terminate its op
  /**
   * Marks this module dead (later events are ignored) and, if an operation
   * is in progress, records it in {@link #opList} as unsuccessful with the
   * current simulator time as its end time.
   */
  synchronized public void kill() {
    dead=true;
    if(curOp!=null) { 
      curOp.successful=false;
      curOp.endTime = sim.getTime();
      if(curOp.isRead) {
        out.println(sim.getTime()+"\t"+pp.getUID()+"\t kill read");
      } else {
        out.println(sim.getTime()+"\t"+pp.getUID()+"\t kill write");
      }
      opList.put(poRank(curOp),curOp);
    }
  }
}

// this has three tasks:
// 1. reboot randomly chosen particles, first notifying the particle's
//    experiment module so it can terminate any operation in progress
// 2. count number of reboots performed
// 3. terminate the sim when it's run long enough
/**
 * Simulator observer that drives the failure side of the experiment: on each
 * update it reboots enough randomly chosen processors to keep up with the
 * configured kill rate, and ends the experiment once the simulated time
 * reaches {@code PNAtomExp.maxrunlength}.
 */
class PNAtomExpCritic implements Observer {

  // processor count the kill schedule is scaled by; the same number is
  // passed to the Simulator constructor in PNAtomExp.main
  private static final int EXPECTED_PROCS = 2000;

  int kills = 0; // number of reboots performed so far
  Simulator sim;

  /** @param sim the simulator being observed and acted upon */
  public PNAtomExpCritic(Simulator sim) {
    this.sim = sim;
  }

  // the standard update
  static Random rnd = new Random();
  double lastT=-100; // simulated time of the last progress dot

  /**
   * Called by the simulator.  Prints a progress dot whenever more than one
   * unit of simulated time has passed since the last dot, then either ends
   * the experiment (when the run length has been reached) or performs kills
   * until {@code kills} reaches time * EXPECTED_PROCS * killrate.
   *
   * @param o   the observable that changed (unused)
   * @param arg update argument (unused)
   */
  public void update(Observable o, Object arg) {
    double t = sim.getTime();

    if(t-lastT>1) { lastT=t; System.out.print("."); }
    if(t>=PNAtomExp.maxrunlength) {
      PNAtomExp.endexperiment(); // terminate sim
    } else { // kill, as needed
      int killtarget = (int)(t*EXPECTED_PROCS*PNAtomExp.killrate);
      while(kills<killtarget) {
        //System.out.print("k");
        // the node set is re-read for every kill, as in the original, since
        // resetting a processor may change it
        Vector procs = new Vector(sim.getNet().getNodeSet());
        if(procs.size()!=EXPECTED_PROCS) 
          System.out.println("Bad processor list size: "+procs.size());
        if(procs.isEmpty()) break; // nothing to kill; retry on a later update
        kills++;
        // index by the actual list size: a fixed bound of 2000 would run off
        // the end of a shorter list
        PortedProcessor p =
          (PortedProcessor)procs.get(rnd.nextInt(procs.size()));
        PNAtomExp ae = (PNAtomExp)p.getModule("PNAtomExp");
        // notify experiment module that it's being killed
        if(ae!=null) ae.kill();
        sim.resetProcessor(p);
      }
    }
    
  }
}


