From ad59dd7c4d6569276ece0621f642953460a79440 Mon Sep 17 00:00:00 2001 From: choelzl Date: Fri, 13 Nov 2020 22:07:46 +0100 Subject: [PATCH] Fix timing --- 257844/src/main/java/cs451/net/NetManager.java | 14 ++++++-------- .../net/handler/NetEventHandlerAbstract.java | 15 ++------------- .../main/java/cs451/net/handler/NetHandlerFD.java | 10 ++++------ .../java/cs451/net/handler/NetHandlerFIFO.java | 6 +++++- .../java/cs451/net/handler/NetHandlerSCKT.java | 2 +- .../main/java/cs451/net/handler/NetHandlerSL.java | 15 +++++---------- .../java/cs451/net/handler/NetHandlerTOPL.java | 2 -- .../java/cs451/net/handler/NetHandlerURB.java | 2 -- .../src/main/java/cs451/tools/ParamDetector.java | 4 ++-- 9 files changed, 25 insertions(+), 45 deletions(-) diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java index c83c5ae..4c2c095 100644 --- a/257844/src/main/java/cs451/net/NetManager.java +++ b/257844/src/main/java/cs451/net/NetManager.java @@ -86,15 +86,13 @@ public abstract class NetManager { ex.prestartAllCoreThreads(); i_tex.scheduleAtFixedRate(()-> { - System.out.println("NetManager DeliverIf/BroadcastIf"); - nm_listeners.values().forEach(nmh-> { - nmh.deliverIf(); - nmh.broadcastIf(); - }); + System.err.println("NetManager DeliverIf/BroadcastIf"); + nm_listeners.values().forEach(NetEventHandlerInterface::deliverIf); + nm_listeners.values().forEach(NetEventHandlerInterface::broadcastIf); }, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); fd_tex.scheduleAtFixedRate(()-> { - System.out.println("NetManager HeartBeat"); + System.err.println("NetManager HeartBeat"); nm_listeners.values().forEach(NetEventHandlerInterface::beat); }, 0, FD_WAIT, TimeUnit.MILLISECONDS); } @@ -104,7 +102,7 @@ public abstract class NetManager { * Stops the NetManager */ public static void stop() { - System.out.println("NetManager is stopping..."); + System.err.println("NetManager is stopping..."); isStopped = true; nm_listeners.values().forEach(NetEventHandlerAbstract::stop); ex.shutdown(); @@ -120,7 +118,7 @@ public abstract class NetManager { public static boolean isDone() { return isStopped || nm_listeners.values().stream().map(nmh ->{ if(!nmh.isDone()){ - System.out.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); + System.err.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); return false; } return true; diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java index 03c53c2..14959d9 100644 --- a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java @@ -5,9 +5,6 @@ import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; - /** * NetEventHandler abstraction class * @@ -22,8 +19,6 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac private final Class deliverLayer; private final Class broadcastLayer; - public final AtomicBoolean active = new AtomicBoolean(true); - /** * Initialized the main NetEventHandler fields @@ -99,17 +94,11 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac * @param h host (self) * @param p parser */ - public void start(Host h, Parser p) { - Objects.requireNonNull(h); - Objects.requireNonNull(p); - active.set(true); - } + public void start(Host h, Parser p) {} /** * Stops the NetEventHandler */ - public void stop() { - active.set(false); - } + public void stop() {} } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java index 4a71692..29c71b2 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java @@ -30,11 +30,11 @@ public class NetHandlerFD extends NetEventHandlerAbstract { public synchronized void beat() { hosts.forEach(h-> { alive.computeIfPresent(h.getId(),(k, v) -> { - if(v > -1) broadcastNextAsync(NetEvent.MessageHRTB(h)); + broadcastNextSync(NetEvent.MessageHRTB(h)); return v+1; }); - if(alive.getOrDefault(h.getId(),-1) >= NetManager.FD_MAX_TRIES){ - crashNextAsync(NetEvent.Message(h, Message.EMPTY())); + if(alive.getOrDefault(h.getId(),0) > NetManager.FD_MAX_TRIES){ + crashNextSync(NetEvent.Message(h, Message.EMPTY())); alive.remove(h.getId()); } }); @@ -49,11 +49,9 @@ public class NetHandlerFD extends NetEventHandlerAbstract { @Override public void deliver(NetEvent ne) { - alive.computeIfPresent(ne.peer.getId(),(k,v)->-1); + alive.computeIfPresent(ne.peer.getId(),(k,v)->0); if (ne.message.tpe != Message.TYPE.HRTB) { deliverNextSync(ne); - }else{ - broadcastNextSync(NetEvent.MessageHRTB(ne.peer)); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java index 9f20f49..755dcd8 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java @@ -58,7 +58,11 @@ public class NetHandlerFIFO extends NetEventHandlerAbstract { @Override public void deliver(NetEvent ne) { pending.add(ne.message); - deliverIf(); + } + + @Override + public boolean isDone() { + return pending.stream().noneMatch(hmp -> hmp.id.equals(rsn.getOrDefault(hmp.src, 1))); } @Override diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java index 5d815f0..9346cc0 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java @@ -88,7 +88,7 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract { } tex.execute(()->{ - while (active.get()) { + while (true) { deliver(NetEvent.EMPTY()); } }); diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java index 6dd11de..3e62028 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java @@ -1,5 +1,6 @@ package cs451.net.handler; +import cs451.net.NetManager; import cs451.net.event.Message; import cs451.net.event.*; import cs451.parser.Host; @@ -32,15 +33,16 @@ public class NetHandlerSL extends NetEventHandlerAbstract { } @Override - public synchronized void broadcastIf(){ + public synchronized void beat(){ sending.removeIf(ppm -> hasTimeout.contains(ppm.first())); - sending.forEach(ppm -> hosts.stream().filter(hl->hl.getId()==ppm.first()).findFirst() + sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst() .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second())))); } @Override public void broadcast(NetEvent ne) { - sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); + if (!hasTimeout.contains(ne.peer.getId())) + sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); broadcastNextSync(ne); } @@ -66,13 +68,6 @@ public class NetHandlerSL extends NetEventHandlerAbstract { crashNextSync(ne); } - @Override - public void recover(NetEvent ne) { - hasTimeout.remove(ne.peer.getId()); - recoverNextSync(ne); - } - - @Override public boolean isDone() { return sending.isEmpty(); diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java index 90beb53..7175faf 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java @@ -46,7 +46,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { @Override public void broadcast(NetEvent ne) { status.set(true); - broadcastIf(); } @Override @@ -55,7 +54,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { delivered.incrementAndGet(); waiting.decrementAndGet(); } - broadcastIf(); } @Override diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java index 4525183..e260419 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java @@ -62,13 +62,11 @@ public class NetHandlerURB extends NetEventHandlerAbstract { if(pending.add(smp)){ broadcastNextAsync(ne); } - deliverIf(); } @Override public void crash(NetEvent ne) { correct.remove(ne.peer.getId()); - deliverIf(); } @Override diff --git a/257844/src/main/java/cs451/tools/ParamDetector.java b/257844/src/main/java/cs451/tools/ParamDetector.java index 8beedae..7cb0342 100644 --- a/257844/src/main/java/cs451/tools/ParamDetector.java +++ b/257844/src/main/java/cs451/tools/ParamDetector.java @@ -30,14 +30,14 @@ public abstract class ParamDetector { int messages = p.messageCount(); int processCount = p.hosts().size(); - int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(5-(processCount/5.0))))); + int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/4.0))))); int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages); System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); NetManager.WINDOW_WIDTH = windowWidth; - NetManager.INTERNAL_WAIT = 500; + NetManager.INTERNAL_WAIT = 50; //We might want to PingPong To set Custom Timing Limitations.... NetManager.FD_MAX_TRIES = 10;