diff --git a/257844/src/main/java/cs451/Main.java b/257844/src/main/java/cs451/Main.java index c21fd43..f505982 100644 --- a/257844/src/main/java/cs451/Main.java +++ b/257844/src/main/java/cs451/Main.java @@ -1,6 +1,5 @@ package cs451; -import cs451.net.event.Message; import cs451.net.NetManager; import cs451.net.event.NetEventType; import cs451.parser.Coordinator; @@ -17,23 +16,21 @@ import java.util.concurrent.ConcurrentLinkedQueue; public class Main { - private static String formatOutput(Integer id, Integer host, Message.TYPE tpe){ + private static String formatOutput(Integer id, Integer host, NetEventType tpe){ switch (tpe){ - case DATA: - return "d "+host+" "+id+"\n"; - case ACK: + case DLVR: return "d "+host+" "+id+"\n"; case BCST: return "b "+id+"\n"; default: - return ""; + return "? "+host+" "+id+"\n"; } } private static void writeOutput(String filepath) throws IOException { FileWriter writer = new FileWriter(filepath); - for(Pair, Message.TYPE> mhpt : mh_tpe){ + for(Pair, NetEventType> mhpt : mh_tpe){ try { writer.write(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); } catch (IOException e) { @@ -65,7 +62,7 @@ public class Main { } private static Host me = null; - private static final Queue,Message.TYPE>> mh_tpe = new ConcurrentLinkedQueue<>(); + private static final Queue,NetEventType>> mh_tpe = new ConcurrentLinkedQueue<>(); private static Parser parser = null; @@ -103,13 +100,13 @@ public class Main { NetManager.start(me,parser,(t,ne) -> { Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); if(t == NetEventType.DLVR){ - Pair, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); + Pair, NetEventType> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), t); System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); mh_tpe.add(mhpt); }else if(t== NetEventType.BCST){ - Pair, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); + Pair, NetEventType> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), t); System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); - mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); + mh_tpe.add(mhpt); } }, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage())); diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java index 4c2c095..f5aadcb 100644 --- a/257844/src/main/java/cs451/net/NetManager.java +++ b/257844/src/main/java/cs451/net/NetManager.java @@ -8,6 +8,7 @@ import cs451.parser.Parser; import cs451.tools.Logger; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.*; import java.util.function.BiConsumer; @@ -39,6 +40,9 @@ public abstract class NetManager { public static Integer WINDOW_WIDTH; + public static Integer BUNDLE_SIZE; + + public static List hosts; private static boolean isStopped = false; @@ -72,23 +76,32 @@ public abstract class NetManager { onErrorHandler = oeh; registerNetHandler(new NetHandlerSCKT( NetHandlerFD.class, NetHandlerDFLT.class)); - registerNetHandler(new NetHandlerFD( NetHandlerSL.class, NetHandlerSCKT.class)); - registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerFD.class)); + registerNetHandler(new NetHandlerFD( NetHandlerBNDL.class, NetHandlerSCKT.class)); + registerNetHandler(new NetHandlerBNDL( NetHandlerSL.class, NetHandlerFD.class)); + registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerBNDL.class)); registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class)); registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class)); - registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class)); - registerNetHandler(new NetHandlerFIFO( NetHandlerTOPL.class, NetHandlerURB.class)); - registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerFIFO.class)); + registerNetHandler(new NetHandlerURB( NetHandlerLCB.class, NetHandlerBEB.class)); + //registerNetHandler(new NetHandlerFIFO( NetHandlerLCB.class, NetHandlerURB.class)); + registerNetHandler(new NetHandlerLCB( NetHandlerTOPL.class, NetHandlerURB.class)); + registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerLCB.class)); + registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class)); + hosts = p.hosts(); nm_listeners.values().forEach(neh -> neh.start(h,p)); ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); ex.prestartAllCoreThreads(); i_tex.scheduleAtFixedRate(()-> { System.err.println("NetManager DeliverIf/BroadcastIf"); - nm_listeners.values().forEach(NetEventHandlerInterface::deliverIf); - nm_listeners.values().forEach(NetEventHandlerInterface::broadcastIf); + nm_listeners.values().forEach(neh->{ + if(neh.condbd.getAndSet(false)){ + neh.deliverIf(); + neh.broadcastIf(); + } + + }); }, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); fd_tex.scheduleAtFixedRate(()-> { @@ -116,13 +129,16 @@ public abstract class NetManager { * @return true if NM and NEH are done */ public static boolean isDone() { - return isStopped || nm_listeners.values().stream().map(nmh ->{ - if(!nmh.isDone()){ - System.err.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); - return false; - } - return true; - }).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); + return isStopped || + (ex.getActiveCount()==0 && + ex.getQueue().size()==0 && + nm_listeners.values().stream().allMatch(nmh->{ + if(!nmh.isDone()){ + System.out.println("Waiting for "+nmh.getClass().getSimpleName()); + return false; + } + return true; + })); } @@ -212,7 +228,6 @@ public abstract class NetManager { } } } - } diff --git a/257844/src/main/java/cs451/net/event/Message.java b/257844/src/main/java/cs451/net/event/Message.java deleted file mode 100644 index f4cb3b4..0000000 --- a/257844/src/main/java/cs451/net/event/Message.java +++ /dev/null @@ -1,164 +0,0 @@ -package cs451.net.event; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Objects; - - -/** - * Message abstraction class - * - * @author C. Hölzl - */ -public class Message implements Comparable { - - /** - * Type of Message - */ - public enum TYPE { - NONE("NONE", 'N'), - ERR("ERR", 'E'), - BCST("BCST", 'B'), - DATA("DATA", ' '), - ACK("ACK", 'A'), - HRTB("HRTB",'H'); - - public final Character c; - public final String tag; - TYPE(String tag, Character c){ - this.tag = tag; - this.c = c; - } - - } - - public Integer src = -1; - public final Integer id; - public TYPE tpe; - - /** - * Creates a new message from a given message - * @param m Message to copy - * @return Message - */ - public static Message MSG(Message m) { - return new Message(m); - } - - /** - * Build a Data Message - * @param id message id - * @return Message - */ - public static Message DMSG(Integer id){ - return new Message(id,TYPE.DATA); - } - public static Message DMSG(Integer id, Integer src) { - return new Message(id,TYPE.DATA,src); - } - - /** - * Build a Heartbeat Message - * @return Message - */ - public static Message HRTB(){ - return new Message(0,TYPE.HRTB); - } - - /** - * Build a Message of a given type - * @param mess_id Message ID - * @param tpe Message Type - * @return Message - */ - public static Message TMSG(Integer mess_id, TYPE tpe) { - return new Message(mess_id, tpe); - } - - /** - * @return Empty Message with no type nor ID - */ - public static Message EMPTY(){ - return new Message(-1, TYPE.NONE); - } - - - /** - * Converts a character to a given type - * @param c char to convert - * @return Type - */ - private static TYPE CharacterToTpe(Character c){ - return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE); - } - - /** - * Create a message from a ByteBuffer - * @param b buffer - * @return Message - */ - public static Message FromBuffer(ByteBuffer b) { - Character tpe = b.getChar(); - Integer id = b.getInt(); - Integer src = b.getInt(); - return new Message(id,CharacterToTpe(tpe),src); - } - - /** - * Puts a message into a ByteBuffer - * @param b buffer - */ - public void ToBuffer(ByteBuffer b){ - b.putChar(tpe.c); - b.putInt(id); - b.putInt(src); - } - - private Message(Message m) { - this.id = m.id; - this.tpe = m.tpe; - this.src = m.src; - } - private Message(Integer id, TYPE tpe){ - this.id = id; - this.tpe = tpe; - } - private Message(Integer id, TYPE tpe, Integer src){ - this.id = id; - this.tpe = tpe; - this.src = src; - } - - @Override - public String toString() { - if (tpe == TYPE.DATA) { - return src + "-" + id; - } - return src + "-" + tpe.c + id; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final Message m = (Message) obj; - return id.equals(m.id) && src.equals(m.src); - } - - @Override - public int hashCode() { - return Objects.hash(id,src); - } - - @Override - public int compareTo(Message m) { - return id.compareTo(m.id); - } -} diff --git a/257844/src/main/java/cs451/net/event/NetEvent.java b/257844/src/main/java/cs451/net/event/NetEvent.java index 8deb36a..8c2344d 100644 --- a/257844/src/main/java/cs451/net/event/NetEvent.java +++ b/257844/src/main/java/cs451/net/event/NetEvent.java @@ -1,62 +1,48 @@ package cs451.net.event; +import cs451.net.event.message.Message; +import cs451.net.event.message.NetMessageAbstract; import cs451.parser.Host; /** * NetEvent abstraction class * - * Holds a {@link cs451.net.event.Message} and a {@link cs451.parser.Host} + * Holds a {@link Message} and a {@link cs451.parser.Host} * * @author C. Hölzl */ public class NetEvent { public final Host peer; - public final Message message; + public final NetMessageAbstract message; - private NetEvent(Host peer, Message message) { + private NetEvent(Host peer, NetMessageAbstract m) { this.peer = peer; - this.message = message; + this.message = m; } - - public static NetEvent Message(Host peer, Integer mess_id){ - return new NetEvent(peer, Message.DMSG(mess_id)); + public static NetEvent Message(NetMessageAbstract m){ + return new NetEvent(NO_PEER, NetMessageAbstract.MSG(m)); } - public static NetEvent Message(Host peer, Integer mess_id, Message.TYPE tpe){ - return new NetEvent(peer, Message.TMSG(mess_id,tpe)); + public static NetEvent Message(Host peer, NetMessageAbstract m){ + return new NetEvent(peer, NetMessageAbstract.MSG(m)); } - public static NetEvent Message(Host peer, Message m){ - return new NetEvent(peer, Message.MSG(m)); - } - - public static NetEvent Message(Message m){ - return new NetEvent(NO_PEER,Message.MSG(m)); - } - - public static NetEvent Message(Integer mess_id){ - return new NetEvent(NO_PEER, Message.DMSG(mess_id)); - } - - public static NetEvent Message(Integer src, Integer mess_id) { - return new NetEvent(NO_PEER, Message.DMSG(mess_id,src)); - } - - public static NetEvent MessageACK(Host peer, Message message) { - NetEvent ne = NetEvent.Message(peer,message); - ne.message.tpe = Message.TYPE.ACK; - return ne; + public static NetEvent MessageACK(Host peer, NetMessageAbstract m) { + return new NetEvent(peer,NetMessageAbstract.ACK(m)); } public static NetEvent MessageHRTB(Host peer){ - return NetEvent.Message(peer,Message.HRTB()); + return new NetEvent(peer,NetMessageAbstract.HRTB()); } + public static NetEvent MessageCRSH(Host peer){ + return new NetEvent(peer,NetMessageAbstract.EMPTY()); + } public static NetEvent EMPTY(){ - return new NetEvent(NO_PEER, Message.EMPTY()); + return new NetEvent(NO_PEER, NetMessageAbstract.EMPTY()); } public static final Host NO_PEER = null; diff --git a/257844/src/main/java/cs451/net/event/NetEventType.java b/257844/src/main/java/cs451/net/event/NetEventType.java index 4e4dc39..0460137 100644 --- a/257844/src/main/java/cs451/net/event/NetEventType.java +++ b/257844/src/main/java/cs451/net/event/NetEventType.java @@ -10,5 +10,5 @@ public enum NetEventType { DLVR, BCST, CRSH, - RCVR; + RCVR } diff --git a/257844/src/main/java/cs451/net/event/message/Message.java b/257844/src/main/java/cs451/net/event/message/Message.java new file mode 100644 index 0000000..72c9c7b --- /dev/null +++ b/257844/src/main/java/cs451/net/event/message/Message.java @@ -0,0 +1,112 @@ +package cs451.net.event.message; + +import java.nio.ByteBuffer; +import java.util.HashMap; + + +/** + * Message abstraction class + * + * @author C. Hölzl + */ +public class Message extends NetMessageAbstract { + + public final HashMap dependencies; + + private Message(Integer src, Integer id, TYPE tpe, HashMap dependencies){ + super(src, id, tpe); + this.dependencies = dependencies; + } + + /** + * Create a message from a ByteBuffer + * @param b buffer + * @return Message + */ + public static Message FromBuffer(ByteBuffer b) { + if(b.remaining() < 14) return getBuilder().build(); + + Character tpe = b.getChar(); + int id = b.getInt(); + int src = b.getInt(); + int depLen = b.getInt(); + HashMap newdep = new HashMap<>(depLen); + for (int i = 0; i < depLen; i++) { + if(b.remaining()<8) break; + newdep.put(b.getInt(),b.getInt()); + } + return new Message(src, id, NetMessageInterface.CharacterToTpe(tpe), newdep); + } + + /** + * Puts a message into a ByteBuffer + * @param b buffer + */ + public void ToBuffer(ByteBuffer b){ + if(b.remaining() < 14 + 8*dependencies.keySet().size()) return; + + b.putChar(tpe.c); + b.putInt(id); + b.putInt(src); + b.putInt(dependencies.keySet().size()); + for (Integer i : dependencies.keySet()) { + b.putInt(i); + b.putInt(dependencies.get(i)); + } + } + + @Override + public String toString() { + return src + "-" + tpe.c + id + " - " +dependencies.toString(); + } + + public static final Message HRTB_MSG = Message.getBuilder().setId(0).setTpe(TYPE.HRTB).build(); + + public static MessageBuilder getBuilder(){ + return new MessageBuilder(); + } + public static MessageBuilder getBuilder(NetMessageAbstract m){ + if (m instanceof Message) + return new MessageBuilder((Message) m); + return getBuilder(); + } + + public static class MessageBuilder{ + private int id = -1; + private int src = -1; + private TYPE tpe = TYPE.NONE; + private HashMap dependencies = new HashMap<>(0); + + public MessageBuilder(){} + public MessageBuilder(Message m){ + this.id = m.id; + this.src = m.src; + this.tpe = m.tpe; + this.dependencies = m.dependencies; + } + + public Message build(){ + return new Message(src,id,tpe,dependencies); + } + + public MessageBuilder setTpe(TYPE tpe) { + this.tpe = tpe; + return this; + } + + public MessageBuilder setId(int id) { + this.id = id; + return this; + } + + public MessageBuilder setSrc(int src) { + this.src = src; + return this; + } + + public MessageBuilder setDependencies(HashMap dependencies) { + this.dependencies = dependencies; + return this; + } + } +} diff --git a/257844/src/main/java/cs451/net/event/message/MessageBundle.java b/257844/src/main/java/cs451/net/event/message/MessageBundle.java new file mode 100644 index 0000000..960b12f --- /dev/null +++ b/257844/src/main/java/cs451/net/event/message/MessageBundle.java @@ -0,0 +1,118 @@ +package cs451.net.event.message; + +import cs451.net.NetManager; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +public class MessageBundle extends NetMessageAbstract { + + public final List messageList; + + private MessageBundle(int src, int id, TYPE tpe, List messageList) { + super(src,id,tpe); + this.messageList = messageList; + } + + public static NetMessageAbstract FromBuffer(ByteBuffer b) { + ArrayList msgl = new ArrayList<>(); + int mlen = Math.min(NetManager.BUNDLE_SIZE+1, b.getInt()); + for (int i = 0; i< mlen; i++) { + Message m = Message.FromBuffer(b); + if (m.tpe == TYPE.NONE) + break; + msgl.add(m); + } + return getBuilder().setMessageList(msgl).build(); + } + @Override + public void ToBuffer(ByteBuffer b) { + if (b.remaining() < 4) return; + + b.putInt(messageList.size()); + for (Message message : messageList) { + message.ToBuffer(b); + } + } + + @Override + public String toString() { + return tpe.c + "-" + Arrays.toString(messageList.toArray()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final MessageBundle m = (MessageBundle) obj; + return messageList.equals(m.messageList); + } + + @Override + public int hashCode() { + return Objects.hash(messageList); + } + + @Override + public int compareTo(NetMessageAbstract message) { + return 0; + } + + public static MessageBundleBuilder getBuilder(){ + return new MessageBundleBuilder(); + } + public static MessageBundleBuilder getBuilder(NetMessageAbstract m){ + if (m instanceof MessageBundle) + return new MessageBundleBuilder((MessageBundle) m); + return getBuilder(); + } + + public static class MessageBundleBuilder{ + private int id = -1; + private int src = -1; + private TYPE tpe = TYPE.NONE; + private List messageList = new ArrayList<>(0); + + public MessageBundleBuilder(){} + public MessageBundleBuilder(MessageBundle m){ + this.id = m.id; + this.src = m.src; + this.tpe = m.tpe; + this.messageList = m.messageList; + } + + public MessageBundle build(){ + return new MessageBundle(src,id,tpe,messageList); + } + + public MessageBundleBuilder setTpe(TYPE tpe) { + this.tpe = tpe; + return this; + } + + public MessageBundleBuilder setId(int id) { + this.id = id; + return this; + } + + public MessageBundleBuilder setSrc(int src) { + this.src = src; + return this; + } + + public MessageBundleBuilder setMessageList(List dependencies) { + this.messageList = dependencies; + return this; + } + } +} diff --git a/257844/src/main/java/cs451/net/event/message/NetMessageAbstract.java b/257844/src/main/java/cs451/net/event/message/NetMessageAbstract.java new file mode 100644 index 0000000..db83e5f --- /dev/null +++ b/257844/src/main/java/cs451/net/event/message/NetMessageAbstract.java @@ -0,0 +1,89 @@ +package cs451.net.event.message; + +import java.util.HashMap; +import java.util.List; +import java.util.Objects; + +public abstract class NetMessageAbstract implements NetMessageInterface { + public final TYPE tpe; + public final Integer src; + public final Integer id; + + public NetMessageAbstract(Integer src,Integer id, TYPE tpe) { + this.src = src; + this.id = id; + this.tpe = tpe; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final NetMessageAbstract m = (NetMessageAbstract) obj; + return id.equals(m.id) && src.equals(m.src); + } + + @Override + public int hashCode() { + return Objects.hash(id,src); + } + + @Override + public int compareTo(NetMessageAbstract m) { + return id.compareTo(m.id); + } + + + public static NetMessageAbstract MSG(NetMessageAbstract m) { + if (m instanceof Message) + return Message.getBuilder(m).build(); + if (m instanceof MessageBundle) + return MessageBundle.getBuilder(m).build(); + + return EMPTY(); + } + + /** + * Build a Ack Message + * @return Message + */ + public static NetMessageAbstract ACK(NetMessageAbstract m){ + if(m instanceof Message) + return Message.getBuilder(m).setTpe(TYPE.ACK).setDependencies(new HashMap<>()).build(); + return EMPTY(); + } + + + /** + * Build a Heartbeat Message + * @return Message + */ + public static NetMessageAbstract HRTB(){ + return MessageBundle.getBuilder().setMessageList(List.of(Message.HRTB_MSG)).setTpe(TYPE.HRTB).build(); + } + + public boolean isHeartBeat(){ + if (this instanceof Message){ + return this.tpe==TYPE.HRTB; + }else if (this instanceof MessageBundle){ + return ((MessageBundle)this).messageList.size()>0 && + ((MessageBundle)this).messageList.get(0).tpe==TYPE.HRTB; + }else{ + return false; + } + } + + /** + * @return Empty Message with no type nor ID + */ + public static Message EMPTY(){ + return Message.getBuilder().build(); + } +} diff --git a/257844/src/main/java/cs451/net/event/message/NetMessageInterface.java b/257844/src/main/java/cs451/net/event/message/NetMessageInterface.java new file mode 100644 index 0000000..3ee96f3 --- /dev/null +++ b/257844/src/main/java/cs451/net/event/message/NetMessageInterface.java @@ -0,0 +1,49 @@ +package cs451.net.event.message; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public interface NetMessageInterface extends Comparable { + + /** + * Type of Message + */ + enum TYPE { + NONE("NONE", 'N'), + DATA("DATA", 'D'), + ACK("ACK", 'A'), + HRTB("HRTB",'H'); + + public final Character c; + public final String tag; + TYPE(String tag, Character c){ + this.tag = tag; + this.c = c; + } + } + + /** + * Converts a character to a given type + * @param c char to convert + * @return Type + */ + static TYPE CharacterToTpe(Character c){ + return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE); + } + + /** + * Create a message from a ByteBuffer + * + * @param b buffer + * @return Message + */ + static NetMessageAbstract FromBuffer(ByteBuffer b) { + return MessageBundle.FromBuffer(b); + } + + /** + * Puts a message into a ByteBuffer + * @param b buffer + */ + void ToBuffer(ByteBuffer b); +} diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java index 14959d9..28630f9 100644 --- a/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java +++ b/257844/src/main/java/cs451/net/handler/NetEventHandlerAbstract.java @@ -5,6 +5,8 @@ import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; +import java.util.concurrent.atomic.AtomicBoolean; + /** * NetEventHandler abstraction class * @@ -12,13 +14,14 @@ import cs451.parser.Parser; */ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface { - /** * Deliver & Broadcast Layers to handle resulting operation */ private final Class deliverLayer; private final Class broadcastLayer; + public final AtomicBoolean condbd = new AtomicBoolean(false); + /** * Initialized the main NetEventHandler fields @@ -30,6 +33,12 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac this.broadcastLayer = broadcastLayer; } + /** + * Requests a conditional Broadcast/Delivery + */ + public void requestCondBD(Boolean v){ + condbd.set(v); + } /** * Delivers a NetEvent Synchronously diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java index 50d5fae..cdcdaa8 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java @@ -1,12 +1,10 @@ package cs451.net.handler; +import cs451.net.NetManager; import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; -import java.util.List; -import java.util.stream.Collectors; - /** * NetEventHandler for Best Effort Broadcast * @@ -16,8 +14,7 @@ import java.util.stream.Collectors; */ public class NetHandlerBEB extends NetEventHandlerAbstract { - private List hosts; - private Host h; + private static Host h; public NetHandlerBEB(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); @@ -26,14 +23,13 @@ public class NetHandlerBEB extends NetEventHandlerAbstract { @Override public void broadcast(NetEvent ne) { deliverNextAsync(NetEvent.Message(h,ne.message)); - hosts.parallelStream().forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message))); + NetManager.hosts.parallelStream().filter(ch->ch.getId()!=h.getId()).forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message))); } @Override public void start(Host h, Parser p) { super.start(h,p); - this.h = h; - hosts = p.hosts().stream().filter(ch -> ch.getId()!=h.getId()).collect(Collectors.toList()); + NetHandlerBEB.h = h; } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBNDL.java b/257844/src/main/java/cs451/net/handler/NetHandlerBNDL.java new file mode 100644 index 0000000..f059cd7 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerBNDL.java @@ -0,0 +1,68 @@ +package cs451.net.handler; + +import cs451.net.NetManager; +import cs451.net.event.NetEvent; +import cs451.net.event.message.Message; +import cs451.net.event.message.MessageBundle; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * NetEventHandler for BLUNDLER + * + * Groups Multiple messages into larger packets (and de-groups them on receive + * + * @author C. Hölzl + */ +public class NetHandlerBNDL extends NetEventHandlerAbstract { + + private static final ConcurrentHashMap> sending = new ConcurrentHashMap<>(); + + public NetHandlerBNDL(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer, broadcastLayer); + } + + @Override + public synchronized void broadcastIf() { + NetManager.hosts.forEach(h-> sending.computeIfPresent(h.getId(),(hid, mq)->{ + while(mq.size()>0){ + ArrayList msgl = new ArrayList<>(NetManager.BUNDLE_SIZE); + mq.drainTo(msgl, NetManager.BUNDLE_SIZE); + broadcastNextAsync(NetEvent.Message(h, MessageBundle.getBuilder().setMessageList(msgl).build())); + } + return mq; + })); + } + + @Override + public void broadcast(NetEvent ne) { + sending.computeIfPresent(ne.peer.getId(),(h, q) -> { + if(ne.message instanceof Message) { + q.add((Message) ne.message); + requestCondBD(true); + } + return q; + }); + } + + @Override + public void deliver(NetEvent ne) { + if (ne.message instanceof MessageBundle) { + ((MessageBundle) ne.message).messageList.forEach(msg -> deliverNextAsync(NetEvent.Message(ne.peer,msg))); + } + } + + @Override + public boolean isDone() { + return sending.values().stream().allMatch(v->v.size()==0); + } + + @Override + public void start(Host h, Parser p) { + p.hosts().stream().filter(ch->ch.getId()!=h.getId()).forEach(ch->sending.put(ch.getId(),new PriorityBlockingQueue<>())); + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java index 29c71b2..9969248 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFD.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFD.java @@ -1,12 +1,10 @@ package cs451.net.handler; import cs451.net.NetManager; -import cs451.net.event.Message; import cs451.net.event.NetEvent; import cs451.parser.Host; import cs451.parser.Parser; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** @@ -19,8 +17,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class NetHandlerFD extends NetEventHandlerAbstract { - private List hosts; private static final ConcurrentHashMap alive = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap should_I = new ConcurrentHashMap<>(); public NetHandlerFD(Class deliverLayer, Class broadcastLayer) { super(deliverLayer, broadcastLayer); @@ -28,20 +26,24 @@ public class NetHandlerFD extends NetEventHandlerAbstract { @Override public synchronized void beat() { - hosts.forEach(h-> { - alive.computeIfPresent(h.getId(),(k, v) -> { - broadcastNextSync(NetEvent.MessageHRTB(h)); + NetManager.hosts.forEach(h-> { + should_I.computeIfPresent(h.getId(),(k, v) -> { + if(v>=0) + broadcastNextSync(NetEvent.MessageHRTB(h)); return v+1; }); + alive.computeIfPresent(h.getId(), (k,v) -> v+1); if(alive.getOrDefault(h.getId(),0) > NetManager.FD_MAX_TRIES){ - crashNextSync(NetEvent.Message(h, Message.EMPTY())); + crashNextSync(NetEvent.MessageCRSH(h)); alive.remove(h.getId()); + should_I.remove(h.getId()); } }); } @Override public void broadcast(NetEvent ne) { + should_I.computeIfPresent(ne.peer.getId(),(k, v)->-1); if (alive.containsKey(ne.peer.getId())){ broadcastNextSync(ne); } @@ -49,16 +51,17 @@ public class NetHandlerFD extends NetEventHandlerAbstract { @Override public void deliver(NetEvent ne) { - alive.computeIfPresent(ne.peer.getId(),(k,v)->0); - if (ne.message.tpe != Message.TYPE.HRTB) { + alive.computeIfPresent(ne.peer.getId(), (k,v) -> 0); + if (!ne.message.isHeartBeat()) { deliverNextSync(ne); } } @Override public void start(Host h, Parser p) { - super.start(h, p); - hosts = p.hosts(); - hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),-1)); + NetManager.hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> { + alive.put(ch.getId(),-1); + should_I.put(ch.getId(),-1); + }); } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java index 755dcd8..2a6b9c1 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java @@ -1,13 +1,12 @@ package cs451.net.handler; -import cs451.net.NetManager; -import cs451.net.event.Message; -import cs451.net.event.NetEvent; -import cs451.net.event.NetEventType; +import cs451.net.event.*; +import cs451.net.event.message.NetMessageAbstract; import cs451.parser.Host; import cs451.parser.Parser; -import java.util.Map; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -24,50 +23,53 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class NetHandlerFIFO extends NetEventHandlerAbstract { - private final AtomicInteger sn = new AtomicInteger(1); - private final Map rsn = new ConcurrentHashMap<>(); - private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); - - private Host me; + private static final ConcurrentHashMap rsn = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap> pending = new ConcurrentHashMap<>(); public NetHandlerFIFO(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); } - @Override - public synchronized void broadcast(NetEvent ne) { - Integer snv = sn.getAndIncrement(); - NetManager.complete(NetEventType.BCST, NetEvent.Message(me, snv, Message.TYPE.BCST)); - broadcastNextAsync(NetEvent.Message(me.getId(),snv)); - } - @Override public synchronized void deliverIf(){ - pending.removeIf(hmp -> { - Integer crsn = rsn.getOrDefault(hmp.src, 1); - if (hmp.id.equals(crsn)) { - NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp)); - deliverNextSync(NetEvent.Message(hmp)); - rsn.put(hmp.src, crsn + 1); - return true; - } - return false; + pending.forEach((hid, mq) -> { + AtomicInteger crsn = rsn.get(hid); + if(crsn == null) return; + Collection msgc = new ArrayList<>(); + mq.drainTo(msgc); + msgc.forEach(msg -> + crsn.getAndUpdate(v -> { + if (msg.id.equals(v)){ + deliverNextSync(NetEvent.Message(msg)); + return v+1; + }else{ + mq.add(msg); + return v; + } + }) + ); }); } @Override public void deliver(NetEvent ne) { - pending.add(ne.message); + pending.computeIfPresent(ne.message.src, (h, mq) -> { + mq.add(ne.message); + requestCondBD(true); + return mq; + }); } @Override public boolean isDone() { - return pending.stream().noneMatch(hmp -> hmp.id.equals(rsn.getOrDefault(hmp.src, 1))); + return pending.values().stream().allMatch(hmp -> hmp.stream().noneMatch(msg-> msg.id.equals(rsn.getOrDefault(msg.src, new AtomicInteger(1)).get()))); } @Override public void start(Host h, Parser p) { - super.start(h,p); - me = h; + for (Host host : p.hosts()) { + pending.put(host.getId(),new PriorityBlockingQueue<>()); + rsn.put(host.getId(), new AtomicInteger(1)); + } } } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerLCB.java b/257844/src/main/java/cs451/net/handler/NetHandlerLCB.java new file mode 100644 index 0000000..5f98a8f --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerLCB.java @@ -0,0 +1,98 @@ +package cs451.net.handler; + +import cs451.net.event.NetEvent; +import cs451.net.event.message.Message; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class NetHandlerLCB extends NetEventHandlerAbstract { + + private static Map> causality; + private static final ConcurrentHashMap> undelivered = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap state = new ConcurrentHashMap<>(); + + public NetHandlerLCB(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer, broadcastLayer); + } + + @Override + public synchronized void deliverIf() { + AtomicBoolean deliveredOne = new AtomicBoolean(false); + do { + deliveredOne.set(false); + + undelivered.forEach((hid, mq) -> { + AtomicInteger crsn = state.get(hid); + if(crsn == null) return; + Collection msgc = new ArrayList<>(); + mq.drainTo(msgc); + msgc.forEach(msg -> + crsn.getAndUpdate(v -> { + if (msg.id.equals(v+1) && checkDependencies(msg.src,msg.dependencies)){ + deliverNextSync(NetEvent.Message(msg)); + deliveredOne.set(true); + return v+1; + }else{ + mq.add(msg); + return v; + } + }) + ); + }); + }while((deliveredOne.get())); + } + + @Override + public void broadcast(NetEvent ne) { + broadcastNextSync(NetEvent.Message(Message.getBuilder(ne.message).setDependencies(getDependencies(ne.message.src)).build())); + } + + @Override + public void deliver(NetEvent ne) { + undelivered.computeIfPresent(ne.message.src,(k, lv) -> { + lv.add((Message) ne.message); + return lv; + }); + requestCondBD(true); + } + + @Override + public void start(Host h, Parser p) { + super.start(h, p); + causality = p.causality(); + for (Host host : p.hosts()) { + undelivered.put(host.getId(), new PriorityBlockingQueue<>()); + state.put(host.getId(), new AtomicInteger(0)); + } + } + + @Override + public boolean isDone() { + return undelivered.values().stream().allMatch(AbstractCollection::isEmpty); + } + + private HashMap getDependencies(Integer hid){ + HashMap dependencies = new HashMap<>(); + List hc = causality.getOrDefault(hid, Collections.emptyList()); + for (Integer hci : hc) { + dependencies.put(hci, state.get(hci).get()); + } + return dependencies; + } + + private Boolean checkDependencies(Integer hid, HashMap md){ + HashMap cd = getDependencies(hid); + for(Integer k : cd.keySet()){ + if(cd.get(k) < md.get(k)){ + return false; + } + } + return true; + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java index 72f4daa..28d5bcb 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java @@ -1,6 +1,7 @@ package cs451.net.handler; import cs451.net.event.*; +import cs451.net.event.message.NetMessageInterface; import cs451.tools.Pair; import java.util.Set; @@ -15,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap; */ public class NetHandlerPL extends NetEventHandlerAbstract { - private final Set> delivered = ConcurrentHashMap.newKeySet(); + private static final Set> delivered = ConcurrentHashMap.newKeySet(); public NetHandlerPL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java index 9346cc0..37a34f2 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java @@ -1,15 +1,14 @@ package cs451.net.handler; -import cs451.net.event.Message; import cs451.net.NetManager; import cs451.net.event.*; +import cs451.net.event.message.NetMessageInterface; import cs451.parser.Host; import cs451.parser.Parser; import java.io.IOException; import java.net.*; import java.nio.ByteBuffer; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,14 +23,10 @@ import java.util.concurrent.Executors; */ public class NetHandlerSCKT extends NetEventHandlerAbstract { - private static final Integer BUFF_SIZE = 128; + private static final Integer BUFF_SIZE = 2048; private static final ExecutorService tex = Executors.newSingleThreadExecutor(); - - private List hosts; - - private DatagramSocket socket = null; - + private static DatagramSocket socket = null; public NetHandlerSCKT(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); @@ -60,15 +55,15 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract { return; } - byte[] buff = new byte[BUFF_SIZE]; - DatagramPacket datagram = new DatagramPacket(buff, buff.length); + byte[] b = new byte[BUFF_SIZE]; + DatagramPacket datagram = new DatagramPacket(b, b.length); try { socket.receive(datagram); - Optional rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst(); + Optional rhid = NetManager.hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst(); deliverNextAsync(NetEvent.Message( new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)), - Message.FromBuffer(ByteBuffer.wrap(datagram.getData())) + NetMessageInterface.FromBuffer(ByteBuffer.wrap(datagram.getData())) )); } catch (IOException e) { NetManager.error(NetEventType.DLVR,e); @@ -78,17 +73,13 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract { @Override public void start(Host h, Parser p) { - super.start(h,p); - hosts = p.hosts(); - try { - this.socket = new DatagramSocket(h.getPort()); + socket = new DatagramSocket(h.getPort()); } catch (SocketException e) { throw new Error(e); } - tex.execute(()->{ - while (true) { + while (!socket.isClosed()) { deliver(NetEvent.EMPTY()); } }); diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java index 3e62028..62666be 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java @@ -1,10 +1,8 @@ package cs451.net.handler; import cs451.net.NetManager; -import cs451.net.event.Message; import cs451.net.event.*; -import cs451.parser.Host; -import cs451.parser.Parser; +import cs451.net.event.message.NetMessageAbstract; import cs451.tools.Pair; import java.util.*; @@ -23,10 +21,9 @@ import java.util.concurrent.*; */ public class NetHandlerSL extends NetEventHandlerAbstract { - private List hosts; - Set hasTimeout = ConcurrentHashMap.newKeySet(); + private static final Set hasTimeout = ConcurrentHashMap.newKeySet(); - PriorityBlockingQueue> sending = new PriorityBlockingQueue<>(); + private static final PriorityBlockingQueue> sending = new PriorityBlockingQueue<>(); public NetHandlerSL(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); @@ -35,14 +32,14 @@ public class NetHandlerSL extends NetEventHandlerAbstract { @Override public synchronized void beat(){ sending.removeIf(ppm -> hasTimeout.contains(ppm.first())); - sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst() + sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> NetManager.hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst() .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second())))); } @Override public void broadcast(NetEvent ne) { if (!hasTimeout.contains(ne.peer.getId())) - sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); + sending.add(new Pair<>(ne.peer.getId(), ne.message)); broadcastNextSync(ne); } @@ -51,7 +48,7 @@ public class NetHandlerSL extends NetEventHandlerAbstract { switch (ne.message.tpe){ case ACK: sending.removeIf(ppm -> (ne.peer.getId()==ppm.first() && - ppm.second().equals(ne.message))); + ne.message.equals(ppm.second()))); break; case DATA: broadcastNextSync(NetEvent.MessageACK(ne.peer,ne.message)); @@ -72,10 +69,4 @@ public class NetHandlerSL extends NetEventHandlerAbstract { public boolean isDone() { return sending.isEmpty(); } - - @Override - public void start(Host h, Parser p) { - this.hosts = p.hosts(); - } - } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java index 7175faf..669fb53 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java @@ -2,10 +2,12 @@ package cs451.net.handler; import cs451.net.NetManager; import cs451.net.event.NetEvent; +import cs451.net.event.NetEventType; +import cs451.net.event.message.Message; +import cs451.net.event.message.NetMessageInterface; import cs451.parser.Host; import cs451.parser.Parser; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -20,11 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class NetHandlerTOPL extends NetEventHandlerAbstract { - private final AtomicInteger toSend = new AtomicInteger(0); - private final AtomicInteger delivered = new AtomicInteger(0); - private final AtomicInteger waiting = new AtomicInteger(0); + private static final AtomicInteger sn = new AtomicInteger(1); + + private static final AtomicInteger toSend = new AtomicInteger(0); + private static final AtomicInteger delivered = new AtomicInteger(0); + private static final AtomicInteger waiting = new AtomicInteger(0); - private final AtomicBoolean status = new AtomicBoolean(false); private Host me; @@ -34,25 +37,29 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { @Override public synchronized void broadcastIf() { - if( !status.get() ) return; while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) { toSend.decrementAndGet(); waiting.incrementAndGet(); - broadcastNextSync(NetEvent.EMPTY()); + + Message newMess = Message.getBuilder().setSrc(me.getId()).setId(sn.getAndIncrement()).setTpe(NetMessageInterface.TYPE.DATA).build(); + NetManager.complete(NetEventType.BCST, NetEvent.Message(me, newMess)); + broadcastNextSync(NetEvent.Message(newMess)); } } @Override public void broadcast(NetEvent ne) { - status.set(true); + requestCondBD(true); } @Override public synchronized void deliver(NetEvent ne) { + NetManager.complete(NetEventType.DLVR,ne); if (ne.message.src == me.getId()) { delivered.incrementAndGet(); waiting.decrementAndGet(); + requestCondBD(true); } } @@ -63,7 +70,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract { @Override public void start(Host h, Parser p) { - super.start(h,p); me = h; toSend.set(p.messageCount()); } diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java index e260419..a102c76 100644 --- a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java +++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java @@ -1,6 +1,7 @@ package cs451.net.handler; import cs451.net.event.NetEvent; +import cs451.net.event.message.NetMessageAbstract; import cs451.parser.Host; import cs451.parser.Parser; import cs451.tools.Pair; @@ -22,14 +23,10 @@ import java.util.concurrent.*; */ public class NetHandlerURB extends NetEventHandlerAbstract { - - private final Set correct = ConcurrentHashMap.newKeySet(); - private final Map, Set> ack = new ConcurrentHashMap<>(); - private final Set> delivered = ConcurrentHashMap.newKeySet(); - private final Set> pending = ConcurrentHashMap.newKeySet(); - - private Integer myId; - + private static final Set correct = ConcurrentHashMap.newKeySet(); + private static final Map, Set> ack = new ConcurrentHashMap<>(); + private static final Set> delivered = ConcurrentHashMap.newKeySet(); + private static final Map, NetMessageAbstract> pending = new ConcurrentHashMap<>(); public NetHandlerURB(Class deliverLayer, Class broadcastLayer) { super(deliverLayer,broadcastLayer); @@ -37,15 +34,16 @@ public class NetHandlerURB extends NetEventHandlerAbstract { @Override public synchronized void deliverIf(){ - pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). + pending.keySet().stream().filter(smp -> + ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). filter(delivered::add). - forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second()))); + forEach(smp -> deliverNextAsync(NetEvent.Message(pending.get(smp)))); } @Override public void broadcast(NetEvent ne) { - pending.add(new Pair<>(myId, ne.message.id)); - ne.message.src = myId; + pending.put(new Pair<>(ne.message.src, ne.message.id),NetMessageAbstract.MSG(ne.message)); + requestCondBD(true); broadcastNextSync(ne); } @@ -59,25 +57,25 @@ public class NetHandlerURB extends NetEventHandlerAbstract { v.add(ne.peer.getId()); return v; }); - if(pending.add(smp)){ + if(pending.putIfAbsent(smp, NetMessageAbstract.MSG(ne.message))==null){ broadcastNextAsync(ne); } + requestCondBD(true); } @Override public void crash(NetEvent ne) { correct.remove(ne.peer.getId()); + requestCondBD(true); } @Override public boolean isDone() { - return delivered.containsAll(pending); + return delivered.containsAll(pending.keySet()); } @Override public void start(Host h, Parser p) { - super.start(h,p); p.hosts().forEach(ch-> correct.add(ch.getId())); - myId = h.getId(); } } \ No newline at end of file diff --git a/257844/src/main/java/cs451/parser/ConfigParser.java b/257844/src/main/java/cs451/parser/ConfigParser.java index 5837cbe..1630399 100644 --- a/257844/src/main/java/cs451/parser/ConfigParser.java +++ b/257844/src/main/java/cs451/parser/ConfigParser.java @@ -4,12 +4,23 @@ import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class ConfigParser { private String path; private int messages; + private final ConcurrentHashMap> causality = new ConcurrentHashMap<>(); + + public ConfigParser(List hosts) { + for (Host host : hosts) { + causality.put(host.getId(),new ArrayList<>()); + } + } public boolean populate(String value) { @@ -17,14 +28,22 @@ public class ConfigParser { path = file.getPath(); try (BufferedReader br = new BufferedReader(new FileReader(path))) { - int lineNum = 1; + int lineNum = 0; for (String line; (line = br.readLine()) != null; lineNum++) { - switch(lineNum){ - case 1: - messages = Integer.parseInt(line); - break; - default: - break; + System.out.println(line); + String[] lparts = line.trim().split(" "); + if (lineNum == 0){ + messages = Integer.parseInt(lparts[0]); + }else if (lineNum>=1 && lineNum<=causality.keySet().size()){ + Integer hid = Integer.parseInt(lparts[0]); + List vl = causality.get(hid); + if(vl != null){ + for(int i = 1; i < lparts.length; ++i){ + vl.add(Integer.parseInt(lparts[i])); + } + } + }else{ + break; } } } catch (IOException e) { @@ -41,4 +60,8 @@ public class ConfigParser { public int getMessages() { return messages; } + + public Map> getCausality() { + return causality; + } } diff --git a/257844/src/main/java/cs451/parser/Parser.java b/257844/src/main/java/cs451/parser/Parser.java index 3b6552a..f1dd36f 100644 --- a/257844/src/main/java/cs451/parser/Parser.java +++ b/257844/src/main/java/cs451/parser/Parser.java @@ -1,6 +1,7 @@ package cs451.parser; import java.util.List; +import java.util.Map; public class Parser { @@ -57,8 +58,9 @@ public class Parser { } if (argsNum == Constants.ARG_LIMIT_CONFIG) { - configParser = new ConfigParser(); + configParser = new ConfigParser(hosts()); if (!configParser.populate(args[Constants.CONFIG_VALUE])) { + help(); } } } @@ -108,4 +110,8 @@ public class Parser { return configParser.getMessages(); } + public Map> causality(){ + return configParser.getCausality(); + } + } diff --git a/257844/src/main/java/cs451/tools/ParamDetector.java b/257844/src/main/java/cs451/tools/ParamDetector.java index 7cb0342..cbe923d 100644 --- a/257844/src/main/java/cs451/tools/ParamDetector.java +++ b/257844/src/main/java/cs451/tools/ParamDetector.java @@ -30,8 +30,8 @@ public abstract class ParamDetector { int messages = p.messageCount(); int processCount = p.hosts().size(); - int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/4.0))))); - int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages); + int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(2-(processCount/2.0))))); + int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages); System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); @@ -42,5 +42,8 @@ public abstract class ParamDetector { //We might want to PingPong To set Custom Timing Limitations.... NetManager.FD_MAX_TRIES = 10; NetManager.FD_WAIT = 1000; + + NetManager.BUNDLE_SIZE = 2; + NetManager.WINDOW_WIDTH *= NetManager.BUNDLE_SIZE; } } diff --git a/bnr.sh b/bnr.sh index 8f20c9e..0c86326 100755 --- a/bnr.sh +++ b/bnr.sh @@ -1,12 +1,12 @@ #!/bin/bash -if [ $# -eq 2 ]; then +if [ $# -eq 3 ]; then export _JAVA_OPTIONS="-Xmx16G" ./257844/build.sh sudo echo 1 - echo "Running with $1 processes and $2 messages" - yes "" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2 + echo "Running $1 with $2 processes and $3 messages" + yes "" | ./validate.py -r 257844/run.sh -b $1 -l 257844/bin/logs -p $2 -m $3 else echo "Missing Arguments ..." - echo "Usage: $0 process_count message_count" + echo "Usage: $0 fifo|lcausal process_count message_count" fi diff --git a/validate.py b/validate.py index 0c8581d..46bba84 100755 --- a/validate.py +++ b/validate.py @@ -201,14 +201,144 @@ class FifoBroadcastValidation(Validation): return True class LCausalBroadcastValidation(Validation): - def __init__(self, processes, outputDir, causalRelationships): - super().__init__(processes, outputDir) def generateConfig(self): - raise NotImplementedError() + hosts = tempfile.NamedTemporaryFile(mode='w') + config = tempfile.NamedTemporaryFile(mode='w') - def checkProcess(self, pid): - raise NotImplementedError() + # Hosts file + for i in range(1, self.processes + 1): + hosts.write("{} localhost {}\n".format(i, PROCESSES_BASE_IP+i)) + hosts.flush() + + self.lcausalDeps = defaultdict(list) + + # # of messages + config.write("{}\n".format(self.messages)) + + # Config file; Create some random locality. Each process may have + # up to N processes / 2 locally dependent processes. + for thisproc in range(1, self.processes + 1): + config.write(str(thisproc) + " ") + + # Add random number of dependencies + deps = [i for i in range(1, self.processes + 1) if i != thisproc] + for j in range(0, random.randint(0, int(self.processes/2))): + depidx = random.randint(0, len(deps) - 1) + otherproc = deps[depidx] + config.write(str(otherproc) + " ") + self.lcausalDeps[thisproc].append(otherproc) + del deps[depidx] + config.write("\n") + + config.flush() + + return (hosts, config) + + def filePathForPID(self, pid): + return os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid)) + + + def verifyCausality(self, fromPid, broadcastSeq, toPid, fromHistory): + ok = True + filename = self.filePathForPID(toPid) + toPidFile = open(filename) + toHistory = defaultdict(lambda: 0) + + for lineNumber, line in enumerate(toPidFile): + tokens = line.split() + if tokens[0] == 'd': + # Record most-recently received value from process. This is + # sufficient due to LCausal also adhering to FIFO. + if tokens[1] in fromHistory: + toHistory[tokens[1]] = int(tokens[2]) + + if tokens[1] == fromPid and tokens[2] == broadcastSeq: + # Found the dependent broadcast, ensure that history is consistent + # up to this point + for pid, seq in fromHistory.items(): + if toHistory[pid] < seq: + print("File {}, Line {}: \n\tDelivered dependent message {}:{} with unresolved dependency\n" + "\t Message was dependent on {}:{} but only delivered up to {}:{}".format(filename, lineNumber + 1,fromPid, broadcastSeq, pid,seq, pid, toHistory[pid])) + ok = False + + return ok + + + + def checkLCausal(self, pid): + if pid not in self.lcausalDeps: + return True + + depHistory = defaultdict(lambda: 0) + pidFile = open(self.filePathForPID(pid)) + + for line in pidFile.readlines(): + ltokens = line.split() + + # If token is broadcast, for each other process in the system, + # verify that that token was delivered with adherance to the + # causality as recorded in the @var depHistory up to this point + if ltokens[0] == 'b': + for i in range(1, self.processes+1): + if i == pid: + continue + if not self.verifyCausality( + fromPid=str(pid), + broadcastSeq=ltokens[1], + toPid=i, + fromHistory=depHistory + ): + return False + + # If token is deliver and is in the set of causally dependent processes, + # record it as being part of the dependent history up to this point. + if ltokens[0] == 'd' and int(ltokens[1]) in self.lcausalDeps[pid]: + depHistory[ltokens[1]] = int(ltokens[2]) + + return True + + + def checkFIFO(self, pid): + filePath = os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid)) + + i = 1 + nextMessage = defaultdict(lambda : 1) + filename = os.path.basename(filePath) + + with open(filePath) as f: + for lineNumber, line in enumerate(f): + tokens = line.split() + + # Check broadcast + if tokens[0] == 'b': + msg = int(tokens[1]) + if msg != i: + print("File {}, Line {}: Messages broadcast out of order. Expected message {} but broadcast message {}".format(filename, lineNumber, i, msg)) + return False + i += 1 + + # Check delivery + if tokens[0] == 'd': + sender = int(tokens[1]) + msg = int(tokens[2]) + if msg != nextMessage[sender]: + print("File {}, Line {}: Message delivered out of order. Expected message {}, but delivered message {}".format(filename, lineNumber, nextMessage[sender], msg)) + return False + else: + nextMessage[sender] = msg + 1 + return True + + + def checkAll(self, continueOnError=True): + print("LCausal verification:") + for pid in range(1, self.processes+1): + ret = self.checkFIFO(pid) + ret &= self.checkLCausal(pid) + + if not ret and not continueOnError: + return False + return True class StressTest: def __init__(self, procs, concurrency, attempts, attemptsRatio): @@ -359,7 +489,7 @@ def main(processes, messages, runscript, broadcastType, logsDir, testConfig): if broadcastType == "fifo": validation = FifoBroadcastValidation(processes, messages, logsDir) else: - validation = LCausalBroadcastValidation(processes, messages, logsDir, None) + validation = LCausalBroadcastValidation(processes, messages, logsDir) hostsFile, configFile = validation.generateConfig()