1
0

Improvements

This commit is contained in:
choelzl 2020-11-13 17:40:09 +01:00
parent 9e710f03b7
commit 3f6730f9f5
Signed by: sora
GPG Key ID: A362EA0491E2EEA0
17 changed files with 180 additions and 128 deletions

View File

@ -103,8 +103,12 @@ public class Main {
NetManager.start(me,parser,(t,ne) -> { NetManager.start(me,parser,(t,ne) -> {
Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString());
if(t == NetEventType.DLVR){ if(t == NetEventType.DLVR){
mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); Pair<Pair<Integer, Integer>, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe);
}else if(t== NetEventType.SEND){ System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
mh_tpe.add(mhpt);
}else if(t== NetEventType.BCST){
Pair<Pair<Integer, Integer>, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe);
System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe));
} }
}, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage())); }, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage()));
@ -113,7 +117,7 @@ public class Main {
coordinator.waitOnBarrier(); coordinator.waitOnBarrier();
System.out.println("Broadcasting messages..."); System.out.println("Broadcasting messages...");
NetManager.send(); NetManager.startBroadcast();
while(!NetManager.isDone()) { while(!NetManager.isDone()) {
Thread.sleep(500); Thread.sleep(500);

View File

@ -31,8 +31,6 @@ public abstract class NetManager {
public static int INTERNAL_WAIT; public static int INTERNAL_WAIT;
public static int SL_WAIT;
public static int SL_MAX_TRIES;
public static int FD_WAIT; public static int FD_WAIT;
public static int FD_MAX_TRIES; public static int FD_MAX_TRIES;
@ -47,8 +45,8 @@ public abstract class NetManager {
private static final Map<Class<? extends NetEventHandlerAbstract>, NetEventHandlerAbstract> nm_listeners = new HashMap<>(); private static final Map<Class<? extends NetEventHandlerAbstract>, NetEventHandlerAbstract> nm_listeners = new HashMap<>();
private static ThreadPoolExecutor ex; private static ThreadPoolExecutor ex;
private static final ScheduledExecutorService ftex = Executors.newSingleThreadScheduledExecutor(); private static final ScheduledExecutorService i_tex = Executors.newSingleThreadScheduledExecutor();
private static final ScheduledExecutorService stex = Executors.newSingleThreadScheduledExecutor(); private static final ScheduledExecutorService fd_tex = Executors.newSingleThreadScheduledExecutor();
private static BiConsumer<NetEventType,NetEvent> onCompleteHandler; private static BiConsumer<NetEventType,NetEvent> onCompleteHandler;
private static BiConsumer<NetEventType,Exception> onErrorHandler; private static BiConsumer<NetEventType,Exception> onErrorHandler;
@ -87,13 +85,18 @@ public abstract class NetManager {
ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
ex.prestartAllCoreThreads(); ex.prestartAllCoreThreads();
ftex.scheduleAtFixedRate(() -> nm_listeners.values().parallelStream().forEach(neh->{ i_tex.scheduleAtFixedRate(()-> {
neh.deliverIf(); System.out.println("NetManager DeliverIf/BroadcastIf");
neh.sendIf(); nm_listeners.values().forEach(nmh-> {
}), 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); nmh.deliverIf();
nmh.broadcastIf();
});
}, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS);
stex.scheduleAtFixedRate(()-> nm_listeners.values().forEach(NetEventHandlerInterface::beat fd_tex.scheduleAtFixedRate(()-> {
), 0, FD_WAIT, TimeUnit.MILLISECONDS); System.out.println("NetManager HeartBeat");
nm_listeners.values().forEach(NetEventHandlerInterface::beat);
}, 0, FD_WAIT, TimeUnit.MILLISECONDS);
} }
@ -101,11 +104,12 @@ public abstract class NetManager {
* Stops the NetManager * Stops the NetManager
*/ */
public static void stop() { public static void stop() {
System.out.println("NetManager is stopping...");
isStopped = true; isStopped = true;
nm_listeners.values().forEach(NetEventHandlerAbstract::stop); nm_listeners.values().forEach(NetEventHandlerAbstract::stop);
ex.shutdown(); ex.shutdown();
ftex.shutdown(); i_tex.shutdown();
stex.shutdown(); fd_tex.shutdown();
System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run."); System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run.");
} }
@ -114,14 +118,24 @@ public abstract class NetManager {
* @return true if NM and NEH are done * @return true if NM and NEH are done
*/ */
public static boolean isDone() { public static boolean isDone() {
return isStopped || nm_listeners.values().stream().map(NetEventHandlerAbstract::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); return isStopped || nm_listeners.values().stream().map(nmh ->{
if(!nmh.isDone()){
System.out.println("NetManager Waiting for: "+nmh.getClass().getSimpleName());
return false;
}
return true;
}).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2);
} }
//================================================================================================================= //=================================================================================================================
//================================================================================================================= //=================================================================================================================
public static void handle(Class<? extends NetEventHandlerAbstract> layer, NetEventType event, NetEvent ne){ public static void handleSync(Class<? extends NetEventHandlerAbstract> layer, NetEventType event, NetEvent ne){
new NetEventRunner(ne,layer,event).run();
}
public static void handleAsync(Class<? extends NetEventHandlerAbstract> layer, NetEventType event, NetEvent ne){
try { try {
ex.getQueue().add(new NetEventRunner(ne, layer, event)); ex.getQueue().add(new NetEventRunner(ne, layer, event));
}catch(IllegalStateException ise){ }catch(IllegalStateException ise){
@ -140,28 +154,34 @@ public abstract class NetManager {
//================================================================================================================= //=================================================================================================================
//================================================================================================================= //=================================================================================================================
public static void deliver(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void deliverAsync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
handle(layer,NetEventType.DLVR,ne); handleAsync(layer,NetEventType.DLVR,ne);
} }
public static void send(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void broadcastAsync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
handle(layer,NetEventType.SEND,ne); handleAsync(layer,NetEventType.BCST,ne);
} }
public static void crash(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void crashAsync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
handle(layer,NetEventType.CRSH,ne); handleAsync(layer,NetEventType.CRSH,ne);
}
public static void recoverAsync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
handleAsync(layer,NetEventType.RCVR,ne);
} }
public static void deliverSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void deliverSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
new NetEventRunner(ne,layer,NetEventType.DLVR).run(); handleSync(layer,NetEventType.DLVR,ne);
} }
public static void sendSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void sendSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
new NetEventRunner(ne,layer,NetEventType.SEND).run(); handleSync(layer,NetEventType.BCST,ne);
} }
public static void crashSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) { public static void crashSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
new NetEventRunner(ne,layer,NetEventType.CRSH).run(); handleSync(layer,NetEventType.CRSH,ne);
}
public static void recoverSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
handleSync(layer,NetEventType.RCVR,ne);
} }
public static void send() { public static void startBroadcast() {
new NetEventRunner(NetEvent.EMPTY(), NetHandlerTOPL.class,NetEventType.SEND).run(); handleAsync(NetHandlerTOPL.class,NetEventType.BCST,NetEvent.EMPTY());
} }
@ -185,10 +205,9 @@ public abstract class NetManager {
@Override @Override
public void run() { public void run() {
Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+"");
NetEventHandlerAbstract nl = nm_listeners.getOrDefault(this.lt, null); NetEventHandlerAbstract nl = nm_listeners.getOrDefault(this.lt, null);
if (nl != null) { if (nl != null) {
Logger.debug(Thread.currentThread().getId()+"_"+this.lt.getSimpleName().replace("NetHandler","")+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+"");
nl.onEvent(this.net,ne); nl.onEvent(this.net,ne);
}else{ }else{
error(this.net,new UnsupportedOperationException("No Handler for "+this.lt)); error(this.net,new UnsupportedOperationException("No Handler for "+this.lt));

View File

@ -8,7 +8,7 @@ package cs451.net.event;
*/ */
public enum NetEventType { public enum NetEventType {
DLVR, DLVR,
SEND, BCST,
CRSH, CRSH,
RCVR; RCVR;
} }

View File

@ -47,7 +47,7 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac
* Sends a NetEvent Synchronously * Sends a NetEvent Synchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
public void sendNextSync(NetEvent ne){ public void broadcastNextSync(NetEvent ne){
NetManager.sendSync(broadcastLayer,ne); NetManager.sendSync(broadcastLayer,ne);
} }
/** /**
@ -57,27 +57,41 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac
public void crashNextSync(NetEvent ne){ public void crashNextSync(NetEvent ne){
NetManager.crashSync(deliverLayer,ne); NetManager.crashSync(deliverLayer,ne);
} }
/**
* Recovers a NetEvent Synchronously
* @param ne NetEvent
*/
public void recoverNextSync(NetEvent ne){
NetManager.recoverSync(deliverLayer,ne);
}
/** /**
* Delivers a NetEvent Asynchronously * Delivers a NetEvent Asynchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
public void deliverNextAsync(NetEvent ne){ public void deliverNextAsync(NetEvent ne){
NetManager.deliver(deliverLayer,ne); NetManager.deliverAsync(deliverLayer,ne);
} }
/** /**
* Sends a NetEvent Asynchronously * Sends a NetEvent Asynchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
public void sendNextAsync(NetEvent ne){ public void broadcastNextAsync(NetEvent ne){
NetManager.send(broadcastLayer,ne); NetManager.broadcastAsync(broadcastLayer,ne);
} }
/** /**
* Crashes a NetEvent Asynchronously * Crashes a NetEvent Asynchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
public void crashNextAsync(NetEvent ne) { public void crashNextAsync(NetEvent ne) {
NetManager.crash(deliverLayer,ne); NetManager.crashAsync(deliverLayer,ne);
}
/**
* Recovers a NetEvent Asynchronously
* @param ne NetEvent
*/
public void recoverNextAsync(NetEvent ne) {
NetManager.recoverAsync(deliverLayer,ne);
} }
/** /**

View File

@ -17,7 +17,7 @@ public interface NetEventHandlerInterface {
/** /**
* Sends Messages on condition (500ms event timer) * Sends Messages on condition (500ms event timer)
*/ */
default void sendIf(){} default void broadcastIf(){}
/** /**
* Delivers Messages on condition (500ms event timer) * Delivers Messages on condition (500ms event timer)
*/ */
@ -38,12 +38,17 @@ public interface NetEventHandlerInterface {
* Sends a NetEvent to the next layer synchronously * Sends a NetEvent to the next layer synchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
void sendNextSync(NetEvent ne); void broadcastNextSync(NetEvent ne);
/** /**
* Crashes a NetEvent to the next layer synchronously * Crashes a NetEvent to the next layer synchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
void crashNextSync(NetEvent ne); void crashNextSync(NetEvent ne);
/**
* Recovers a NetEvent to the next layer synchronously
* @param ne NetEvent
*/
void recoverNextSync(NetEvent ne);
/** /**
* Delivers a NetEvent to the next layer asynchronously * Delivers a NetEvent to the next layer asynchronously
@ -54,12 +59,17 @@ public interface NetEventHandlerInterface {
* Sends a NetEvent to the next layer asynchronously * Sends a NetEvent to the next layer asynchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
void sendNextAsync(NetEvent ne); void broadcastNextAsync(NetEvent ne);
/** /**
* Crashes a NetEvent to the next layer asynchronously * Crashes a NetEvent to the next layer asynchronously
* @param ne NetEvent * @param ne NetEvent
*/ */
void crashNextAsync(NetEvent ne); void crashNextAsync(NetEvent ne);
/**
* Recovers a NetEvent to the next layer asynchronously
* @param ne NetEvent
*/
void recoverNextAsync(NetEvent ne);
/** /**
* Handles a NetEvent * Handles a NetEvent
@ -70,11 +80,12 @@ public interface NetEventHandlerInterface {
switch (et){ switch (et){
case DLVR: deliver(ne); case DLVR: deliver(ne);
break; break;
case SEND: send(ne); case BCST: broadcast(ne);
break; break;
case CRSH: crash(ne); case CRSH: crash(ne);
break; break;
case RCVR: case RCVR: recover(ne);
break;
default: default:
Logger.error("Unhandled EventType:"+et); Logger.error("Unhandled EventType:"+et);
} }
@ -85,8 +96,8 @@ public interface NetEventHandlerInterface {
* Send Event Handler * Send Event Handler
* @param ne NetEvent * @param ne NetEvent
*/ */
default void send(NetEvent ne){ default void broadcast(NetEvent ne){
sendNextSync(ne); broadcastNextSync(ne);
} }
/** /**
* Deliver Event Handler * Deliver Event Handler
@ -102,6 +113,13 @@ public interface NetEventHandlerInterface {
default void crash(NetEvent ne){ default void crash(NetEvent ne){
crashNextSync(ne); crashNextSync(ne);
} }
/**
* Recover Event Handler
* @param ne NetEvent
*/
default void recover(NetEvent ne){
recoverNextSync(ne);
}
/** /**
* Starts the NetEventHandler * Starts the NetEventHandler

View File

@ -5,6 +5,7 @@ import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* NetEventHandler for Best Effort Broadcast * NetEventHandler for Best Effort Broadcast
@ -23,21 +24,16 @@ public class NetHandlerBEB extends NetEventHandlerAbstract {
} }
@Override @Override
public void send(NetEvent ne) { public void broadcast(NetEvent ne) {
hosts.parallelStream().forEach(h ->{
if(this.h.getId() != h.getId()) {
sendNextAsync(NetEvent.Message(h, ne.message));
}else{
deliverNextAsync(NetEvent.Message(h,ne.message)); deliverNextAsync(NetEvent.Message(h,ne.message));
} hosts.parallelStream().forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message)));
});
} }
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p); super.start(h,p);
this.h = h; this.h = h;
hosts = p.hosts(); hosts = p.hosts().stream().filter(ch -> ch.getId()!=h.getId()).collect(Collectors.toList());
} }
} }

View File

@ -18,8 +18,8 @@ public class NetHandlerDFLT extends NetEventHandlerAbstract {
} }
@Override @Override
public void send(NetEvent ne){ public void broadcast(NetEvent ne){
NetManager.error(NetEventType.SEND,new Exception("Default Handler")); NetManager.error(NetEventType.BCST,new Exception("Default Handler"));
} }
@Override @Override

View File

@ -27,24 +27,33 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
} }
@Override @Override
public void beat() { public synchronized void beat() {
hosts.forEach(h-> alive.computeIfPresent(h.getId(),(k, v) -> { hosts.forEach(h-> {
if(v == -1) return 0; alive.computeIfPresent(h.getId(),(k, v) -> {
if(v > NetManager.FD_MAX_TRIES) if(v > -1) broadcastNextAsync(NetEvent.MessageHRTB(h));
crashNextAsync(NetEvent.Message(h, Message.EMPTY()));
else
sendNextSync(NetEvent.MessageHRTB(h));
return v+1; return v+1;
})); });
if(alive.getOrDefault(h.getId(),-1) >= NetManager.FD_MAX_TRIES){
crashNextAsync(NetEvent.Message(h, Message.EMPTY()));
alive.remove(h.getId());
}
});
}
@Override
public void broadcast(NetEvent ne) {
if (alive.containsKey(ne.peer.getId())){
broadcastNextSync(ne);
}
} }
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
alive.computeIfPresent(ne.peer.getId(),(k,v)->-1); alive.computeIfPresent(ne.peer.getId(),(k,v)->-1);
switch(ne.message.tpe){ if (ne.message.tpe != Message.TYPE.HRTB) {
case HRTB: deliverNextSync(ne);
break; }else{
default: deliverNextSync(ne); broadcastNextSync(NetEvent.MessageHRTB(ne.peer));
} }
} }
@ -52,6 +61,6 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h, p); super.start(h, p);
hosts = p.hosts(); hosts = p.hosts();
hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),0)); hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),-1));
} }
} }

View File

@ -35,14 +35,14 @@ public class NetHandlerFIFO extends NetEventHandlerAbstract {
} }
@Override @Override
public void send(NetEvent ne) { public synchronized void broadcast(NetEvent ne) {
Integer snv = sn.getAndIncrement(); Integer snv = sn.getAndIncrement();
NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST)); NetManager.complete(NetEventType.BCST, NetEvent.Message(me, snv, Message.TYPE.BCST));
sendNextAsync(NetEvent.Message(me.getId(),snv)); broadcastNextAsync(NetEvent.Message(me.getId(),snv));
} }
@Override @Override
public void deliverIf(){ public synchronized void deliverIf(){
pending.removeIf(hmp -> { pending.removeIf(hmp -> {
Integer crsn = rsn.getOrDefault(hmp.src, 1); Integer crsn = rsn.getOrDefault(hmp.src, 1);
if (hmp.id.equals(crsn)) { if (hmp.id.equals(crsn)) {

View File

@ -38,9 +38,9 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract {
} }
@Override @Override
public void send(NetEvent event) { public void broadcast(NetEvent event) {
if(socket.isClosed()){ if(socket.isClosed()){
NetManager.error(NetEventType.SEND,new SocketException("Socket Closed")); NetManager.error(NetEventType.BCST,new SocketException("Socket Closed"));
return; return;
} }
ByteBuffer b = ByteBuffer.allocate(BUFF_SIZE); ByteBuffer b = ByteBuffer.allocate(BUFF_SIZE);
@ -49,7 +49,7 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract {
try { try {
socket.send(datagram); socket.send(datagram);
} catch (IOException e) { } catch (IOException e) {
NetManager.error(NetEventType.SEND,e); NetManager.error(NetEventType.BCST,e);
} }
} }

View File

@ -1,7 +1,6 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.event.Message; import cs451.net.event.Message;
import cs451.net.NetManager;
import cs451.net.event.*; import cs451.net.event.*;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
@ -9,7 +8,6 @@ import cs451.tools.Pair;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* NetEventHandler for Stuborn Link * NetEventHandler for Stuborn Link
@ -27,54 +25,34 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
private List<Host> hosts; private List<Host> hosts;
Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet(); Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet();
PriorityBlockingQueue<Pair<Pair<Integer, AtomicInteger>,Message>> sending = new PriorityBlockingQueue<>(); PriorityBlockingQueue<Pair<Integer,Message>> sending = new PriorityBlockingQueue<>();
public NetHandlerSL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerSL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
} }
@Override @Override
public void sendIf(){ public synchronized void broadcastIf(){
sending.forEach(ppm -> { sending.removeIf(ppm -> hasTimeout.contains(ppm.first()));
ppm.first().second().getAndIncrement(); sending.forEach(ppm -> hosts.stream().filter(hl->hl.getId()==ppm.first()).findFirst()
Optional<Host> h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst(); .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second()))));
h.ifPresent(host -> sendNextAsync(NetEvent.Message(host, ppm.second())));
});
sending.removeIf(ppm -> {
if(hasTimeout.contains(ppm.first().first())){
return true;
}
// if(ppm.first().second().get() > NetManager.SL_MAX_TRIES){
// hasTimeout.add(ppm.first().first());
// Optional<Host> h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst();
// h.ifPresent(host -> crashNextAsync(NetEvent.Message(host, ppm.second())));
// return true;
// }
return false;
});
} }
@Override @Override
public void send(NetEvent ne) { public void broadcast(NetEvent ne) {
if(hasTimeout.contains(ne.peer.getId())){ sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message)));
return; broadcastNextSync(ne);
}
sending.add(new Pair<>(new Pair<>(ne.peer.getId(), new AtomicInteger(1)),ne.message));
sendNextSync(ne);
} }
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
if(hasTimeout.contains(ne.peer.getId())){
return;
}
switch (ne.message.tpe){ switch (ne.message.tpe){
case ACK: case ACK:
sending.removeIf(ppm -> (ppm.first().first()==ne.peer.getId() && sending.removeIf(ppm -> (ne.peer.getId()==ppm.first() &&
ppm.second().equals(ne.message))); ppm.second().equals(ne.message)));
break; break;
case DATA: case DATA:
sendNextSync(NetEvent.MessageACK(ne.peer,ne.message)); broadcastNextSync(NetEvent.MessageACK(ne.peer,ne.message));
deliverNextSync(ne); deliverNextSync(ne);
break; break;
default: default:
@ -88,6 +66,13 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
crashNextSync(ne); crashNextSync(ne);
} }
@Override
public void recover(NetEvent ne) {
hasTimeout.remove(ne.peer.getId());
recoverNextSync(ne);
}
@Override @Override
public boolean isDone() { public boolean isDone() {
return sending.isEmpty(); return sending.isEmpty();

View File

@ -5,6 +5,7 @@ import cs451.net.event.NetEvent;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -23,6 +24,8 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
private final AtomicInteger delivered = new AtomicInteger(0); private final AtomicInteger delivered = new AtomicInteger(0);
private final AtomicInteger waiting = new AtomicInteger(0); private final AtomicInteger waiting = new AtomicInteger(0);
private final AtomicBoolean status = new AtomicBoolean(false);
private Host me; private Host me;
public NetHandlerTOPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerTOPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
@ -30,27 +33,29 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
} }
@Override @Override
public void sendIf() { public synchronized void broadcastIf() {
if( !status.get() ) return;
while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) { while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) {
toSend.decrementAndGet(); toSend.decrementAndGet();
waiting.incrementAndGet(); waiting.incrementAndGet();
sendNextSync(NetEvent.EMPTY()); broadcastNextSync(NetEvent.EMPTY());
} }
} }
@Override @Override
public void send(NetEvent ne) { public void broadcast(NetEvent ne) {
sendIf(); status.set(true);
broadcastIf();
} }
@Override @Override
public void deliver(NetEvent ne) { public synchronized void deliver(NetEvent ne) {
if (ne.message.src == me.getId()) { if (ne.message.src == me.getId()) {
delivered.incrementAndGet(); delivered.incrementAndGet();
waiting.decrementAndGet(); waiting.decrementAndGet();
} }
sendIf(); broadcastIf();
} }
@Override @Override

View File

@ -36,22 +36,22 @@ public class NetHandlerURB extends NetEventHandlerAbstract {
} }
@Override @Override
public void deliverIf(){ public synchronized void deliverIf(){
pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)).
filter(delivered::add). filter(delivered::add).
forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second()))); forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second())));
} }
@Override @Override
public void send(NetEvent ne) { public void broadcast(NetEvent ne) {
pending.add(new Pair<>(myId, ne.message.id)); pending.add(new Pair<>(myId, ne.message.id));
ne.message.src = myId; ne.message.src = myId;
sendNextSync(ne); broadcastNextSync(ne);
} }
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
Pair<Integer,Integer> smp =new Pair<>(ne.message.src,ne.message.id); Pair<Integer,Integer> smp = new Pair<>(ne.message.src,ne.message.id);
ack.compute(smp,(k, v) -> { ack.compute(smp,(k, v) -> {
if(v == null){ if(v == null){
v = ConcurrentHashMap.newKeySet(); v = ConcurrentHashMap.newKeySet();
@ -60,7 +60,7 @@ public class NetHandlerURB extends NetEventHandlerAbstract {
return v; return v;
}); });
if(pending.add(smp)){ if(pending.add(smp)){
sendNextSync(ne); broadcastNextAsync(ne);
} }
deliverIf(); deliverIf();
} }

View File

@ -59,7 +59,7 @@ public class Pair<S, T> implements Comparable<T> {
return false; return false;
} }
final Pair<S,T> other = (Pair<S,T>) obj; final Pair<S,T> other = (Pair<S,T>) obj;
return this.vl.equals(other.vl); return this.first().equals(other.first()) && this.second().equals(other.second());
} }
@Override @Override

View File

@ -30,8 +30,8 @@ public abstract class ParamDetector {
int messages = p.messageCount(); int messages = p.messageCount();
int processCount = p.hosts().size(); int processCount = p.hosts().size();
int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/3.0))))); int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(5-(processCount/5.0)))));
int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages); int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages);
System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Process expected to broadcast "+messages+" messages.");
System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+").");
@ -42,8 +42,5 @@ public abstract class ParamDetector {
//We might want to PingPong To set Custom Timing Limitations.... //We might want to PingPong To set Custom Timing Limitations....
NetManager.FD_MAX_TRIES = 10; NetManager.FD_MAX_TRIES = 10;
NetManager.FD_WAIT = 1000; NetManager.FD_WAIT = 1000;
NetManager.SL_MAX_TRIES = 8;
NetManager.SL_WAIT = NetManager.INTERNAL_WAIT;
} }
} }

View File

@ -16,9 +16,14 @@ RUN apt-get -y install file unzip zip xz-utils git \
ADD docker/apache-maven-3.6.3-bin.tar.gz /usr/local ADD docker/apache-maven-3.6.3-bin.tar.gz /usr/local
RUN ln -s /usr/local/apache-maven-3.6.3/bin/mvn /usr/local/bin RUN ln -s /usr/local/apache-maven-3.6.3/bin/mvn /usr/local/bin
COPY template_cpp /root/template_cpp #COPY template_cpp /root/template_cpp
COPY template_java /root/template_java #COPY template_java /root/template_java
COPY 257844 /root/257844
ADD barrier.py /root ADD barrier.py /root
ADD finishedSignal.py /root
ADD validate.py /root
ADD bnr.sh /root
RUN /root/template_cpp/build.sh && /root/template_cpp/cleanup.sh #RUN /root/template_cpp/build.sh && /root/template_cpp/cleanup.sh
RUN /root/template_java/build.sh && /root/template_java/cleanup.sh #RUN /root/template_java/build.sh && /root/template_java/cleanup.sh
RUN /root/257844/build.sh && /root/257844/cleanup.sh

View File

@ -1,3 +1,3 @@
#!/bin/bash #!/bin/bash
docker run -it da_image /bin/bash docker run -it --rm da_image /bin/bash