cleanup
This commit is contained in:
parent
88e8903832
commit
004e433b31
@ -8,6 +8,7 @@ import cs451.parser.Host;
|
|||||||
import cs451.parser.Parser;
|
import cs451.parser.Parser;
|
||||||
import cs451.tools.Logger;
|
import cs451.tools.Logger;
|
||||||
import cs451.tools.Pair;
|
import cs451.tools.Pair;
|
||||||
|
import cs451.tools.ParamDetector;
|
||||||
|
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -47,11 +48,11 @@ public class Main {
|
|||||||
//immediately stop network packet processing
|
//immediately stop network packet processing
|
||||||
System.out.println("Immediately stopping network packet processing.");
|
System.out.println("Immediately stopping network packet processing.");
|
||||||
|
|
||||||
|
|
||||||
|
System.out.println("Stopping NetManager");
|
||||||
NetManager.stop();
|
NetManager.stop();
|
||||||
|
|
||||||
//write/flush output file if necessary
|
|
||||||
System.out.println("Writing output.");
|
System.out.println("Writing output.");
|
||||||
|
|
||||||
try {
|
try {
|
||||||
writeOutput(parser.output());
|
writeOutput(parser.output());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -98,6 +99,7 @@ public class Main {
|
|||||||
}
|
}
|
||||||
Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort());
|
Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort());
|
||||||
|
|
||||||
|
ParamDetector.detectAndSet(me, parser);
|
||||||
NetManager.start(me,parser,(t,ne) -> {
|
NetManager.start(me,parser,(t,ne) -> {
|
||||||
Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString());
|
Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString());
|
||||||
if(t == NetEventType.DLVR){
|
if(t == NetEventType.DLVR){
|
||||||
@ -119,18 +121,9 @@ public class Main {
|
|||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Stopping NetManager");
|
|
||||||
NetManager.stop();
|
|
||||||
|
|
||||||
System.out.println("Signaling end of broadcasting messages");
|
System.out.println("Signaling end of broadcasting messages");
|
||||||
coordinator.finishedBroadcasting();
|
coordinator.finishedBroadcasting();
|
||||||
|
|
||||||
try {
|
|
||||||
writeOutput(parser.output());
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
// Sleep for 1 hour
|
// Sleep for 1 hour
|
||||||
Thread.sleep(60 * 60 * 1000);
|
Thread.sleep(60 * 60 * 1000);
|
||||||
|
@ -15,19 +15,33 @@ import java.util.function.BiConsumer;
|
|||||||
|
|
||||||
|
|
||||||
public abstract class NetManager {
|
public abstract class NetManager {
|
||||||
private static final Integer THREAD_COUNT = 8;
|
|
||||||
|
public static int INTERNAL_WAIT;
|
||||||
|
|
||||||
|
public static int SL_WAIT;
|
||||||
|
public static int SL_MAX_TRIES;
|
||||||
|
public static int FD_WAIT;
|
||||||
|
public static int FD_MAX_TRIES;
|
||||||
|
|
||||||
|
public static Integer THREAD_COUNT;
|
||||||
|
public static Integer THREAD_BOOST_COUNT;
|
||||||
|
|
||||||
|
public static Integer WINDOW_WIDTH;
|
||||||
|
|
||||||
|
|
||||||
private static boolean isStopped = false;
|
private static boolean isStopped = false;
|
||||||
|
|
||||||
private static final Map<Class<? extends NetEventHandler>, NetEventHandler> nm_listeners = new HashMap<>();
|
private static final Map<Class<? extends NetEventHandlerAbstract>, NetEventHandlerAbstract> nm_listeners = new HashMap<>();
|
||||||
|
|
||||||
private static final ThreadPoolExecutor ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
|
private static ThreadPoolExecutor ex;
|
||||||
|
private static final ScheduledExecutorService ftex = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
private static final ScheduledExecutorService stex = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
private static BiConsumer<NetEventType,NetEvent> onCompleteHandler;
|
private static BiConsumer<NetEventType,NetEvent> onCompleteHandler;
|
||||||
private static BiConsumer<NetEventType,Exception> onErrorHandler;
|
private static BiConsumer<NetEventType,Exception> onErrorHandler;
|
||||||
|
|
||||||
|
|
||||||
private static void registerNetHandler(NetEventHandler handler){
|
private static void registerNetHandler(NetEventHandlerAbstract handler){
|
||||||
nm_listeners.put(handler.getClass(),handler);
|
nm_listeners.put(handler.getClass(),handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,8 +49,9 @@ public abstract class NetManager {
|
|||||||
onCompleteHandler = och;
|
onCompleteHandler = och;
|
||||||
onErrorHandler = oeh;
|
onErrorHandler = oeh;
|
||||||
|
|
||||||
registerNetHandler(new NetHandlerSCKT( NetHandlerSL.class, NetHandlerDFLT.class));
|
registerNetHandler(new NetHandlerSCKT( NetHandlerFD.class, NetHandlerDFLT.class));
|
||||||
registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerSCKT.class));
|
registerNetHandler(new NetHandlerFD( NetHandlerSL.class, NetHandlerSCKT.class));
|
||||||
|
registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerFD.class));
|
||||||
registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class));
|
registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class));
|
||||||
registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class));
|
registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class));
|
||||||
registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class));
|
registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class));
|
||||||
@ -45,25 +60,36 @@ public abstract class NetManager {
|
|||||||
registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class));
|
registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class));
|
||||||
|
|
||||||
nm_listeners.values().forEach(neh -> neh.start(h,p));
|
nm_listeners.values().forEach(neh -> neh.start(h,p));
|
||||||
|
ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
|
||||||
ex.prestartAllCoreThreads();
|
ex.prestartAllCoreThreads();
|
||||||
|
|
||||||
|
ftex.scheduleAtFixedRate(() -> nm_listeners.values().parallelStream().forEach(neh->{
|
||||||
|
neh.deliverIf();
|
||||||
|
neh.sendIf();
|
||||||
|
}), 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
stex.scheduleAtFixedRate(()-> nm_listeners.values().forEach(NetEventHandlerInterface::beat
|
||||||
|
), 0, FD_WAIT, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void stop() {
|
public static void stop() {
|
||||||
isStopped = true;
|
isStopped = true;
|
||||||
nm_listeners.values().forEach(NetEventHandler::stop);
|
nm_listeners.values().forEach(NetEventHandlerAbstract::stop);
|
||||||
ex.shutdown();
|
ex.shutdown();
|
||||||
|
ftex.shutdown();
|
||||||
|
stex.shutdown();
|
||||||
System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run.");
|
System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isDone() {
|
public static boolean isDone() {
|
||||||
return isStopped || nm_listeners.values().stream().map(NetEventHandler::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2);
|
return isStopped || nm_listeners.values().stream().map(NetEventHandlerAbstract::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
|
|
||||||
public static void handle(Class<? extends NetEventHandler> layer, NetEventType event, NetEvent ne){
|
public static void handle(Class<? extends NetEventHandlerAbstract> layer, NetEventType event, NetEvent ne){
|
||||||
try {
|
try {
|
||||||
ex.getQueue().add(new NetEventRunner(ne, layer, event));
|
ex.getQueue().add(new NetEventRunner(ne, layer, event));
|
||||||
}catch(IllegalStateException ise){
|
}catch(IllegalStateException ise){
|
||||||
@ -82,28 +108,30 @@ public abstract class NetManager {
|
|||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
|
|
||||||
public static void deliver(Class<? extends NetEventHandler> layer, NetEvent ne) {
|
public static void deliver(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
handle(layer,NetEventType.DLVR,ne);
|
handle(layer,NetEventType.DLVR,ne);
|
||||||
}
|
}
|
||||||
|
public static void send(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
public static void send(Class<? extends NetEventHandler> layer, NetEvent ne) {
|
|
||||||
handle(layer,NetEventType.SEND,ne);
|
handle(layer,NetEventType.SEND,ne);
|
||||||
}
|
}
|
||||||
|
public static void crash(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
public static void crash(Class<? extends NetEventHandler> layer, NetEvent ne) {
|
|
||||||
handle(layer,NetEventType.CRSH,ne);
|
handle(layer,NetEventType.CRSH,ne);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void send() {
|
public static void deliverSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
new NetEventRunner(NetEvent.EMPTY(),NetHandlerTOPL.class,NetEventType.SEND).run();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void deliverSync(Class<? extends NetEventHandler> layer, NetEvent ne) {
|
|
||||||
new NetEventRunner(ne,layer,NetEventType.DLVR).run();
|
new NetEventRunner(ne,layer,NetEventType.DLVR).run();
|
||||||
}
|
}
|
||||||
public static void sendSync(Class<? extends NetEventHandler> layer, NetEvent ne) {
|
public static void sendSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
new NetEventRunner(ne,layer,NetEventType.SEND).run();
|
new NetEventRunner(ne,layer,NetEventType.SEND).run();
|
||||||
}
|
}
|
||||||
|
public static void crashSync(Class<? extends NetEventHandlerAbstract> layer, NetEvent ne) {
|
||||||
|
new NetEventRunner(ne,layer,NetEventType.CRSH).run();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void send() {
|
||||||
|
new NetEventRunner(NetEvent.EMPTY(), NetHandlerTOPL.class,NetEventType.SEND).run();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
//=================================================================================================================
|
//=================================================================================================================
|
||||||
@ -111,10 +139,10 @@ public abstract class NetManager {
|
|||||||
private static class NetEventRunner implements Runnable {
|
private static class NetEventRunner implements Runnable {
|
||||||
|
|
||||||
private final NetEvent ne;
|
private final NetEvent ne;
|
||||||
private final Class<? extends NetEventHandler> lt;
|
private final Class<? extends NetEventHandlerAbstract> lt;
|
||||||
private final NetEventType net;
|
private final NetEventType net;
|
||||||
|
|
||||||
private NetEventRunner(NetEvent ne, Class<? extends NetEventHandler> lt, NetEventType net) {
|
private NetEventRunner(NetEvent ne, Class<? extends NetEventHandlerAbstract> lt, NetEventType net) {
|
||||||
this.ne = ne;
|
this.ne = ne;
|
||||||
this.lt = lt;
|
this.lt = lt;
|
||||||
this.net = net;
|
this.net = net;
|
||||||
@ -124,7 +152,7 @@ public abstract class NetManager {
|
|||||||
public void run() {
|
public void run() {
|
||||||
Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+"");
|
Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+"");
|
||||||
|
|
||||||
NetEventHandler nl = nm_listeners.getOrDefault(this.lt, null);
|
NetEventHandlerAbstract nl = nm_listeners.getOrDefault(this.lt, null);
|
||||||
if (nl != null) {
|
if (nl != null) {
|
||||||
nl.onEvent(this.net,ne);
|
nl.onEvent(this.net,ne);
|
||||||
}else{
|
}else{
|
||||||
|
@ -4,14 +4,15 @@ import java.nio.ByteBuffer;
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
public class Message implements Comparable {
|
public class Message implements Comparable<Message> {
|
||||||
|
|
||||||
public enum TYPE {
|
public enum TYPE {
|
||||||
NONE("NONE", 'N'),
|
NONE("NONE", 'N'),
|
||||||
ERR("ERR", 'E'),
|
ERR("ERR", 'E'),
|
||||||
|
BCST("BCST", 'B'),
|
||||||
DATA("DATA", ' '),
|
DATA("DATA", ' '),
|
||||||
ACK("ACK", 'A'),
|
ACK("ACK", 'A'),
|
||||||
BCST("BCST", 'B');
|
HRTB("HRTB",'H');
|
||||||
|
|
||||||
public final Character c;
|
public final Character c;
|
||||||
public final String tag;
|
public final String tag;
|
||||||
@ -22,8 +23,8 @@ public class Message implements Comparable {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public int src = -1;
|
public Integer src = -1;
|
||||||
public final int id;
|
public final Integer id;
|
||||||
public TYPE tpe;
|
public TYPE tpe;
|
||||||
|
|
||||||
public static Message DMSG(Integer id){
|
public static Message DMSG(Integer id){
|
||||||
@ -33,6 +34,10 @@ public class Message implements Comparable {
|
|||||||
return new Message(id,TYPE.DATA,src);
|
return new Message(id,TYPE.DATA,src);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Message HRTB(){
|
||||||
|
return new Message(0,TYPE.HRTB);
|
||||||
|
}
|
||||||
|
|
||||||
public static Message TMSG(Integer mess_id, TYPE tpe) {
|
public static Message TMSG(Integer mess_id, TYPE tpe) {
|
||||||
return new Message(mess_id, tpe);
|
return new Message(mess_id, tpe);
|
||||||
}
|
}
|
||||||
@ -94,7 +99,7 @@ public class Message implements Comparable {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final Message m = (Message) obj;
|
final Message m = (Message) obj;
|
||||||
return id == m.id && src == m.src;
|
return id.equals(m.id) && src.equals(m.src);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -103,8 +108,7 @@ public class Message implements Comparable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int compareTo(Object obj) {
|
public int compareTo(Message m) {
|
||||||
final Message m = (Message) obj;
|
return id.compareTo(m.id);
|
||||||
return id-m.id;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,10 @@ public class NetEvent {
|
|||||||
return ne;
|
return ne;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static NetEvent MessageHRTB(Host peer){
|
||||||
|
return NetEvent.Message(peer,Message.HRTB());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static NetEvent EMPTY(){
|
public static NetEvent EMPTY(){
|
||||||
return new NetEvent(NO_PEER, Message.EMPTY());
|
return new NetEvent(NO_PEER, Message.EMPTY());
|
||||||
|
@ -1,84 +0,0 @@
|
|||||||
package cs451.net.handler;
|
|
||||||
|
|
||||||
import cs451.net.NetManager;
|
|
||||||
import cs451.net.event.NetEvent;
|
|
||||||
import cs451.net.event.NetEventType;
|
|
||||||
import cs451.parser.Host;
|
|
||||||
import cs451.parser.Parser;
|
|
||||||
import cs451.tools.Logger;
|
|
||||||
|
|
||||||
public abstract class NetEventHandler {
|
|
||||||
|
|
||||||
/*
|
|
||||||
* 1 - 50ms - 50ms
|
|
||||||
* 2 - 400ms - 450ms
|
|
||||||
* 3 - 1350ms - 1800ms
|
|
||||||
* 4 - 3200ms - 5000ms
|
|
||||||
*/
|
|
||||||
public static final int MS_WAIT = 50;
|
|
||||||
public static final int MAX_TRIES = 5;
|
|
||||||
|
|
||||||
public static final Integer SEND_WINDOW = 3;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private final Class<? extends NetEventHandler> deliverLayer;
|
|
||||||
private final Class<? extends NetEventHandler> broadcastLayer;
|
|
||||||
|
|
||||||
|
|
||||||
NetEventHandler(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
|
||||||
this.deliverLayer = deliverLayer;
|
|
||||||
this.broadcastLayer = broadcastLayer;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deliverNextSync(NetEvent ne){
|
|
||||||
NetManager.deliverSync(deliverLayer,ne);
|
|
||||||
}
|
|
||||||
public void sendNextSync(NetEvent ne){
|
|
||||||
NetManager.sendSync(broadcastLayer,ne);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deliverNext(NetEvent ne){
|
|
||||||
NetManager.deliver(deliverLayer,ne);
|
|
||||||
}
|
|
||||||
public void sendNext(NetEvent ne){
|
|
||||||
NetManager.send(broadcastLayer,ne);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void onEvent(NetEventType et, NetEvent ne){
|
|
||||||
switch (et){
|
|
||||||
case DLVR: deliver(ne);
|
|
||||||
break;
|
|
||||||
case SEND: send(ne);
|
|
||||||
break;
|
|
||||||
case CRSH: crash(ne);
|
|
||||||
break;
|
|
||||||
case RCVR: recover(ne);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
Logger.error("Unhandled EventType:"+et);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void send(NetEvent ne){
|
|
||||||
NetManager.error(NetEventType.SEND,new Exception("Default Handler"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void deliver(NetEvent ne){
|
|
||||||
NetManager.error(NetEventType.DLVR,new Exception("Default Handler"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void crash(NetEvent ne){
|
|
||||||
NetManager.error(NetEventType.CRSH,new Exception("Default Handler"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void recover(NetEvent ne){
|
|
||||||
NetManager.error(NetEventType.RCVR,new Exception("Default Handler"));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(Host h, Parser p) {}
|
|
||||||
|
|
||||||
public void stop() {}
|
|
||||||
|
|
||||||
public boolean isDone(){ return true;}
|
|
||||||
}
|
|
@ -0,0 +1,56 @@
|
|||||||
|
package cs451.net.handler;
|
||||||
|
|
||||||
|
import cs451.net.NetManager;
|
||||||
|
import cs451.net.event.NetEvent;
|
||||||
|
import cs451.parser.Host;
|
||||||
|
import cs451.parser.Parser;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface {
|
||||||
|
|
||||||
|
private final Class<? extends NetEventHandlerAbstract> deliverLayer;
|
||||||
|
private final Class<? extends NetEventHandlerAbstract> broadcastLayer;
|
||||||
|
|
||||||
|
public final AtomicBoolean active = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
|
||||||
|
NetEventHandlerAbstract(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
|
this.deliverLayer = deliverLayer;
|
||||||
|
this.broadcastLayer = broadcastLayer;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public void deliverNextSync(NetEvent ne){
|
||||||
|
NetManager.deliverSync(deliverLayer,ne);
|
||||||
|
}
|
||||||
|
public void sendNextSync(NetEvent ne){
|
||||||
|
NetManager.sendSync(broadcastLayer,ne);
|
||||||
|
}
|
||||||
|
public void crashNextSync(NetEvent ne){
|
||||||
|
NetManager.crashSync(deliverLayer,ne);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deliverNextAsync(NetEvent ne){
|
||||||
|
NetManager.deliver(deliverLayer,ne);
|
||||||
|
}
|
||||||
|
public void sendNextAsync(NetEvent ne){
|
||||||
|
NetManager.send(broadcastLayer,ne);
|
||||||
|
}
|
||||||
|
public void crashNextAsync(NetEvent ne) {
|
||||||
|
NetManager.crash(deliverLayer,ne);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void start(Host h, Parser p) {
|
||||||
|
Objects.requireNonNull(h);
|
||||||
|
Objects.requireNonNull(p);
|
||||||
|
active.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stop() {
|
||||||
|
active.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,52 @@
|
|||||||
|
package cs451.net.handler;
|
||||||
|
|
||||||
|
import cs451.net.event.NetEvent;
|
||||||
|
import cs451.net.event.NetEventType;
|
||||||
|
import cs451.parser.Host;
|
||||||
|
import cs451.parser.Parser;
|
||||||
|
import cs451.tools.Logger;
|
||||||
|
|
||||||
|
public interface NetEventHandlerInterface {
|
||||||
|
|
||||||
|
|
||||||
|
default void sendIf(){}
|
||||||
|
default void deliverIf(){}
|
||||||
|
default void beat(){}
|
||||||
|
|
||||||
|
void deliverNextSync(NetEvent ne);
|
||||||
|
void sendNextSync(NetEvent ne);
|
||||||
|
void crashNextSync(NetEvent ne);
|
||||||
|
|
||||||
|
void deliverNextAsync(NetEvent ne);
|
||||||
|
void sendNextAsync(NetEvent ne);
|
||||||
|
void crashNextAsync(NetEvent ne);
|
||||||
|
|
||||||
|
default void onEvent(NetEventType et, NetEvent ne){
|
||||||
|
switch (et){
|
||||||
|
case DLVR: deliver(ne);
|
||||||
|
break;
|
||||||
|
case SEND: send(ne);
|
||||||
|
break;
|
||||||
|
case CRSH: crash(ne);
|
||||||
|
break;
|
||||||
|
case RCVR:
|
||||||
|
default:
|
||||||
|
Logger.error("Unhandled EventType:"+et);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
default void send(NetEvent ne){
|
||||||
|
sendNextSync(ne);
|
||||||
|
}
|
||||||
|
default void deliver(NetEvent ne){
|
||||||
|
deliverNextSync(ne);
|
||||||
|
}
|
||||||
|
default void crash(NetEvent ne){
|
||||||
|
crashNextSync(ne);
|
||||||
|
}
|
||||||
|
|
||||||
|
void start(Host h, Parser p);
|
||||||
|
void stop();
|
||||||
|
|
||||||
|
default boolean isDone(){ return true;}
|
||||||
|
}
|
@ -6,33 +6,29 @@ import cs451.parser.Parser;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class NetHandlerBEB extends NetEventHandler {
|
public class NetHandlerBEB extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private List<Host> hosts;
|
private List<Host> hosts;
|
||||||
private Host h;
|
private Host h;
|
||||||
|
|
||||||
public NetHandlerBEB(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerBEB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(NetEvent ne) {
|
public void send(NetEvent ne) {
|
||||||
for(Host h : hosts){
|
hosts.parallelStream().forEach(h ->{
|
||||||
if(this.h.getId() != h.getId()) {
|
if(this.h.getId() != h.getId()) {
|
||||||
sendNext(NetEvent.Message(h, ne.message));
|
sendNextAsync(NetEvent.Message(h, ne.message));
|
||||||
}else{
|
}else{
|
||||||
deliverNext(NetEvent.Message(h,ne.message));
|
deliverNextAsync(NetEvent.Message(h,ne.message));
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deliver(NetEvent ne) {
|
|
||||||
deliverNext(ne);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start(Host h, Parser p) {
|
public void start(Host h, Parser p) {
|
||||||
|
super.start(h,p);
|
||||||
this.h = h;
|
this.h = h;
|
||||||
hosts = p.hosts();
|
hosts = p.hosts();
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,28 @@
|
|||||||
package cs451.net.handler;
|
package cs451.net.handler;
|
||||||
|
|
||||||
public class NetHandlerDFLT extends NetEventHandler {
|
import cs451.net.NetManager;
|
||||||
|
import cs451.net.event.NetEvent;
|
||||||
|
import cs451.net.event.NetEventType;
|
||||||
|
|
||||||
public NetHandlerDFLT(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public class NetHandlerDFLT extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
|
public NetHandlerDFLT(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(NetEvent ne){
|
||||||
|
NetManager.error(NetEventType.SEND,new Exception("Default Handler"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliver(NetEvent ne){
|
||||||
|
NetManager.error(NetEventType.DLVR,new Exception("Default Handler"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void crash(NetEvent ne){
|
||||||
|
NetManager.error(NetEventType.CRSH,new Exception("Default Handler"));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
56
257844/src/main/java/cs451/net/handler/NetHandlerFD.java
Normal file
56
257844/src/main/java/cs451/net/handler/NetHandlerFD.java
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
package cs451.net.handler;
|
||||||
|
|
||||||
|
import cs451.net.NetManager;
|
||||||
|
import cs451.net.event.Message;
|
||||||
|
import cs451.net.event.NetEvent;
|
||||||
|
import cs451.parser.Host;
|
||||||
|
import cs451.parser.Parser;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
public class NetHandlerFD extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
|
private Host me;
|
||||||
|
private List<Host> hosts;
|
||||||
|
private static final ConcurrentHashMap<Integer,Integer> alive = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public NetHandlerFD(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
|
super(deliverLayer, broadcastLayer);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void beat() {
|
||||||
|
hosts.forEach(h->{
|
||||||
|
if(h.getId() != me.getId()){
|
||||||
|
alive.computeIfPresent(h.getId(),(k, v) -> {
|
||||||
|
if(v == 0) return 1;
|
||||||
|
|
||||||
|
if(v > NetManager.FD_MAX_TRIES)
|
||||||
|
crashNextAsync(NetEvent.Message(h, Message.EMPTY()));
|
||||||
|
else
|
||||||
|
sendNextSync(NetEvent.MessageHRTB(h));
|
||||||
|
|
||||||
|
return v+1;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deliver(NetEvent ne) {
|
||||||
|
alive.computeIfPresent(ne.peer.getId(),(k,v)->0);
|
||||||
|
switch(ne.message.tpe){
|
||||||
|
case HRTB:
|
||||||
|
break;
|
||||||
|
default: deliverNextSync(ne);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(Host h, Parser p) {
|
||||||
|
super.start(h, p);
|
||||||
|
hosts = p.hosts();
|
||||||
|
me = h;
|
||||||
|
hosts.forEach(ch-> alive.put(ch.getId(),0));
|
||||||
|
}
|
||||||
|
}
|
@ -12,7 +12,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||||||
import java.util.concurrent.PriorityBlockingQueue;
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class NetHandlerFIFO extends NetEventHandler {
|
public class NetHandlerFIFO extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private final AtomicInteger sn = new AtomicInteger(1);
|
private final AtomicInteger sn = new AtomicInteger(1);
|
||||||
private final Map<Integer,Integer> rsn = new ConcurrentHashMap<>();
|
private final Map<Integer,Integer> rsn = new ConcurrentHashMap<>();
|
||||||
@ -20,7 +20,7 @@ public class NetHandlerFIFO extends NetEventHandler {
|
|||||||
|
|
||||||
private Host me;
|
private Host me;
|
||||||
|
|
||||||
public NetHandlerFIFO(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerFIFO(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,22 +28,21 @@ public class NetHandlerFIFO extends NetEventHandler {
|
|||||||
public void send(NetEvent ne) {
|
public void send(NetEvent ne) {
|
||||||
Integer snv = sn.getAndIncrement();
|
Integer snv = sn.getAndIncrement();
|
||||||
NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST));
|
NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST));
|
||||||
sendNext(NetEvent.Message(me.getId(),snv));
|
sendNextAsync(NetEvent.Message(me.getId(),snv));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deliverIf(){
|
@Override
|
||||||
synchronized (pending) {
|
public void deliverIf(){
|
||||||
pending.removeIf(hmp -> {
|
pending.removeIf(hmp -> {
|
||||||
Integer crsn = rsn.getOrDefault(hmp.src, 1);
|
Integer crsn = rsn.getOrDefault(hmp.src, 1);
|
||||||
if (hmp.id == crsn) {
|
if (hmp.id.equals(crsn)) {
|
||||||
deliverNextSync(NetEvent.Message(hmp));
|
NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp));
|
||||||
NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp));
|
deliverNextSync(NetEvent.Message(hmp));
|
||||||
rsn.put(hmp.src, crsn + 1);
|
rsn.put(hmp.src, crsn + 1);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
});
|
});
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -54,6 +53,7 @@ public class NetHandlerFIFO extends NetEventHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(Host h, Parser p) {
|
public void start(Host h, Parser p) {
|
||||||
|
super.start(h,p);
|
||||||
me = h;
|
me = h;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,23 +7,18 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
|
||||||
public class NetHandlerPL extends NetEventHandler {
|
public class NetHandlerPL extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private final Set<Pair<Integer,Message>> delivered = ConcurrentHashMap.newKeySet();
|
private final Set<Pair<Integer,Message>> delivered = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
public NetHandlerPL(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void send(NetEvent event) {
|
|
||||||
sendNext(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deliver(NetEvent event) {
|
public void deliver(NetEvent event) {
|
||||||
if(delivered.add(new Pair<>(event.peer.getId(),event.message))){
|
if(delivered.add(new Pair<>(event.peer.getId(),event.message))){
|
||||||
deliverNext(event);
|
deliverNextSync(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,21 +10,22 @@ import java.io.IOException;
|
|||||||
import java.net.*;
|
import java.net.*;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
public class NetHandlerSCKT extends NetEventHandler {
|
public class NetHandlerSCKT extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private static final Integer BUFF_SIZE = 128;
|
private static final Integer BUFF_SIZE = 128;
|
||||||
|
|
||||||
|
private static final ExecutorService tex = Executors.newSingleThreadExecutor();
|
||||||
|
|
||||||
private List<Host> hosts;
|
private List<Host> hosts;
|
||||||
|
|
||||||
private DatagramSocket socket = null;
|
private DatagramSocket socket = null;
|
||||||
private final AtomicBoolean receiving = new AtomicBoolean(true);
|
|
||||||
|
|
||||||
|
|
||||||
public NetHandlerSCKT(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerSCKT(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,7 +58,7 @@ public class NetHandlerSCKT extends NetEventHandler {
|
|||||||
socket.receive(datagram);
|
socket.receive(datagram);
|
||||||
Optional<Integer> rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst();
|
Optional<Integer> rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst();
|
||||||
|
|
||||||
deliverNext(NetEvent.Message(
|
deliverNextAsync(NetEvent.Message(
|
||||||
new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)),
|
new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)),
|
||||||
Message.FromBuffer(ByteBuffer.wrap(datagram.getData()))
|
Message.FromBuffer(ByteBuffer.wrap(datagram.getData()))
|
||||||
));
|
));
|
||||||
@ -69,8 +70,7 @@ public class NetHandlerSCKT extends NetEventHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(Host h, Parser p) {
|
public void start(Host h, Parser p) {
|
||||||
Objects.requireNonNull(h);
|
super.start(h,p);
|
||||||
Objects.requireNonNull(p);
|
|
||||||
hosts = p.hosts();
|
hosts = p.hosts();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -78,17 +78,18 @@ public class NetHandlerSCKT extends NetEventHandler {
|
|||||||
} catch (SocketException e) {
|
} catch (SocketException e) {
|
||||||
throw new Error(e);
|
throw new Error(e);
|
||||||
}
|
}
|
||||||
receiving.set(true);
|
|
||||||
new Thread(() -> {
|
tex.execute(()->{
|
||||||
while (receiving.get()) {
|
while (active.get()) {
|
||||||
this.deliver(NetEvent.EMPTY());
|
deliver(NetEvent.EMPTY());
|
||||||
}
|
}
|
||||||
}).start();
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop() {
|
public void stop() {
|
||||||
|
super.stop();
|
||||||
|
tex.shutdown();
|
||||||
socket.close();
|
socket.close();
|
||||||
receiving.set(false);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,87 +3,89 @@ package cs451.net.handler;
|
|||||||
import cs451.net.event.Message;
|
import cs451.net.event.Message;
|
||||||
import cs451.net.NetManager;
|
import cs451.net.NetManager;
|
||||||
import cs451.net.event.*;
|
import cs451.net.event.*;
|
||||||
|
import cs451.parser.Host;
|
||||||
|
import cs451.parser.Parser;
|
||||||
import cs451.tools.Pair;
|
import cs451.tools.Pair;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.*;
|
||||||
import java.util.Set;
|
import java.util.concurrent.*;
|
||||||
import java.util.Timer;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.TimerTask;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
|
|
||||||
|
|
||||||
public class NetHandlerSL extends NetEventHandler {
|
public class NetHandlerSL extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private static final Set<Message> EMPTY_SET = ConcurrentHashMap.newKeySet();
|
|
||||||
|
|
||||||
|
private List<Host> hosts;
|
||||||
Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet();
|
Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet();
|
||||||
Map<Integer, Set<Message>> hasAck = new ConcurrentHashMap<>();
|
|
||||||
Map<Pair<Integer, Message>, Integer> retryCounter = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
public NetHandlerSL(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
PriorityBlockingQueue<Pair<Pair<Integer, AtomicInteger>,Message>> sending = new PriorityBlockingQueue<>();
|
||||||
|
|
||||||
|
public NetHandlerSL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
private void handleTimeout(NetEvent ne) {
|
public void sendIf(){
|
||||||
if (hasAck.getOrDefault(ne.peer.getId(),EMPTY_SET).contains(ne.message)) {
|
sending.forEach(ppm -> {
|
||||||
return;
|
ppm.first().second().getAndIncrement();
|
||||||
}
|
Optional<Host> h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst();
|
||||||
if (retryCounter.compute(new Pair<>(ne.peer.getId(),ne.message), (k, ov) -> (ov == null) ? 2 : ov + 1) >= MAX_TRIES) {
|
h.ifPresent(host -> sendNextAsync(NetEvent.Message(host, ppm.second())));
|
||||||
timeout(ne);
|
});
|
||||||
} else {
|
sending.removeIf(ppm -> {
|
||||||
send(ne);
|
if(hasTimeout.contains(ppm.first().first())){
|
||||||
}
|
return true;
|
||||||
}
|
}
|
||||||
|
// if(ppm.first().second().get() > NetManager.SL_MAX_TRIES){
|
||||||
private void checkTimeout(NetEvent ne) {
|
// hasTimeout.add(ppm.first().first());
|
||||||
new Thread(()->{
|
// Optional<Host> h = hosts.stream().filter(hl->hl.getId()==ppm.first().first()).findFirst();
|
||||||
Integer t = retryCounter.getOrDefault(new Pair<>(ne.peer.getId(), ne.message), 1);
|
// h.ifPresent(host -> crashNextAsync(NetEvent.Message(host, ppm.second())));
|
||||||
new Timer().schedule(
|
// return true;
|
||||||
new TimerTask() {
|
// }
|
||||||
@Override
|
return false;
|
||||||
public void run () {
|
});
|
||||||
handleTimeout(ne);
|
|
||||||
}
|
|
||||||
}, MS_WAIT*t*t*t);
|
|
||||||
}).start();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(NetEvent ne) {
|
public void send(NetEvent ne) {
|
||||||
if (hasTimeout.contains(ne.peer.getId())) {
|
if(hasTimeout.contains(ne.peer.getId())){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
sendNext(ne);
|
sending.add(new Pair<>(new Pair<>(ne.peer.getId(), new AtomicInteger(1)),ne.message));
|
||||||
checkTimeout(ne);
|
sendNextSync(ne);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deliver(NetEvent ne) {
|
public void deliver(NetEvent ne) {
|
||||||
if (hasTimeout.contains(ne.peer.getId())) {
|
if(hasTimeout.contains(ne.peer.getId())){
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
switch (ne.message.tpe){
|
switch (ne.message.tpe){
|
||||||
case ACK:
|
case ACK:
|
||||||
hasAck.compute(ne.peer.getId(),(k, s) -> {
|
sending.removeIf(ppm -> (ppm.first().first()==ne.peer.getId() &&
|
||||||
if(s == null) s = ConcurrentHashMap.newKeySet();
|
ppm.second().equals(ne.message)));
|
||||||
s.add(ne.message);
|
|
||||||
return s;
|
|
||||||
});
|
|
||||||
break;
|
break;
|
||||||
case DATA:
|
case DATA:
|
||||||
sendNext(NetEvent.MessageACK(ne.peer,ne.message));
|
sendNextSync(NetEvent.MessageACK(ne.peer,ne.message));
|
||||||
deliverNext(ne);
|
deliverNextSync(ne);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void timeout(NetEvent event) {
|
@Override
|
||||||
if(hasTimeout.add(event.peer.getId())) {
|
public void crash(NetEvent ne) {
|
||||||
NetManager.crash(NetHandlerURB.class, event);
|
hasTimeout.add(ne.peer.getId());
|
||||||
}
|
crashNextSync(ne);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone() {
|
||||||
|
return sending.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(Host h, Parser p) {
|
||||||
|
this.hosts = p.hosts();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
package cs451.net.handler;
|
package cs451.net.handler;
|
||||||
|
|
||||||
|
import cs451.net.NetManager;
|
||||||
import cs451.net.event.NetEvent;
|
import cs451.net.event.NetEvent;
|
||||||
import cs451.parser.Host;
|
import cs451.parser.Host;
|
||||||
import cs451.parser.Parser;
|
import cs451.parser.Parser;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class NetHandlerTOPL extends NetEventHandler {
|
public class NetHandlerTOPL extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
private final AtomicInteger toSend = new AtomicInteger(0);
|
private final AtomicInteger toSend = new AtomicInteger(0);
|
||||||
private final AtomicInteger delivered = new AtomicInteger(0);
|
private final AtomicInteger delivered = new AtomicInteger(0);
|
||||||
@ -14,21 +15,20 @@ public class NetHandlerTOPL extends NetEventHandler {
|
|||||||
|
|
||||||
private Host me;
|
private Host me;
|
||||||
|
|
||||||
public NetHandlerTOPL(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerTOPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer, broadcastLayer);
|
super(deliverLayer, broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
private void sendIf(){
|
public void sendIf() {
|
||||||
synchronized (waiting) {
|
while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) {
|
||||||
while (waiting.get() < SEND_WINDOW && toSend.get() > 0) {
|
toSend.decrementAndGet();
|
||||||
toSend.getAndDecrement();
|
waiting.incrementAndGet();
|
||||||
waiting.getAndIncrement();
|
sendNextSync(NetEvent.EMPTY());
|
||||||
sendNextSync(NetEvent.EMPTY());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(NetEvent ne) {
|
public void send(NetEvent ne) {
|
||||||
sendIf();
|
sendIf();
|
||||||
@ -37,21 +37,20 @@ public class NetHandlerTOPL extends NetEventHandler {
|
|||||||
@Override
|
@Override
|
||||||
public void deliver(NetEvent ne) {
|
public void deliver(NetEvent ne) {
|
||||||
if (ne.message.src == me.getId()) {
|
if (ne.message.src == me.getId()) {
|
||||||
synchronized (waiting) {
|
delivered.incrementAndGet();
|
||||||
delivered.getAndIncrement();
|
waiting.decrementAndGet();
|
||||||
waiting.getAndDecrement();
|
|
||||||
}
|
|
||||||
sendIf();
|
|
||||||
}
|
}
|
||||||
|
sendIf();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isDone() {
|
public boolean isDone() {
|
||||||
return toSend.get()==0 && waiting.get()==0;
|
return (toSend.get()==0) && (waiting.get()==0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(Host h, Parser p) {
|
public void start(Host h, Parser p) {
|
||||||
|
super.start(h,p);
|
||||||
me = h;
|
me = h;
|
||||||
toSend.set(p.messageCount());
|
toSend.set(p.messageCount());
|
||||||
}
|
}
|
||||||
|
@ -10,7 +10,7 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
|
||||||
public class NetHandlerURB extends NetEventHandler {
|
public class NetHandlerURB extends NetEventHandlerAbstract {
|
||||||
|
|
||||||
|
|
||||||
private final Set<Integer> correct = ConcurrentHashMap.newKeySet();
|
private final Set<Integer> correct = ConcurrentHashMap.newKeySet();
|
||||||
@ -21,44 +21,54 @@ public class NetHandlerURB extends NetEventHandler {
|
|||||||
private Integer myId;
|
private Integer myId;
|
||||||
|
|
||||||
|
|
||||||
public NetHandlerURB(Class<? extends NetEventHandler> deliverLayer, Class<? extends NetEventHandler> broadcastLayer) {
|
public NetHandlerURB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
|
||||||
super(deliverLayer,broadcastLayer);
|
super(deliverLayer,broadcastLayer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deliverIf(){
|
@Override
|
||||||
|
public void deliverIf(){
|
||||||
pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)).
|
pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)).
|
||||||
filter(delivered::add).
|
filter(delivered::add).
|
||||||
forEach(smp -> deliverNext(NetEvent.Message(smp.first(),smp.second())));
|
forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second())));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void send(NetEvent ne) {
|
public void send(NetEvent ne) {
|
||||||
pending.add(new Pair<>(myId, ne.message.id));
|
pending.add(new Pair<>(myId, ne.message.id));
|
||||||
ne.message.src = myId;
|
ne.message.src = myId;
|
||||||
sendNext(ne);
|
sendNextSync(ne);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deliver(NetEvent ne) {
|
public void deliver(NetEvent ne) {
|
||||||
ack.compute(new Pair<>(ne.message.src,ne.message.id),(k, v) -> {
|
Pair<Integer,Integer> smp =new Pair<>(ne.message.src,ne.message.id);
|
||||||
|
ack.compute(smp,(k, v) -> {
|
||||||
if(v == null){
|
if(v == null){
|
||||||
v = ConcurrentHashMap.newKeySet();
|
v = ConcurrentHashMap.newKeySet();
|
||||||
}
|
}
|
||||||
v.add(ne.peer.getId());
|
v.add(ne.peer.getId());
|
||||||
return v;
|
return v;
|
||||||
});
|
});
|
||||||
if(pending.add(new Pair<>(ne.message.src,ne.message.id))){
|
if(pending.add(smp)){
|
||||||
sendNext(ne);
|
sendNextSync(ne);
|
||||||
}
|
}
|
||||||
deliverIf();
|
deliverIf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void crash(NetEvent ne) {
|
public void crash(NetEvent ne) {
|
||||||
correct.remove(ne.peer.getId());
|
correct.remove(ne.peer.getId());
|
||||||
deliverIf();
|
deliverIf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isDone() {
|
||||||
|
return delivered.containsAll(pending);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void start(Host h, Parser p) {
|
public void start(Host h, Parser p) {
|
||||||
|
super.start(h,p);
|
||||||
p.hosts().forEach(ch-> correct.add(ch.getId()));
|
p.hosts().forEach(ch-> correct.add(ch.getId()));
|
||||||
myId = h.getId();
|
myId = h.getId();
|
||||||
}
|
}
|
||||||
|
@ -3,24 +3,31 @@ package cs451.tools;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class Pair<S, T> implements Comparable {
|
public class Pair<S, T> implements Comparable<T> {
|
||||||
private final S x;
|
|
||||||
private final T y;
|
|
||||||
private final List<Object> vl = new ArrayList<>();
|
private final List<Object> vl = new ArrayList<>();
|
||||||
|
|
||||||
public Pair(S x, T y) {
|
public Pair(S x, T y) {
|
||||||
this.x = x;
|
vl.add(x);
|
||||||
this.y = y;
|
vl.add(y);
|
||||||
vl.add(x.toString());
|
|
||||||
vl.add(y.toString());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public S first(){
|
public S first(){
|
||||||
return this.x;
|
return (S) vl.get(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public T second(){
|
public T second(){
|
||||||
return this.y;
|
return (T) vl.get(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void first(S v){
|
||||||
|
vl.set(0,v);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void second(T v){
|
||||||
|
vl.set(1,v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -35,6 +42,7 @@ public class Pair<S, T> implements Comparable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public final boolean equals(final Object obj) {
|
public final boolean equals(final Object obj) {
|
||||||
if (this == obj) {
|
if (this == obj) {
|
||||||
return true;
|
return true;
|
||||||
@ -45,14 +53,14 @@ public class Pair<S, T> implements Comparable {
|
|||||||
if (getClass() != obj.getClass()) {
|
if (getClass() != obj.getClass()) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
final Pair other = (Pair) obj;
|
final Pair<S,T> other = (Pair<S,T>) obj;
|
||||||
return this.vl.equals(other.vl);
|
return this.vl.equals(other.vl);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public int compareTo(Object o) {
|
public int compareTo(Object o) {
|
||||||
Pair co = (Pair)o;
|
Pair<S,T> co = (Pair<S,T>)o;
|
||||||
|
return ((Comparable<T>)second()).compareTo(co.second());
|
||||||
return ((int)co.second())- ((int)second());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
44
257844/src/main/java/cs451/tools/ParamDetector.java
Normal file
44
257844/src/main/java/cs451/tools/ParamDetector.java
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package cs451.tools;
|
||||||
|
|
||||||
|
import cs451.net.NetManager;
|
||||||
|
import cs451.parser.Host;
|
||||||
|
import cs451.parser.Parser;
|
||||||
|
|
||||||
|
public abstract class ParamDetector {
|
||||||
|
|
||||||
|
private static Integer bound(Integer value, Integer lower, Integer upper){
|
||||||
|
return Math.min(Math.max(value,lower),upper);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void detectAndSet(Host me, Parser p){
|
||||||
|
|
||||||
|
int cores = Runtime.getRuntime().availableProcessors();
|
||||||
|
int SMProcesses = Math.toIntExact(p.hosts().stream().filter(h -> h.getIp().equals(me.getIp())).count());
|
||||||
|
int coresPerProcess = bound((cores/SMProcesses)+1,2,64);
|
||||||
|
int bonusCoresPerProcess = SMProcesses == 1 ? 2: 1;
|
||||||
|
|
||||||
|
System.out.println("System has "+cores+" cores, shared by "+SMProcesses+" processes.");
|
||||||
|
if(cores/SMProcesses < 2){
|
||||||
|
System.out.println("Running too many processes on the same System might degrade performance.");
|
||||||
|
}
|
||||||
|
System.out.println("Starting Process with "+coresPerProcess+" cores and "+bonusCoresPerProcess+" bonus cores.");
|
||||||
|
NetManager.THREAD_COUNT = coresPerProcess;
|
||||||
|
NetManager.THREAD_BOOST_COUNT = bonusCoresPerProcess;
|
||||||
|
|
||||||
|
int messages = p.messageCount();
|
||||||
|
int processCount = p.hosts().size();
|
||||||
|
int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/3.0)))));
|
||||||
|
int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages);
|
||||||
|
|
||||||
|
System.out.println("Process expected to broadcast "+messages+" messages.");
|
||||||
|
System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+").");
|
||||||
|
NetManager.WINDOW_WIDTH = windowWidth;
|
||||||
|
|
||||||
|
//We might want to PingPong To set Custom Timing Limitations....
|
||||||
|
NetManager.FD_MAX_TRIES = 3;
|
||||||
|
NetManager.FD_WAIT = 1000;
|
||||||
|
|
||||||
|
NetManager.SL_MAX_TRIES = 8;
|
||||||
|
NetManager.INTERNAL_WAIT = 500;
|
||||||
|
}
|
||||||
|
}
|
3
bnr.sh
3
bnr.sh
@ -3,4 +3,5 @@
|
|||||||
export _JAVA_OPTIONS="-Xmx16G"
|
export _JAVA_OPTIONS="-Xmx16G"
|
||||||
./257844/build.sh
|
./257844/build.sh
|
||||||
sudo echo 1
|
sudo echo 1
|
||||||
echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p 10 -m 200
|
echo "Running with $1 processes and $2 messages"
|
||||||
|
echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2
|
||||||
|
Loading…
x
Reference in New Issue
Block a user