1
0

Fix timing

This commit is contained in:
choelzl 2020-11-13 22:07:46 +01:00
parent 3f6730f9f5
commit ad59dd7c4d
Signed by: sora
GPG Key ID: A362EA0491E2EEA0
9 changed files with 25 additions and 45 deletions

View File

@ -86,15 +86,13 @@ public abstract class NetManager {
ex.prestartAllCoreThreads(); ex.prestartAllCoreThreads();
i_tex.scheduleAtFixedRate(()-> { i_tex.scheduleAtFixedRate(()-> {
System.out.println("NetManager DeliverIf/BroadcastIf"); System.err.println("NetManager DeliverIf/BroadcastIf");
nm_listeners.values().forEach(nmh-> { nm_listeners.values().forEach(NetEventHandlerInterface::deliverIf);
nmh.deliverIf(); nm_listeners.values().forEach(NetEventHandlerInterface::broadcastIf);
nmh.broadcastIf();
});
}, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); }, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS);
fd_tex.scheduleAtFixedRate(()-> { fd_tex.scheduleAtFixedRate(()-> {
System.out.println("NetManager HeartBeat"); System.err.println("NetManager HeartBeat");
nm_listeners.values().forEach(NetEventHandlerInterface::beat); nm_listeners.values().forEach(NetEventHandlerInterface::beat);
}, 0, FD_WAIT, TimeUnit.MILLISECONDS); }, 0, FD_WAIT, TimeUnit.MILLISECONDS);
} }
@ -104,7 +102,7 @@ public abstract class NetManager {
* Stops the NetManager * Stops the NetManager
*/ */
public static void stop() { public static void stop() {
System.out.println("NetManager is stopping..."); System.err.println("NetManager is stopping...");
isStopped = true; isStopped = true;
nm_listeners.values().forEach(NetEventHandlerAbstract::stop); nm_listeners.values().forEach(NetEventHandlerAbstract::stop);
ex.shutdown(); ex.shutdown();
@ -120,7 +118,7 @@ public abstract class NetManager {
public static boolean isDone() { public static boolean isDone() {
return isStopped || nm_listeners.values().stream().map(nmh ->{ return isStopped || nm_listeners.values().stream().map(nmh ->{
if(!nmh.isDone()){ if(!nmh.isDone()){
System.out.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); System.err.println("NetManager Waiting for: "+nmh.getClass().getSimpleName());
return false; return false;
} }
return true; return true;

View File

@ -5,9 +5,6 @@ import cs451.net.event.NetEvent;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* NetEventHandler abstraction class * NetEventHandler abstraction class
* *
@ -22,8 +19,6 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac
private final Class<? extends NetEventHandlerAbstract> deliverLayer; private final Class<? extends NetEventHandlerAbstract> deliverLayer;
private final Class<? extends NetEventHandlerAbstract> broadcastLayer; private final Class<? extends NetEventHandlerAbstract> broadcastLayer;
public final AtomicBoolean active = new AtomicBoolean(true);
/** /**
* Initialized the main NetEventHandler fields * Initialized the main NetEventHandler fields
@ -99,17 +94,11 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac
* @param h host (self) * @param h host (self)
* @param p parser * @param p parser
*/ */
public void start(Host h, Parser p) { public void start(Host h, Parser p) {}
Objects.requireNonNull(h);
Objects.requireNonNull(p);
active.set(true);
}
/** /**
* Stops the NetEventHandler * Stops the NetEventHandler
*/ */
public void stop() { public void stop() {}
active.set(false);
}
} }

View File

@ -30,11 +30,11 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
public synchronized void beat() { public synchronized void beat() {
hosts.forEach(h-> { hosts.forEach(h-> {
alive.computeIfPresent(h.getId(),(k, v) -> { alive.computeIfPresent(h.getId(),(k, v) -> {
if(v > -1) broadcastNextAsync(NetEvent.MessageHRTB(h)); broadcastNextSync(NetEvent.MessageHRTB(h));
return v+1; return v+1;
}); });
if(alive.getOrDefault(h.getId(),-1) >= NetManager.FD_MAX_TRIES){ if(alive.getOrDefault(h.getId(),0) > NetManager.FD_MAX_TRIES){
crashNextAsync(NetEvent.Message(h, Message.EMPTY())); crashNextSync(NetEvent.Message(h, Message.EMPTY()));
alive.remove(h.getId()); alive.remove(h.getId());
} }
}); });
@ -49,11 +49,9 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
alive.computeIfPresent(ne.peer.getId(),(k,v)->-1); alive.computeIfPresent(ne.peer.getId(),(k,v)->0);
if (ne.message.tpe != Message.TYPE.HRTB) { if (ne.message.tpe != Message.TYPE.HRTB) {
deliverNextSync(ne); deliverNextSync(ne);
}else{
broadcastNextSync(NetEvent.MessageHRTB(ne.peer));
} }
} }

View File

@ -58,7 +58,11 @@ public class NetHandlerFIFO extends NetEventHandlerAbstract {
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
pending.add(ne.message); pending.add(ne.message);
deliverIf(); }
@Override
public boolean isDone() {
return pending.stream().noneMatch(hmp -> hmp.id.equals(rsn.getOrDefault(hmp.src, 1)));
} }
@Override @Override

View File

@ -88,7 +88,7 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract {
} }
tex.execute(()->{ tex.execute(()->{
while (active.get()) { while (true) {
deliver(NetEvent.EMPTY()); deliver(NetEvent.EMPTY());
} }
}); });

View File

@ -1,5 +1,6 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.NetManager;
import cs451.net.event.Message; import cs451.net.event.Message;
import cs451.net.event.*; import cs451.net.event.*;
import cs451.parser.Host; import cs451.parser.Host;
@ -32,15 +33,16 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
} }
@Override @Override
public synchronized void broadcastIf(){ public synchronized void beat(){
sending.removeIf(ppm -> hasTimeout.contains(ppm.first())); sending.removeIf(ppm -> hasTimeout.contains(ppm.first()));
sending.forEach(ppm -> hosts.stream().filter(hl->hl.getId()==ppm.first()).findFirst() sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst()
.ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second())))); .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second()))));
} }
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); if (!hasTimeout.contains(ne.peer.getId()))
sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message)));
broadcastNextSync(ne); broadcastNextSync(ne);
} }
@ -66,13 +68,6 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
crashNextSync(ne); crashNextSync(ne);
} }
@Override
public void recover(NetEvent ne) {
hasTimeout.remove(ne.peer.getId());
recoverNextSync(ne);
}
@Override @Override
public boolean isDone() { public boolean isDone() {
return sending.isEmpty(); return sending.isEmpty();

View File

@ -46,7 +46,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
status.set(true); status.set(true);
broadcastIf();
} }
@Override @Override
@ -55,7 +54,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
delivered.incrementAndGet(); delivered.incrementAndGet();
waiting.decrementAndGet(); waiting.decrementAndGet();
} }
broadcastIf();
} }
@Override @Override

View File

@ -62,13 +62,11 @@ public class NetHandlerURB extends NetEventHandlerAbstract {
if(pending.add(smp)){ if(pending.add(smp)){
broadcastNextAsync(ne); broadcastNextAsync(ne);
} }
deliverIf();
} }
@Override @Override
public void crash(NetEvent ne) { public void crash(NetEvent ne) {
correct.remove(ne.peer.getId()); correct.remove(ne.peer.getId());
deliverIf();
} }
@Override @Override

View File

@ -30,14 +30,14 @@ public abstract class ParamDetector {
int messages = p.messageCount(); int messages = p.messageCount();
int processCount = p.hosts().size(); int processCount = p.hosts().size();
int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(5-(processCount/5.0))))); int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/4.0)))));
int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages); int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages);
System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Process expected to broadcast "+messages+" messages.");
System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+").");
NetManager.WINDOW_WIDTH = windowWidth; NetManager.WINDOW_WIDTH = windowWidth;
NetManager.INTERNAL_WAIT = 500; NetManager.INTERNAL_WAIT = 50;
//We might want to PingPong To set Custom Timing Limitations.... //We might want to PingPong To set Custom Timing Limitations....
NetManager.FD_MAX_TRIES = 10; NetManager.FD_MAX_TRIES = 10;