diff --git a/257844/src/main/java/cs451/Main.java b/257844/src/main/java/cs451/Main.java index b94aa11..5324f85 100644 --- a/257844/src/main/java/cs451/Main.java +++ b/257844/src/main/java/cs451/Main.java @@ -8,6 +8,7 @@ import cs451.parser.Host; import cs451.parser.Parser; import cs451.tools.Logger; import cs451.tools.Pair; +import cs451.tools.ParamDetector; import java.io.FileWriter; import java.io.IOException; @@ -47,11 +48,11 @@ public class Main { //immediately stop network packet processing System.out.println("Immediately stopping network packet processing."); + + System.out.println("Stopping NetManager"); NetManager.stop(); - //write/flush output file if necessary System.out.println("Writing output."); - try { writeOutput(parser.output()); } catch (IOException e) { @@ -98,6 +99,7 @@ public class Main { } Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort()); + ParamDetector.detectAndSet(me, parser); NetManager.start(me,parser,(t,ne) -> { Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); if(t == NetEventType.DLVR){ @@ -119,18 +121,9 @@ public class Main { Thread.sleep(500); } - System.out.println("Stopping NetManager"); - NetManager.stop(); - System.out.println("Signaling end of broadcasting messages"); coordinator.finishedBroadcasting(); - try { - writeOutput(parser.output()); - } catch (IOException e) { - e.printStackTrace(); - } - while (true) { // Sleep for 1 hour Thread.sleep(60 * 60 * 1000); diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java index 4ee7b28..d9090cc 100644 --- a/257844/src/main/java/cs451/net/NetManager.java +++ b/257844/src/main/java/cs451/net/NetManager.java @@ -15,19 +15,33 @@ import java.util.function.BiConsumer; public abstract class NetManager { - private static final Integer THREAD_COUNT = 8; + + public static int INTERNAL_WAIT; + + public static int SL_WAIT; + public static int SL_MAX_TRIES; + public static int FD_WAIT; + public static int FD_MAX_TRIES; + + public static Integer THREAD_COUNT; + public static Integer THREAD_BOOST_COUNT; + + public static Integer WINDOW_WIDTH; + private static boolean isStopped = false; - private static final Map, NetEventHandler> nm_listeners = new HashMap<>(); + private static final Map, NetEventHandlerAbstract> nm_listeners = new HashMap<>(); - private static final ThreadPoolExecutor ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); + private static ThreadPoolExecutor ex; + private static final ScheduledExecutorService ftex = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService stex = Executors.newSingleThreadScheduledExecutor(); private static BiConsumer onCompleteHandler; private static BiConsumer onErrorHandler; - private static void registerNetHandler(NetEventHandler handler){ + private static void registerNetHandler(NetEventHandlerAbstract handler){ nm_listeners.put(handler.getClass(),handler); } @@ -35,8 +49,9 @@ public abstract class NetManager { onCompleteHandler = och; onErrorHandler = oeh; - registerNetHandler(new NetHandlerSCKT( NetHandlerSL.class, NetHandlerDFLT.class)); - registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerSCKT.class)); + registerNetHandler(new NetHandlerSCKT( NetHandlerFD.class, NetHandlerDFLT.class)); + registerNetHandler(new NetHandlerFD( NetHandlerSL.class, NetHandlerSCKT.class)); + registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerFD.class)); registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class)); registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class)); registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class)); @@ -45,25 +60,36 @@ public abstract class NetManager { registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class)); nm_listeners.values().forEach(neh -> neh.start(h,p)); + ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); ex.prestartAllCoreThreads(); + + ftex.scheduleAtFixedRate(() -> nm_listeners.values().parallelStream().forEach(neh->{ + neh.deliverIf(); + neh.sendIf(); + }), 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); + + stex.scheduleAtFixedRate(()-> nm_listeners.values().forEach(NetEventHandlerInterface::beat + ), 0, FD_WAIT, TimeUnit.MILLISECONDS); } public static void stop() { isStopped = true; - nm_listeners.values().forEach(NetEventHandler::stop); + nm_listeners.values().forEach(NetEventHandlerAbstract::stop); ex.shutdown(); + ftex.shutdown(); + stex.shutdown(); System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run."); } public static boolean isDone() { - return isStopped || nm_listeners.values().stream().map(NetEventHandler::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); + return isStopped || nm_listeners.values().stream().map(NetEventHandlerAbstract::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); } //================================================================================================================= //================================================================================================================= - public static void handle(Class layer, NetEventType event, NetEvent ne){ + public static void handle(Class layer, NetEventType event, NetEvent ne){ try { ex.getQueue().add(new NetEventRunner(ne, layer, event)); }catch(IllegalStateException ise){ @@ -82,28 +108,30 @@ public abstract class NetManager { //================================================================================================================= //================================================================================================================= - public static void deliver(Class layer, NetEvent ne) { + public static void deliver(Class layer, NetEvent ne) { handle(layer,NetEventType.DLVR,ne); } - - public static void send(Class layer, NetEvent ne) { + public static void send(Class layer, NetEvent ne) { handle(layer,NetEventType.SEND,ne); } - - public static void crash(Class layer, NetEvent ne) { + public static void crash(Class layer, NetEvent ne) { handle(layer,NetEventType.CRSH,ne); } - public static void send() { - new NetEventRunner(NetEvent.EMPTY(),NetHandlerTOPL.class,NetEventType.SEND).run(); - } - - public static void deliverSync(Class layer, NetEvent ne) { + public static void deliverSync(Class layer, NetEvent ne) { new NetEventRunner(ne,layer,NetEventType.DLVR).run(); } - public static void sendSync(Class layer, NetEvent ne) { + public static void sendSync(Class layer, NetEvent ne) { new NetEventRunner(ne,layer,NetEventType.SEND).run(); } + public static void crashSync(Class layer, NetEvent ne) { + new NetEventRunner(ne,layer,NetEventType.CRSH).run(); + } + + public static void send() { + new NetEventRunner(NetEvent.EMPTY(), NetHandlerTOPL.class,NetEventType.SEND).run(); + } + //================================================================================================================= //================================================================================================================= @@ -111,10 +139,10 @@ public abstract class NetManager { private static class NetEventRunner implements Runnable { private final NetEvent ne; - private final Class lt; + private final Class lt; private final NetEventType net; - private NetEventRunner(NetEvent ne, Class lt, NetEventType net) { + private NetEventRunner(NetEvent ne, Class lt, NetEventType net) { this.ne = ne; this.lt = lt; this.net = net; @@ -124,7 +152,7 @@ public abstract class NetManager { public void run() { Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+""); - NetEventHandler nl = nm_listeners.getOrDefault(this.lt, null); + NetEventHandlerAbstract nl = nm_listeners.getOrDefault(this.lt, null); if (nl != null) { nl.onEvent(this.net,ne); }else{ diff --git a/257844/src/main/java/cs451/net/event/Message.java b/257844/src/main/java/cs451/net/event/Message.java index 2f66ebb..2213253 100644 --- a/257844/src/main/java/cs451/net/event/Message.java +++ b/257844/src/main/java/cs451/net/event/Message.java @@ -4,14 +4,15 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Objects; -public class Message implements Comparable { +public class Message implements Comparable { public enum TYPE { NONE("NONE", 'N'), ERR("ERR", 'E'), + BCST("BCST", 'B'), DATA("DATA", ' '), ACK("ACK", 'A'), - BCST("BCST", 'B'); + HRTB("HRTB",'H'); public final Character c; public final String tag; @@ -22,8 +23,8 @@ public class Message implements Comparable { } - public int src = -1; - public final int id; + public Integer src = -1; + public final Integer id; public TYPE tpe; public static Message DMSG(Integer id){ @@ -33,6 +34,10 @@ public class Message implements Comparable { return new Message(id,TYPE.DATA,src); } + public static Message HRTB(){ + return new Message(0,TYPE.HRTB); + } + public static Message TMSG(Integer mess_id, TYPE tpe) { return new Message(mess_id, tpe); } @@ -94,7 +99,7 @@ public class Message implements Comparable { return false; } final Message m = (Message) obj; - return id == m.id && src == m.src; + return id.equals(m.id) && src.equals(m.src); } @Override @@ -103,8 +108,7 @@ public class Message implements Comparable { } @Override - public int compareTo(Object obj) { - final Message m = (Message) obj; - return id-m.id; + public int compareTo(Message m) { + return id.compareTo(m.id); } } diff --git a/257844/src/main/java/cs451/net/event/NetEvent.java b/257844/src/main/java/cs451/net/event/NetEvent.java index 5c90aae..2f1e301 100644 --- a/257844/src/main/java/cs451/net/event/NetEvent.java +++ b/257844/src/main/java/cs451/net/event/NetEvent.java @@ -43,6 +43,10 @@ public class NetEvent { return ne; } + public static NetEvent MessageHRTB(Host peer){ + return NetEvent.Message(peer,Message.HRTB()); + } + public static NetEvent EMPTY(){ return new NetEvent(NO_PEER, Message.EMPTY()); diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandler.java b/257844/src/main/java/cs451/net/handler/NetEventHandler.java deleted file mode 100644 index d37b5e1..0000000 --- a/257844/src/main/java/cs451/net/handler/NetEventHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -package cs451.net.handler; - -import cs451.net.NetManager; -import cs451.net.event.NetEvent; -import cs451.net.event.NetEventType; -import cs451.parser.Host; -import cs451.parser.Parser; -import cs451.tools.Logger; - -public abstract class NetEventHandler { - - /* - * 1 - 50ms - 50ms - * 2 - 400ms - 450ms - * 3 - 1350ms - 1800ms - * 4 - 3200ms - 5000ms - */ - public static final int MS_WAIT = 50; - public static final int MAX_TRIES = 5; - - public static final Integer SEND_WINDOW = 3; - - - - private final Class deliverLayer; - private final Class broadcastLayer; - - - NetEventHandler(Class deliverLayer, Class broadcastLayer) { - this.deliverLayer = deliverLayer; - this.broadcastLayer = broadcastLayer; - } - - public void deliverNextSync(NetEvent ne){ - NetManager.deliverSync(deliverLayer,ne); - } - public void sendNextSync(NetEvent ne){ - NetManager.sendSync(broadcastLayer,ne); - } - - public void deliverNext(NetEvent ne){ - NetManager.deliver(deliverLayer,ne); - } - public void sendNext(NetEvent ne){ - NetManager.send(broadcastLayer,ne); - } - - public void onEvent(NetEventType et, NetEvent ne){ - switch (et){ - case DLVR: deliver(ne); - break; - case SEND: send(ne); - break; - case CRSH: crash(ne); - break; - case RCVR: recover(ne); - break; - default: - Logger.error("Unhandled EventType:"+et); - } - } - - public void send(NetEvent ne){ - NetManager.error(NetEventType.SEND,new Exception("Default Handler")); - } - - public void deliver(NetEvent ne){ - NetManager.error(NetEventType.DLVR,new Exception("Default Handler")); - } - - public void crash(NetEvent ne){ - NetManager.error(NetEventType.CRSH,new Exception("Default Handler")); - } - - public void recover(NetEvent ne){ - NetManager.error(NetEventType.RCVR,new Exception("Default Handler")); - } - - public void start(Host h, Parser p) {} - - public void stop() {} - - public boolean isDone(){ return true;} -} diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java new file mode 100644 index 0000000..4b93f94 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java @@ -0,0 +1,56 @@ +package cs451.net.handler; + +import cs451.net.NetManager; +import cs451.net.event.NetEvent; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface { + + private final Class deliverLayer; + private final Class broadcastLayer; + + public final AtomicBoolean active = new AtomicBoolean(true); + + + NetEventHandlerAbstract(Class deliverLayer, Class broadcastLayer) { + this.deliverLayer = deliverLayer; + this.broadcastLayer = broadcastLayer; + } + + + + public void deliverNextSync(NetEvent ne){ + NetManager.deliverSync(deliverLayer,ne); + } + public void sendNextSync(NetEvent ne){ + NetManager.sendSync(broadcastLayer,ne); + } + public void crashNextSync(NetEvent ne){ + NetManager.crashSync(deliverLayer,ne); + } + + public void deliverNextAsync(NetEvent ne){ + NetManager.deliver(deliverLayer,ne); + } + public void sendNextAsync(NetEvent ne){ + NetManager.send(broadcastLayer,ne); + } + public void crashNextAsync(NetEvent ne) { + NetManager.crash(deliverLayer,ne); + } + + public void start(Host h, Parser p) { + Objects.requireNonNull(h); + Objects.requireNonNull(p); + active.set(true); + } + + public void stop() { + active.set(false); + } + +} diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java new file mode 100644 index 0000000..db91331 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java @@ -0,0 +1,52 @@ +package cs451.net.handler; + +import cs451.net.event.NetEvent; +import cs451.net.event.NetEventType; +import cs451.parser.Host; +import cs451.parser.Parser; +import cs451.tools.Logger; + +public interface NetEventHandlerInterface { + + + default void sendIf(){} + default void deliverIf(){} + default void beat(){} + + void deliverNextSync(NetEvent ne); + void sendNextSync(NetEvent ne); + void crashNextSync(NetEvent ne); + + void deliverNextAsync(NetEvent ne); + void sendNextAsync(NetEvent ne); + void crashNextAsync(NetEvent ne); + + default void onEvent(NetEventType et, NetEvent ne){ + switch (et){ + case DLVR: deliver(ne); + break; + case SEND: send(ne); + break; + case CRSH: crash(ne); + break; + case RCVR: + default: + Logger.error("Unhandled EventType:"+et); + } + } + + default void send(NetEvent ne){ + sendNextSync(ne); + } + default void deliver(NetEvent ne){ + deliverNextSync(ne); + } + default void crash(NetEvent ne){ + crashNextSync(ne); + } + + void start(Host h, Parser p); + void stop(); + + default boolean isDone(){ return true;} +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java index 1016704..f743a72 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java @@ -6,33 +6,29 @@ import cs451.parser.Parser; import java.util.List; -public class NetHandlerBEB extends NetEventHandler { +public class NetHandlerBEB extends NetEventHandlerAbstract { private List hosts; private Host h; - public NetHandlerBEB(Class deliverLayer, Class broadcastLayer) { + public NetHandlerBEB(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } - @Override public void send(NetEvent ne) { - for(Host h : hosts){ + hosts.parallelStream().forEach(h ->{ if(this.h.getId() != h.getId()) { - sendNext(NetEvent.Message(h, ne.message)); + sendNextAsync(NetEvent.Message(h, ne.message)); }else{ - deliverNext(NetEvent.Message(h,ne.message)); + deliverNextAsync(NetEvent.Message(h,ne.message)); } - } + }); } @Override - public void deliver(NetEvent ne) { - deliverNext(ne); - } - public void start(Host h, Parser p) { + super.start(h,p); this.h = h; hosts = p.hosts(); } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java index 385b516..303c3ae 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java @@ -1,8 +1,28 @@ package cs451.net.handler; -public class NetHandlerDFLT extends NetEventHandler { +import cs451.net.NetManager; +import cs451.net.event.NetEvent; +import cs451.net.event.NetEventType; - public NetHandlerDFLT(Class deliverLayer, Class broadcastLayer) { +public class NetHandlerDFLT extends NetEventHandlerAbstract { + + public NetHandlerDFLT(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } + + @Override + public void send(NetEvent ne){ + NetManager.error(NetEventType.SEND,new Exception("Default Handler")); + } + + @Override + public void deliver(NetEvent ne){ + NetManager.error(NetEventType.DLVR,new Exception("Default Handler")); + } + + @Override + public void crash(NetEvent ne){ + NetManager.error(NetEventType.CRSH,new Exception("Default Handler")); + } + } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java new file mode 100644 index 0000000..e50909a --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java @@ -0,0 +1,56 @@ +package cs451.net.handler; + +import cs451.net.NetManager; +import cs451.net.event.Message; +import cs451.net.event.NetEvent; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +public class NetHandlerFD extends NetEventHandlerAbstract { + + private Host me; + private List hosts; + private static final ConcurrentHashMap alive = new ConcurrentHashMap<>(); + + public NetHandlerFD(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer, broadcastLayer); + } + + @Override + public void beat() { + hosts.forEach(h->{ + if(h.getId() != me.getId()){ + alive.computeIfPresent(h.getId(),(k, v) -> { + if(v == 0) return 1; + + if(v > NetManager.FD_MAX_TRIES) + crashNextAsync(NetEvent.Message(h, Message.EMPTY())); + else + sendNextSync(NetEvent.MessageHRTB(h)); + + return v+1; + }); + } + }); + } + + @Override + public void deliver(NetEvent ne) { + alive.computeIfPresent(ne.peer.getId(),(k,v)->0); + switch(ne.message.tpe){ + case HRTB: + break; + default: deliverNextSync(ne); + } + } + + @Override + public void start(Host h, Parser p) { + super.start(h, p); + hosts = p.hosts(); + me = h; + hosts.forEach(ch-> alive.put(ch.getId(),0)); + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java index 4364a1c..0bf53bd 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java @@ -12,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; -public class NetHandlerFIFO extends NetEventHandler { +public class NetHandlerFIFO extends NetEventHandlerAbstract { private final AtomicInteger sn = new AtomicInteger(1); private final Map rsn = new ConcurrentHashMap<>(); @@ -20,7 +20,7 @@ public class NetHandlerFIFO extends NetEventHandler { private Host me; - public NetHandlerFIFO(Class deliverLayer, Class broadcastLayer) { + public NetHandlerFIFO(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } @@ -28,22 +28,21 @@ public class NetHandlerFIFO extends NetEventHandler { public void send(NetEvent ne) { Integer snv = sn.getAndIncrement(); NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST)); - sendNext(NetEvent.Message(me.getId(),snv)); + sendNextAsync(NetEvent.Message(me.getId(),snv)); } - private void deliverIf(){ - synchronized (pending) { - pending.removeIf(hmp -> { - Integer crsn = rsn.getOrDefault(hmp.src, 1); - if (hmp.id == crsn) { - deliverNextSync(NetEvent.Message(hmp)); - NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp)); - rsn.put(hmp.src, crsn + 1); - return true; - } - return false; - }); - } + @Override + public void deliverIf(){ + pending.removeIf(hmp -> { + Integer crsn = rsn.getOrDefault(hmp.src, 1); + if (hmp.id.equals(crsn)) { + NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp)); + deliverNextSync(NetEvent.Message(hmp)); + rsn.put(hmp.src, crsn + 1); + return true; + } + return false; + }); } @Override @@ -54,6 +53,7 @@ public class NetHandlerFIFO extends NetEventHandler { @Override public void start(Host h, Parser p) { + super.start(h,p); me = h; } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java index 88344a7..f049fe2 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java @@ -7,23 +7,18 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -public class NetHandlerPL extends NetEventHandler { +public class NetHandlerPL extends NetEventHandlerAbstract { private final Set> delivered = ConcurrentHashMap.newKeySet(); - public NetHandlerPL(Class deliverLayer, Class broadcastLayer) { + public NetHandlerPL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } - @Override - public void send(NetEvent event) { - sendNext(event); - } - @Override public void deliver(NetEvent event) { if(delivered.add(new Pair<>(event.peer.getId(),event.message))){ - deliverNext(event); + deliverNextSync(event); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java index 698f421..2ac5b4c 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java @@ -10,21 +10,22 @@ import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; import java.util.List; -import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -public class NetHandlerSCKT extends NetEventHandler { +public class NetHandlerSCKT extends NetEventHandlerAbstract { private static final Integer BUFF_SIZE = 128; + private static final ExecutorService tex = Executors.newSingleThreadExecutor(); + private List hosts; private DatagramSocket socket = null; - private final AtomicBoolean receiving = new AtomicBoolean(true); - public NetHandlerSCKT(Class deliverLayer, Class broadcastLayer) { + public NetHandlerSCKT(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } @@ -57,7 +58,7 @@ public class NetHandlerSCKT extends NetEventHandler { socket.receive(datagram); Optional rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst(); - deliverNext(NetEvent.Message( + deliverNextAsync(NetEvent.Message( new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)), Message.FromBuffer(ByteBuffer.wrap(datagram.getData())) )); @@ -69,8 +70,7 @@ public class NetHandlerSCKT extends NetEventHandler { @Override public void start(Host h, Parser p) { - Objects.requireNonNull(h); - Objects.requireNonNull(p); + super.start(h,p); hosts = p.hosts(); try { @@ -78,17 +78,18 @@ public class NetHandlerSCKT extends NetEventHandler { } catch (SocketException e) { throw new Error(e); } - receiving.set(true); - new Thread(() -> { - while (receiving.get()) { - this.deliver(NetEvent.EMPTY()); + + tex.execute(()->{ + while (active.get()) { + deliver(NetEvent.EMPTY()); } - }).start(); + }); } @Override public void stop() { + super.stop(); + tex.shutdown(); socket.close(); - receiving.set(false); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java index 8fa598d..586238a 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java @@ -3,87 +3,89 @@ package cs451.net.handler; import cs451.net.event.Message; import cs451.net.NetManager; import cs451.net.event.*; +import cs451.parser.Host; +import cs451.parser.Parser; import cs451.tools.Pair; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; -public class NetHandlerSL extends NetEventHandler { - - private static final Set EMPTY_SET = ConcurrentHashMap.newKeySet(); +public class NetHandlerSL extends NetEventHandlerAbstract { + private List hosts; Set hasTimeout = ConcurrentHashMap.newKeySet(); - Map> hasAck = new ConcurrentHashMap<>(); - Map, Integer> retryCounter = new ConcurrentHashMap<>(); - public NetHandlerSL(Class deliverLayer, Class broadcastLayer) { + PriorityBlockingQueue,Message>> sending = new PriorityBlockingQueue<>(); + + public NetHandlerSL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } - - private void handleTimeout(NetEvent ne) { - if (hasAck.getOrDefault(ne.peer.getId(),EMPTY_SET).contains(ne.message)) { - return; - } - if (retryCounter.compute(new Pair<>(ne.peer.getId(),ne.message), (k, ov) -> (ov == null) ? 2 : ov + 1) >= MAX_TRIES) { - timeout(ne); - } else { - send(ne); - } - } - - private void checkTimeout(NetEvent ne) { - new Thread(()->{ - Integer t = retryCounter.getOrDefault(new Pair<>(ne.peer.getId(), ne.message), 1); - new Timer().schedule( - new TimerTask() { - @Override - public void run () { - handleTimeout(ne); - } - }, MS_WAIT*t*t*t); - }).start(); + @Override + public void sendIf(){ + sending.forEach(ppm -> { + ppm.first().second().getAndIncrement(); + Optional h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst(); + h.ifPresent(host -> sendNextAsync(NetEvent.Message(host, ppm.second()))); + }); + sending.removeIf(ppm -> { + if(hasTimeout.contains(ppm.first().first())){ + return true; + } +// if(ppm.first().second().get() > NetManager.SL_MAX_TRIES){ +// hasTimeout.add(ppm.first().first()); +// Optional h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst(); +// h.ifPresent(host -> crashNextAsync(NetEvent.Message(host, ppm.second()))); +// return true; +// } + return false; + }); } @Override public void send(NetEvent ne) { - if (hasTimeout.contains(ne.peer.getId())) { + if(hasTimeout.contains(ne.peer.getId())){ return; } - sendNext(ne); - checkTimeout(ne); + sending.add(new Pair<>(new Pair<>(ne.peer.getId(), new AtomicInteger(1)),ne.message)); + sendNextSync(ne); } @Override public void deliver(NetEvent ne) { - if (hasTimeout.contains(ne.peer.getId())) { + if(hasTimeout.contains(ne.peer.getId())){ return; } switch (ne.message.tpe){ case ACK: - hasAck.compute(ne.peer.getId(),(k, s) -> { - if(s == null) s = ConcurrentHashMap.newKeySet(); - s.add(ne.message); - return s; - }); + sending.removeIf(ppm -> (ppm.first().first()==ne.peer.getId() && + ppm.second().equals(ne.message))); break; case DATA: - sendNext(NetEvent.MessageACK(ne.peer,ne.message)); - deliverNext(ne); + sendNextSync(NetEvent.MessageACK(ne.peer,ne.message)); + deliverNextSync(ne); break; default: break; } } - private void timeout(NetEvent event) { - if(hasTimeout.add(event.peer.getId())) { - NetManager.crash(NetHandlerURB.class, event); - } + @Override + public void crash(NetEvent ne) { + hasTimeout.add(ne.peer.getId()); + crashNextSync(ne); + } + + @Override + public boolean isDone() { + return sending.isEmpty(); + } + + @Override + public void start(Host h, Parser p) { + this.hosts = p.hosts(); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java index 68aaed4..d2c94ac 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java @@ -1,12 +1,13 @@ package cs451.net.handler; +import cs451.net.NetManager; import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; import java.util.concurrent.atomic.AtomicInteger; -public class NetHandlerTOPL extends NetEventHandler { +public class NetHandlerTOPL extends NetEventHandlerAbstract { private final AtomicInteger toSend = new AtomicInteger(0); private final AtomicInteger delivered = new AtomicInteger(0); @@ -14,21 +15,20 @@ public class NetHandlerTOPL extends NetEventHandler { private Host me; - public NetHandlerTOPL(Class deliverLayer, Class broadcastLayer) { + public NetHandlerTOPL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer, broadcastLayer); } - - private void sendIf(){ - synchronized (waiting) { - while (waiting.get() < SEND_WINDOW && toSend.get() > 0) { - toSend.getAndDecrement(); - waiting.getAndIncrement(); - sendNextSync(NetEvent.EMPTY()); - } + @Override + public void sendIf() { + while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) { + toSend.decrementAndGet(); + waiting.incrementAndGet(); + sendNextSync(NetEvent.EMPTY()); } } + @Override public void send(NetEvent ne) { sendIf(); @@ -37,21 +37,20 @@ public class NetHandlerTOPL extends NetEventHandler { @Override public void deliver(NetEvent ne) { if (ne.message.src == me.getId()) { - synchronized (waiting) { - delivered.getAndIncrement(); - waiting.getAndDecrement(); - } - sendIf(); + delivered.incrementAndGet(); + waiting.decrementAndGet(); } + sendIf(); } @Override public boolean isDone() { - return toSend.get()==0 && waiting.get()==0; + return (toSend.get()==0) && (waiting.get()==0); } @Override public void start(Host h, Parser p) { + super.start(h,p); me = h; toSend.set(p.messageCount()); } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java index fa876cc..15d41bb 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java @@ -10,7 +10,7 @@ import java.util.Set; import java.util.concurrent.*; -public class NetHandlerURB extends NetEventHandler { +public class NetHandlerURB extends NetEventHandlerAbstract { private final Set correct = ConcurrentHashMap.newKeySet(); @@ -21,44 +21,54 @@ public class NetHandlerURB extends NetEventHandler { private Integer myId; - public NetHandlerURB(Class deliverLayer, Class broadcastLayer) { + public NetHandlerURB(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } - private void deliverIf(){ + @Override + public void deliverIf(){ pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). filter(delivered::add). - forEach(smp -> deliverNext(NetEvent.Message(smp.first(),smp.second()))); + forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second()))); } @Override public void send(NetEvent ne) { pending.add(new Pair<>(myId, ne.message.id)); ne.message.src = myId; - sendNext(ne); + sendNextSync(ne); } @Override public void deliver(NetEvent ne) { - ack.compute(new Pair<>(ne.message.src,ne.message.id),(k, v) -> { + Pair smp =new Pair<>(ne.message.src,ne.message.id); + ack.compute(smp,(k, v) -> { if(v == null){ v = ConcurrentHashMap.newKeySet(); } v.add(ne.peer.getId()); return v; }); - if(pending.add(new Pair<>(ne.message.src,ne.message.id))){ - sendNext(ne); + if(pending.add(smp)){ + sendNextSync(ne); } deliverIf(); } + @Override public void crash(NetEvent ne) { correct.remove(ne.peer.getId()); deliverIf(); } + @Override + public boolean isDone() { + return delivered.containsAll(pending); + } + + @Override public void start(Host h, Parser p) { + super.start(h,p); p.hosts().forEach(ch-> correct.add(ch.getId())); myId = h.getId(); } diff --git a/257844/src/main/java/cs451/tools/Pair.java b/257844/src/main/java/cs451/tools/Pair.java index 2f28ee1..2afd9be 100644 --- a/257844/src/main/java/cs451/tools/Pair.java +++ b/257844/src/main/java/cs451/tools/Pair.java @@ -3,24 +3,31 @@ package cs451.tools; import java.util.ArrayList; import java.util.List; -public class Pair implements Comparable { - private final S x; - private final T y; +public class Pair implements Comparable { + private final List vl = new ArrayList<>(); public Pair(S x, T y) { - this.x = x; - this.y = y; - vl.add(x.toString()); - vl.add(y.toString()); + vl.add(x); + vl.add(y); } + @SuppressWarnings("unchecked") public S first(){ - return this.x; + return (S) vl.get(0); } + @SuppressWarnings("unchecked") public T second(){ - return this.y; + return (T) vl.get(1); + } + + public void first(S v){ + vl.set(0,v); + } + + public void second(T v){ + vl.set(1,v); } @@ -35,6 +42,7 @@ public class Pair implements Comparable { } @Override + @SuppressWarnings("unchecked") public final boolean equals(final Object obj) { if (this == obj) { return true; @@ -45,14 +53,14 @@ public class Pair implements Comparable { if (getClass() != obj.getClass()) { return false; } - final Pair other = (Pair) obj; + final Pair other = (Pair) obj; return this.vl.equals(other.vl); } @Override + @SuppressWarnings("unchecked") public int compareTo(Object o) { - Pair co = (Pair)o; - - return ((int)co.second())- ((int)second()); + Pair co = (Pair)o; + return ((Comparable)second()).compareTo(co.second()); } } diff --git a/257844/src/main/java/cs451/tools/ParamDetector.java b/257844/src/main/java/cs451/tools/ParamDetector.java new file mode 100644 index 0000000..147d565 --- /dev/null +++ b/257844/src/main/java/cs451/tools/ParamDetector.java @@ -0,0 +1,44 @@ +package cs451.tools; + +import cs451.net.NetManager; +import cs451.parser.Host; +import cs451.parser.Parser; + +public abstract class ParamDetector { + + private static Integer bound(Integer value, Integer lower, Integer upper){ + return Math.min(Math.max(value,lower),upper); + } + + public static void detectAndSet(Host me, Parser p){ + + int cores = Runtime.getRuntime().availableProcessors(); + int SMProcesses = Math.toIntExact(p.hosts().stream().filter(h -> h.getIp().equals(me.getIp())).count()); + int coresPerProcess = bound((cores/SMProcesses)+1,2,64); + int bonusCoresPerProcess = SMProcesses == 1 ? 2: 1; + + System.out.println("System has "+cores+" cores, shared by "+SMProcesses+" processes."); + if(cores/SMProcesses < 2){ + System.out.println("Running too many processes on the same System might degrade performance."); + } + System.out.println("Starting Process with "+coresPerProcess+" cores and "+bonusCoresPerProcess+" bonus cores."); + NetManager.THREAD_COUNT = coresPerProcess; + NetManager.THREAD_BOOST_COUNT = bonusCoresPerProcess; + + int messages = p.messageCount(); + int processCount = p.hosts().size(); + int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/3.0))))); + int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages); + + System.out.println("Process expected to broadcast "+messages+" messages."); + System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); + NetManager.WINDOW_WIDTH = windowWidth; + + //We might want to PingPong To set Custom Timing Limitations.... + NetManager.FD_MAX_TRIES = 3; + NetManager.FD_WAIT = 1000; + + NetManager.SL_MAX_TRIES = 8; + NetManager.INTERNAL_WAIT = 500; + } +} diff --git a/bnr.sh b/bnr.sh index efddb13..b75c884 100755 --- a/bnr.sh +++ b/bnr.sh @@ -3,4 +3,5 @@ export _JAVA_OPTIONS="-Xmx16G" ./257844/build.sh sudo echo 1 -echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p 10 -m 200 +echo "Running with $1 processes and $2 messages" +echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2