diff --git a/257844/src/main/java/cs451/Main.java b/257844/src/main/java/cs451/Main.java index 524e67a..c21fd43 100644 --- a/257844/src/main/java/cs451/Main.java +++ b/257844/src/main/java/cs451/Main.java @@ -103,8 +103,12 @@ public class Main { NetManager.start(me,parser,(t,ne) -> { Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); if(t == NetEventType.DLVR){ - mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); - }else if(t== NetEventType.SEND){ + Pair, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); + System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); + mh_tpe.add(mhpt); + }else if(t== NetEventType.BCST){ + Pair, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); + System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); } }, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage())); @@ -113,7 +117,7 @@ public class Main { coordinator.waitOnBarrier(); System.out.println("Broadcasting messages..."); - NetManager.send(); + NetManager.startBroadcast(); while(!NetManager.isDone()) { Thread.sleep(500); diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java index 03b2110..c83c5ae 100644 --- a/257844/src/main/java/cs451/net/NetManager.java +++ b/257844/src/main/java/cs451/net/NetManager.java @@ -31,8 +31,6 @@ public abstract class NetManager { public static int INTERNAL_WAIT; - public static int SL_WAIT; - public static int SL_MAX_TRIES; public static int FD_WAIT; public static int FD_MAX_TRIES; @@ -47,8 +45,8 @@ public abstract class NetManager { private static final Map, NetEventHandlerAbstract> nm_listeners = new HashMap<>(); private static ThreadPoolExecutor ex; - private static final ScheduledExecutorService ftex = Executors.newSingleThreadScheduledExecutor(); - private static final ScheduledExecutorService stex = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService i_tex = Executors.newSingleThreadScheduledExecutor(); + private static final ScheduledExecutorService fd_tex = Executors.newSingleThreadScheduledExecutor(); private static BiConsumer onCompleteHandler; private static BiConsumer onErrorHandler; @@ -87,13 +85,18 @@ public abstract class NetManager { ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); ex.prestartAllCoreThreads(); - ftex.scheduleAtFixedRate(() -> nm_listeners.values().parallelStream().forEach(neh->{ - neh.deliverIf(); - neh.sendIf(); - }), 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); + i_tex.scheduleAtFixedRate(()-> { + System.out.println("NetManager DeliverIf/BroadcastIf"); + nm_listeners.values().forEach(nmh-> { + nmh.deliverIf(); + nmh.broadcastIf(); + }); + }, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); - stex.scheduleAtFixedRate(()-> nm_listeners.values().forEach(NetEventHandlerInterface::beat - ), 0, FD_WAIT, TimeUnit.MILLISECONDS); + fd_tex.scheduleAtFixedRate(()-> { + System.out.println("NetManager HeartBeat"); + nm_listeners.values().forEach(NetEventHandlerInterface::beat); + }, 0, FD_WAIT, TimeUnit.MILLISECONDS); } @@ -101,11 +104,12 @@ public abstract class NetManager { * Stops the NetManager */ public static void stop() { + System.out.println("NetManager is stopping..."); isStopped = true; nm_listeners.values().forEach(NetEventHandlerAbstract::stop); ex.shutdown(); - ftex.shutdown(); - stex.shutdown(); + i_tex.shutdown(); + fd_tex.shutdown(); System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run."); } @@ -114,14 +118,24 @@ public abstract class NetManager { * @return true if NM and NEH are done */ public static boolean isDone() { - return isStopped || nm_listeners.values().stream().map(NetEventHandlerAbstract::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); + return isStopped || nm_listeners.values().stream().map(nmh ->{ + if(!nmh.isDone()){ + System.out.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); + return false; + } + return true; + }).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); } //================================================================================================================= //================================================================================================================= - public static void handle(Class layer, NetEventType event, NetEvent ne){ + public static void handleSync(Class layer, NetEventType event, NetEvent ne){ + new NetEventRunner(ne,layer,event).run(); + } + + public static void handleAsync(Class layer, NetEventType event, NetEvent ne){ try { ex.getQueue().add(new NetEventRunner(ne, layer, event)); }catch(IllegalStateException ise){ @@ -140,28 +154,34 @@ public abstract class NetManager { //================================================================================================================= //================================================================================================================= - public static void deliver(Class layer, NetEvent ne) { - handle(layer,NetEventType.DLVR,ne); + public static void deliverAsync(Class layer, NetEvent ne) { + handleAsync(layer,NetEventType.DLVR,ne); } - public static void send(Class layer, NetEvent ne) { - handle(layer,NetEventType.SEND,ne); + public static void broadcastAsync(Class layer, NetEvent ne) { + handleAsync(layer,NetEventType.BCST,ne); } - public static void crash(Class layer, NetEvent ne) { - handle(layer,NetEventType.CRSH,ne); + public static void crashAsync(Class layer, NetEvent ne) { + handleAsync(layer,NetEventType.CRSH,ne); + } + public static void recoverAsync(Class layer, NetEvent ne) { + handleAsync(layer,NetEventType.RCVR,ne); } public static void deliverSync(Class layer, NetEvent ne) { - new NetEventRunner(ne,layer,NetEventType.DLVR).run(); + handleSync(layer,NetEventType.DLVR,ne); } public static void sendSync(Class layer, NetEvent ne) { - new NetEventRunner(ne,layer,NetEventType.SEND).run(); + handleSync(layer,NetEventType.BCST,ne); } public static void crashSync(Class layer, NetEvent ne) { - new NetEventRunner(ne,layer,NetEventType.CRSH).run(); + handleSync(layer,NetEventType.CRSH,ne); + } + public static void recoverSync(Class layer, NetEvent ne) { + handleSync(layer,NetEventType.RCVR,ne); } - public static void send() { - new NetEventRunner(NetEvent.EMPTY(), NetHandlerTOPL.class,NetEventType.SEND).run(); + public static void startBroadcast() { + handleAsync(NetHandlerTOPL.class,NetEventType.BCST,NetEvent.EMPTY()); } @@ -185,10 +205,9 @@ public abstract class NetManager { @Override public void run() { - Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+""); - NetEventHandlerAbstract nl = nm_listeners.getOrDefault(this.lt, null); if (nl != null) { + Logger.debug(Thread.currentThread().getId()+"_"+this.lt.getSimpleName().replace("NetHandler","")+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+""); nl.onEvent(this.net,ne); }else{ error(this.net,new UnsupportedOperationException("No Handler for "+this.lt)); diff --git a/257844/src/main/java/cs451/net/event/NetEventType.java b/257844/src/main/java/cs451/net/event/NetEventType.java index 29eea5b..4e4dc39 100644 --- a/257844/src/main/java/cs451/net/event/NetEventType.java +++ b/257844/src/main/java/cs451/net/event/NetEventType.java @@ -8,7 +8,7 @@ package cs451.net.event; */ public enum NetEventType { DLVR, - SEND, + BCST, CRSH, RCVR; } diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java index 99b45a5..03c53c2 100644 --- a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java @@ -47,7 +47,7 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac * Sends a NetEvent Synchronously * @param ne NetEvent */ - public void sendNextSync(NetEvent ne){ + public void broadcastNextSync(NetEvent ne){ NetManager.sendSync(broadcastLayer,ne); } /** @@ -57,27 +57,41 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac public void crashNextSync(NetEvent ne){ NetManager.crashSync(deliverLayer,ne); } + /** + * Recovers a NetEvent Synchronously + * @param ne NetEvent + */ + public void recoverNextSync(NetEvent ne){ + NetManager.recoverSync(deliverLayer,ne); + } /** * Delivers a NetEvent Asynchronously * @param ne NetEvent */ public void deliverNextAsync(NetEvent ne){ - NetManager.deliver(deliverLayer,ne); + NetManager.deliverAsync(deliverLayer,ne); } /** * Sends a NetEvent Asynchronously * @param ne NetEvent */ - public void sendNextAsync(NetEvent ne){ - NetManager.send(broadcastLayer,ne); + public void broadcastNextAsync(NetEvent ne){ + NetManager.broadcastAsync(broadcastLayer,ne); } /** * Crashes a NetEvent Asynchronously * @param ne NetEvent */ public void crashNextAsync(NetEvent ne) { - NetManager.crash(deliverLayer,ne); + NetManager.crashAsync(deliverLayer,ne); + } + /** + * Recovers a NetEvent Asynchronously + * @param ne NetEvent + */ + public void recoverNextAsync(NetEvent ne) { + NetManager.recoverAsync(deliverLayer,ne); } /** diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java index aa921e9..898ed70 100644 --- a/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerInterface.java @@ -17,7 +17,7 @@ public interface NetEventHandlerInterface { /** * Sends Messages on condition (500ms event timer) */ - default void sendIf(){} + default void broadcastIf(){} /** * Delivers Messages on condition (500ms event timer) */ @@ -38,12 +38,17 @@ public interface NetEventHandlerInterface { * Sends a NetEvent to the next layer synchronously * @param ne NetEvent */ - void sendNextSync(NetEvent ne); + void broadcastNextSync(NetEvent ne); /** * Crashes a NetEvent to the next layer synchronously * @param ne NetEvent */ void crashNextSync(NetEvent ne); + /** + * Recovers a NetEvent to the next layer synchronously + * @param ne NetEvent + */ + void recoverNextSync(NetEvent ne); /** * Delivers a NetEvent to the next layer asynchronously @@ -54,12 +59,17 @@ public interface NetEventHandlerInterface { * Sends a NetEvent to the next layer asynchronously * @param ne NetEvent */ - void sendNextAsync(NetEvent ne); + void broadcastNextAsync(NetEvent ne); /** * Crashes a NetEvent to the next layer asynchronously * @param ne NetEvent */ void crashNextAsync(NetEvent ne); + /** + * Recovers a NetEvent to the next layer asynchronously + * @param ne NetEvent + */ + void recoverNextAsync(NetEvent ne); /** * Handles a NetEvent @@ -70,11 +80,12 @@ public interface NetEventHandlerInterface { switch (et){ case DLVR: deliver(ne); break; - case SEND: send(ne); + case BCST: broadcast(ne); break; case CRSH: crash(ne); break; - case RCVR: + case RCVR: recover(ne); + break; default: Logger.error("Unhandled EventType:"+et); } @@ -85,8 +96,8 @@ public interface NetEventHandlerInterface { * Send Event Handler * @param ne NetEvent */ - default void send(NetEvent ne){ - sendNextSync(ne); + default void broadcast(NetEvent ne){ + broadcastNextSync(ne); } /** * Deliver Event Handler @@ -102,6 +113,13 @@ public interface NetEventHandlerInterface { default void crash(NetEvent ne){ crashNextSync(ne); } + /** + * Recover Event Handler + * @param ne NetEvent + */ + default void recover(NetEvent ne){ + recoverNextSync(ne); + } /** * Starts the NetEventHandler diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java index 7e6c511..50d5fae 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java @@ -5,6 +5,7 @@ import cs451.parser.Host; import cs451.parser.Parser; import java.util.List; +import java.util.stream.Collectors; /** * NetEventHandler for Best Effort Broadcast @@ -23,21 +24,16 @@ public class NetHandlerBEB extends NetEventHandlerAbstract { } @Override - public void send(NetEvent ne) { - hosts.parallelStream().forEach(h ->{ - if(this.h.getId() != h.getId()) { - sendNextAsync(NetEvent.Message(h, ne.message)); - }else{ - deliverNextAsync(NetEvent.Message(h,ne.message)); - } - }); + public void broadcast(NetEvent ne) { + deliverNextAsync(NetEvent.Message(h,ne.message)); + hosts.parallelStream().forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message))); } @Override public void start(Host h, Parser p) { super.start(h,p); this.h = h; - hosts = p.hosts(); + hosts = p.hosts().stream().filter(ch -> ch.getId()!=h.getId()).collect(Collectors.toList()); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java index 8a8e322..0afe3ed 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java @@ -18,8 +18,8 @@ public class NetHandlerDFLT extends NetEventHandlerAbstract { } @Override - public void send(NetEvent ne){ - NetManager.error(NetEventType.SEND,new Exception("Default Handler")); + public void broadcast(NetEvent ne){ + NetManager.error(NetEventType.BCST,new Exception("Default Handler")); } @Override diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java index a168673..4a71692 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java @@ -27,24 +27,33 @@ public class NetHandlerFD extends NetEventHandlerAbstract { } @Override - public void beat() { - hosts.forEach(h-> alive.computeIfPresent(h.getId(),(k, v) -> { - if(v == -1) return 0; - if(v > NetManager.FD_MAX_TRIES) + public synchronized void beat() { + hosts.forEach(h-> { + alive.computeIfPresent(h.getId(),(k, v) -> { + if(v > -1) broadcastNextAsync(NetEvent.MessageHRTB(h)); + return v+1; + }); + if(alive.getOrDefault(h.getId(),-1) >= NetManager.FD_MAX_TRIES){ crashNextAsync(NetEvent.Message(h, Message.EMPTY())); - else - sendNextSync(NetEvent.MessageHRTB(h)); - return v+1; - })); + alive.remove(h.getId()); + } + }); + } + + @Override + public void broadcast(NetEvent ne) { + if (alive.containsKey(ne.peer.getId())){ + broadcastNextSync(ne); + } } @Override public void deliver(NetEvent ne) { alive.computeIfPresent(ne.peer.getId(),(k,v)->-1); - switch(ne.message.tpe){ - case HRTB: - break; - default: deliverNextSync(ne); + if (ne.message.tpe != Message.TYPE.HRTB) { + deliverNextSync(ne); + }else{ + broadcastNextSync(NetEvent.MessageHRTB(ne.peer)); } } @@ -52,6 +61,6 @@ public class NetHandlerFD extends NetEventHandlerAbstract { public void start(Host h, Parser p) { super.start(h, p); hosts = p.hosts(); - hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),0)); + hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),-1)); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java index 0bad518..9f20f49 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java @@ -35,14 +35,14 @@ public class NetHandlerFIFO extends NetEventHandlerAbstract { } @Override - public void send(NetEvent ne) { + public synchronized void broadcast(NetEvent ne) { Integer snv = sn.getAndIncrement(); - NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST)); - sendNextAsync(NetEvent.Message(me.getId(),snv)); + NetManager.complete(NetEventType.BCST, NetEvent.Message(me, snv, Message.TYPE.BCST)); + broadcastNextAsync(NetEvent.Message(me.getId(),snv)); } @Override - public void deliverIf(){ + public synchronized void deliverIf(){ pending.removeIf(hmp -> { Integer crsn = rsn.getOrDefault(hmp.src, 1); if (hmp.id.equals(crsn)) { diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java index d9e0040..5d815f0 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java @@ -38,9 +38,9 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract { } @Override - public void send(NetEvent event) { + public void broadcast(NetEvent event) { if(socket.isClosed()){ - NetManager.error(NetEventType.SEND,new SocketException("Socket Closed")); + NetManager.error(NetEventType.BCST,new SocketException("Socket Closed")); return; } ByteBuffer b = ByteBuffer.allocate(BUFF_SIZE); @@ -49,7 +49,7 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract { try { socket.send(datagram); } catch (IOException e) { - NetManager.error(NetEventType.SEND,e); + NetManager.error(NetEventType.BCST,e); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java index 80ff4dd..6dd11de 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java @@ -1,7 +1,6 @@ package cs451.net.handler; import cs451.net.event.Message; -import cs451.net.NetManager; import cs451.net.event.*; import cs451.parser.Host; import cs451.parser.Parser; @@ -9,7 +8,6 @@ import cs451.tools.Pair; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * NetEventHandler for Stuborn Link @@ -27,54 +25,34 @@ public class NetHandlerSL extends NetEventHandlerAbstract { private List hosts; Set hasTimeout = ConcurrentHashMap.newKeySet(); - PriorityBlockingQueue,Message>> sending = new PriorityBlockingQueue<>(); + PriorityBlockingQueue> sending = new PriorityBlockingQueue<>(); public NetHandlerSL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } @Override - public void sendIf(){ - sending.forEach(ppm -> { - ppm.first().second().getAndIncrement(); - Optional h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst(); - h.ifPresent(host -> sendNextAsync(NetEvent.Message(host, ppm.second()))); - }); - sending.removeIf(ppm -> { - if(hasTimeout.contains(ppm.first().first())){ - return true; - } -// if(ppm.first().second().get() > NetManager.SL_MAX_TRIES){ -// hasTimeout.add(ppm.first().first()); -// Optional h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst(); -// h.ifPresent(host -> crashNextAsync(NetEvent.Message(host, ppm.second()))); -// return true; -// } - return false; - }); + public synchronized void broadcastIf(){ + sending.removeIf(ppm -> hasTimeout.contains(ppm.first())); + sending.forEach(ppm -> hosts.stream().filter(hl->hl.getId()==ppm.first()).findFirst() + .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second())))); } @Override - public void send(NetEvent ne) { - if(hasTimeout.contains(ne.peer.getId())){ - return; - } - sending.add(new Pair<>(new Pair<>(ne.peer.getId(), new AtomicInteger(1)),ne.message)); - sendNextSync(ne); + public void broadcast(NetEvent ne) { + sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); + broadcastNextSync(ne); } @Override public void deliver(NetEvent ne) { - if(hasTimeout.contains(ne.peer.getId())){ - return; - } switch (ne.message.tpe){ case ACK: - sending.removeIf(ppm -> (ppm.first().first()==ne.peer.getId() && + sending.removeIf(ppm -> (ne.peer.getId()==ppm.first() && ppm.second().equals(ne.message))); break; case DATA: - sendNextSync(NetEvent.MessageACK(ne.peer,ne.message)); + broadcastNextSync(NetEvent.MessageACK(ne.peer,ne.message)); deliverNextSync(ne); break; default: @@ -88,6 +66,13 @@ public class NetHandlerSL extends NetEventHandlerAbstract { crashNextSync(ne); } + @Override + public void recover(NetEvent ne) { + hasTimeout.remove(ne.peer.getId()); + recoverNextSync(ne); + } + + @Override public boolean isDone() { return sending.isEmpty(); diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java index ef3061f..90beb53 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java @@ -5,6 +5,7 @@ import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -23,6 +24,8 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { private final AtomicInteger delivered = new AtomicInteger(0); private final AtomicInteger waiting = new AtomicInteger(0); + private final AtomicBoolean status = new AtomicBoolean(false); + private Host me; public NetHandlerTOPL(Class deliverLayer, Class broadcastLayer) { @@ -30,27 +33,29 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { } @Override - public void sendIf() { + public synchronized void broadcastIf() { + if( !status.get() ) return; while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) { toSend.decrementAndGet(); waiting.incrementAndGet(); - sendNextSync(NetEvent.EMPTY()); + broadcastNextSync(NetEvent.EMPTY()); } } @Override - public void send(NetEvent ne) { - sendIf(); + public void broadcast(NetEvent ne) { + status.set(true); + broadcastIf(); } @Override - public void deliver(NetEvent ne) { + public synchronized void deliver(NetEvent ne) { if (ne.message.src == me.getId()) { delivered.incrementAndGet(); waiting.decrementAndGet(); } - sendIf(); + broadcastIf(); } @Override diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java index 9780dac..4525183 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java @@ -36,22 +36,22 @@ public class NetHandlerURB extends NetEventHandlerAbstract { } @Override - public void deliverIf(){ + public synchronized void deliverIf(){ pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). filter(delivered::add). forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second()))); } @Override - public void send(NetEvent ne) { + public void broadcast(NetEvent ne) { pending.add(new Pair<>(myId, ne.message.id)); ne.message.src = myId; - sendNextSync(ne); + broadcastNextSync(ne); } @Override public void deliver(NetEvent ne) { - Pair smp =new Pair<>(ne.message.src,ne.message.id); + Pair smp = new Pair<>(ne.message.src,ne.message.id); ack.compute(smp,(k, v) -> { if(v == null){ v = ConcurrentHashMap.newKeySet(); @@ -60,7 +60,7 @@ public class NetHandlerURB extends NetEventHandlerAbstract { return v; }); if(pending.add(smp)){ - sendNextSync(ne); + broadcastNextAsync(ne); } deliverIf(); } diff --git a/257844/src/main/java/cs451/tools/Pair.java b/257844/src/main/java/cs451/tools/Pair.java index 67f410e..685bab4 100644 --- a/257844/src/main/java/cs451/tools/Pair.java +++ b/257844/src/main/java/cs451/tools/Pair.java @@ -59,7 +59,7 @@ public class Pair implements Comparable { return false; } final Pair other = (Pair) obj; - return this.vl.equals(other.vl); + return this.first().equals(other.first()) && this.second().equals(other.second()); } @Override diff --git a/257844/src/main/java/cs451/tools/ParamDetector.java b/257844/src/main/java/cs451/tools/ParamDetector.java index fa42eae..8beedae 100644 --- a/257844/src/main/java/cs451/tools/ParamDetector.java +++ b/257844/src/main/java/cs451/tools/ParamDetector.java @@ -30,8 +30,8 @@ public abstract class ParamDetector { int messages = p.messageCount(); int processCount = p.hosts().size(); - int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/3.0))))); - int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages); + int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(5-(processCount/5.0))))); + int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages); System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); @@ -42,8 +42,5 @@ public abstract class ParamDetector { //We might want to PingPong To set Custom Timing Limitations.... NetManager.FD_MAX_TRIES = 10; NetManager.FD_WAIT = 1000; - - NetManager.SL_MAX_TRIES = 8; - NetManager.SL_WAIT = NetManager.INTERNAL_WAIT; } } diff --git a/docker/Dockerfile b/docker/Dockerfile index 10fe3b5..f397497 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,9 +16,14 @@ RUN apt-get -y install file unzip zip xz-utils git \ ADD docker/apache-maven-3.6.3-bin.tar.gz /usr/local RUN ln -s /usr/local/apache-maven-3.6.3/bin/mvn /usr/local/bin -COPY template_cpp /root/template_cpp -COPY template_java /root/template_java +#COPY template_cpp /root/template_cpp +#COPY template_java /root/template_java +COPY 257844 /root/257844 ADD barrier.py /root +ADD finishedSignal.py /root +ADD validate.py /root +ADD bnr.sh /root -RUN /root/template_cpp/build.sh && /root/template_cpp/cleanup.sh -RUN /root/template_java/build.sh && /root/template_java/cleanup.sh +#RUN /root/template_cpp/build.sh && /root/template_cpp/cleanup.sh +#RUN /root/template_java/build.sh && /root/template_java/cleanup.sh +RUN /root/257844/build.sh && /root/257844/cleanup.sh diff --git a/docker/run-example.sh b/docker/run-example.sh index da148ad..59db3da 100755 --- a/docker/run-example.sh +++ b/docker/run-example.sh @@ -1,3 +1,3 @@ #!/bin/bash -docker run -it da_image /bin/bash +docker run -it --rm da_image /bin/bash