1
0
This commit is contained in:
choelzl 2020-11-22 22:40:30 +01:00
parent ad59dd7c4d
commit ec8ee67745
Signed by: sora
GPG Key ID: A362EA0491E2EEA0
25 changed files with 879 additions and 352 deletions

View File

@ -1,6 +1,5 @@
package cs451; package cs451;
import cs451.net.event.Message;
import cs451.net.NetManager; import cs451.net.NetManager;
import cs451.net.event.NetEventType; import cs451.net.event.NetEventType;
import cs451.parser.Coordinator; import cs451.parser.Coordinator;
@ -17,23 +16,21 @@ import java.util.concurrent.ConcurrentLinkedQueue;
public class Main { public class Main {
private static String formatOutput(Integer id, Integer host, Message.TYPE tpe){ private static String formatOutput(Integer id, Integer host, NetEventType tpe){
switch (tpe){ switch (tpe){
case DATA: case DLVR:
return "d "+host+" "+id+"\n";
case ACK:
return "d "+host+" "+id+"\n"; return "d "+host+" "+id+"\n";
case BCST: case BCST:
return "b "+id+"\n"; return "b "+id+"\n";
default: default:
return ""; return "? "+host+" "+id+"\n";
} }
} }
private static void writeOutput(String filepath) throws IOException { private static void writeOutput(String filepath) throws IOException {
FileWriter writer = new FileWriter(filepath); FileWriter writer = new FileWriter(filepath);
for(Pair<Pair<Integer,Integer>, Message.TYPE> mhpt : mh_tpe){ for(Pair<Pair<Integer,Integer>, NetEventType> mhpt : mh_tpe){
try { try {
writer.write(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); writer.write(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
} catch (IOException e) { } catch (IOException e) {
@ -65,7 +62,7 @@ public class Main {
} }
private static Host me = null; private static Host me = null;
private static final Queue<Pair<Pair<Integer,Integer>,Message.TYPE>> mh_tpe = new ConcurrentLinkedQueue<>(); private static final Queue<Pair<Pair<Integer,Integer>,NetEventType>> mh_tpe = new ConcurrentLinkedQueue<>();
private static Parser parser = null; private static Parser parser = null;
@ -103,13 +100,13 @@ public class Main {
NetManager.start(me,parser,(t,ne) -> { NetManager.start(me,parser,(t,ne) -> {
Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString());
if(t == NetEventType.DLVR){ if(t == NetEventType.DLVR){
Pair<Pair<Integer, Integer>, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); Pair<Pair<Integer, Integer>, NetEventType> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), t);
System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
mh_tpe.add(mhpt); mh_tpe.add(mhpt);
}else if(t== NetEventType.BCST){ }else if(t== NetEventType.BCST){
Pair<Pair<Integer, Integer>, Message.TYPE> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), ne.message.tpe); Pair<Pair<Integer, Integer>, NetEventType> mhpt = new Pair<>(new Pair<>(ne.message.id, ne.message.src), t);
System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); System.out.print(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); mh_tpe.add(mhpt);
} }
}, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage())); }, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage()));

View File

@ -8,6 +8,7 @@ import cs451.parser.Parser;
import cs451.tools.Logger; import cs451.tools.Logger;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
@ -39,6 +40,9 @@ public abstract class NetManager {
public static Integer WINDOW_WIDTH; public static Integer WINDOW_WIDTH;
public static Integer BUNDLE_SIZE;
public static List<Host> hosts;
private static boolean isStopped = false; private static boolean isStopped = false;
@ -72,23 +76,32 @@ public abstract class NetManager {
onErrorHandler = oeh; onErrorHandler = oeh;
registerNetHandler(new NetHandlerSCKT( NetHandlerFD.class, NetHandlerDFLT.class)); registerNetHandler(new NetHandlerSCKT( NetHandlerFD.class, NetHandlerDFLT.class));
registerNetHandler(new NetHandlerFD( NetHandlerSL.class, NetHandlerSCKT.class)); registerNetHandler(new NetHandlerFD( NetHandlerBNDL.class, NetHandlerSCKT.class));
registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerFD.class)); registerNetHandler(new NetHandlerBNDL( NetHandlerSL.class, NetHandlerFD.class));
registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerBNDL.class));
registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class)); registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class));
registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class)); registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class));
registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class)); registerNetHandler(new NetHandlerURB( NetHandlerLCB.class, NetHandlerBEB.class));
registerNetHandler(new NetHandlerFIFO( NetHandlerTOPL.class, NetHandlerURB.class)); //registerNetHandler(new NetHandlerFIFO( NetHandlerLCB.class, NetHandlerURB.class));
registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerFIFO.class)); registerNetHandler(new NetHandlerLCB( NetHandlerTOPL.class, NetHandlerURB.class));
registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerLCB.class));
registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class)); registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class));
hosts = p.hosts();
nm_listeners.values().forEach(neh -> neh.start(h,p)); nm_listeners.values().forEach(neh -> neh.start(h,p));
ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT+THREAD_BOOST_COUNT,30,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
ex.prestartAllCoreThreads(); ex.prestartAllCoreThreads();
i_tex.scheduleAtFixedRate(()-> { i_tex.scheduleAtFixedRate(()-> {
System.err.println("NetManager DeliverIf/BroadcastIf"); System.err.println("NetManager DeliverIf/BroadcastIf");
nm_listeners.values().forEach(NetEventHandlerInterface::deliverIf); nm_listeners.values().forEach(neh->{
nm_listeners.values().forEach(NetEventHandlerInterface::broadcastIf); if(neh.condbd.getAndSet(false)){
neh.deliverIf();
neh.broadcastIf();
}
});
}, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS); }, 0, INTERNAL_WAIT, TimeUnit.MILLISECONDS);
fd_tex.scheduleAtFixedRate(()-> { fd_tex.scheduleAtFixedRate(()-> {
@ -116,13 +129,16 @@ public abstract class NetManager {
* @return true if NM and NEH are done * @return true if NM and NEH are done
*/ */
public static boolean isDone() { public static boolean isDone() {
return isStopped || nm_listeners.values().stream().map(nmh ->{ return isStopped ||
if(!nmh.isDone()){ (ex.getActiveCount()==0 &&
System.err.println("NetManager Waiting for: "+nmh.getClass().getSimpleName()); ex.getQueue().size()==0 &&
return false; nm_listeners.values().stream().allMatch(nmh->{
} if(!nmh.isDone()){
return true; System.out.println("Waiting for "+nmh.getClass().getSimpleName());
}).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); return false;
}
return true;
}));
} }
@ -212,7 +228,6 @@ public abstract class NetManager {
} }
} }
} }
} }

View File

@ -1,164 +0,0 @@
package cs451.net.event;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
/**
* Message abstraction class
*
* @author C. Hölzl
*/
public class Message implements Comparable<Message> {
/**
* Type of Message
*/
public enum TYPE {
NONE("NONE", 'N'),
ERR("ERR", 'E'),
BCST("BCST", 'B'),
DATA("DATA", ' '),
ACK("ACK", 'A'),
HRTB("HRTB",'H');
public final Character c;
public final String tag;
TYPE(String tag, Character c){
this.tag = tag;
this.c = c;
}
}
public Integer src = -1;
public final Integer id;
public TYPE tpe;
/**
* Creates a new message from a given message
* @param m Message to copy
* @return Message
*/
public static Message MSG(Message m) {
return new Message(m);
}
/**
* Build a Data Message
* @param id message id
* @return Message
*/
public static Message DMSG(Integer id){
return new Message(id,TYPE.DATA);
}
public static Message DMSG(Integer id, Integer src) {
return new Message(id,TYPE.DATA,src);
}
/**
* Build a Heartbeat Message
* @return Message
*/
public static Message HRTB(){
return new Message(0,TYPE.HRTB);
}
/**
* Build a Message of a given type
* @param mess_id Message ID
* @param tpe Message Type
* @return Message
*/
public static Message TMSG(Integer mess_id, TYPE tpe) {
return new Message(mess_id, tpe);
}
/**
* @return Empty Message with no type nor ID
*/
public static Message EMPTY(){
return new Message(-1, TYPE.NONE);
}
/**
* Converts a character to a given type
* @param c char to convert
* @return Type
*/
private static TYPE CharacterToTpe(Character c){
return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE);
}
/**
* Create a message from a ByteBuffer
* @param b buffer
* @return Message
*/
public static Message FromBuffer(ByteBuffer b) {
Character tpe = b.getChar();
Integer id = b.getInt();
Integer src = b.getInt();
return new Message(id,CharacterToTpe(tpe),src);
}
/**
* Puts a message into a ByteBuffer
* @param b buffer
*/
public void ToBuffer(ByteBuffer b){
b.putChar(tpe.c);
b.putInt(id);
b.putInt(src);
}
private Message(Message m) {
this.id = m.id;
this.tpe = m.tpe;
this.src = m.src;
}
private Message(Integer id, TYPE tpe){
this.id = id;
this.tpe = tpe;
}
private Message(Integer id, TYPE tpe, Integer src){
this.id = id;
this.tpe = tpe;
this.src = src;
}
@Override
public String toString() {
if (tpe == TYPE.DATA) {
return src + "-" + id;
}
return src + "-" + tpe.c + id;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final Message m = (Message) obj;
return id.equals(m.id) && src.equals(m.src);
}
@Override
public int hashCode() {
return Objects.hash(id,src);
}
@Override
public int compareTo(Message m) {
return id.compareTo(m.id);
}
}

View File

@ -1,62 +1,48 @@
package cs451.net.event; package cs451.net.event;
import cs451.net.event.message.Message;
import cs451.net.event.message.NetMessageAbstract;
import cs451.parser.Host; import cs451.parser.Host;
/** /**
* NetEvent abstraction class * NetEvent abstraction class
* *
* Holds a {@link cs451.net.event.Message} and a {@link cs451.parser.Host} * Holds a {@link Message} and a {@link cs451.parser.Host}
* *
* @author C. Hölzl * @author C. Hölzl
*/ */
public class NetEvent { public class NetEvent {
public final Host peer; public final Host peer;
public final Message message; public final NetMessageAbstract message;
private NetEvent(Host peer, Message message) { private NetEvent(Host peer, NetMessageAbstract m) {
this.peer = peer; this.peer = peer;
this.message = message; this.message = m;
} }
public static NetEvent Message(NetMessageAbstract m){
public static NetEvent Message(Host peer, Integer mess_id){ return new NetEvent(NO_PEER, NetMessageAbstract.MSG(m));
return new NetEvent(peer, Message.DMSG(mess_id));
} }
public static NetEvent Message(Host peer, Integer mess_id, Message.TYPE tpe){ public static NetEvent Message(Host peer, NetMessageAbstract m){
return new NetEvent(peer, Message.TMSG(mess_id,tpe)); return new NetEvent(peer, NetMessageAbstract.MSG(m));
} }
public static NetEvent Message(Host peer, Message m){ public static NetEvent MessageACK(Host peer, NetMessageAbstract m) {
return new NetEvent(peer, Message.MSG(m)); return new NetEvent(peer,NetMessageAbstract.ACK(m));
}
public static NetEvent Message(Message m){
return new NetEvent(NO_PEER,Message.MSG(m));
}
public static NetEvent Message(Integer mess_id){
return new NetEvent(NO_PEER, Message.DMSG(mess_id));
}
public static NetEvent Message(Integer src, Integer mess_id) {
return new NetEvent(NO_PEER, Message.DMSG(mess_id,src));
}
public static NetEvent MessageACK(Host peer, Message message) {
NetEvent ne = NetEvent.Message(peer,message);
ne.message.tpe = Message.TYPE.ACK;
return ne;
} }
public static NetEvent MessageHRTB(Host peer){ public static NetEvent MessageHRTB(Host peer){
return NetEvent.Message(peer,Message.HRTB()); return new NetEvent(peer,NetMessageAbstract.HRTB());
} }
public static NetEvent MessageCRSH(Host peer){
return new NetEvent(peer,NetMessageAbstract.EMPTY());
}
public static NetEvent EMPTY(){ public static NetEvent EMPTY(){
return new NetEvent(NO_PEER, Message.EMPTY()); return new NetEvent(NO_PEER, NetMessageAbstract.EMPTY());
} }
public static final Host NO_PEER = null; public static final Host NO_PEER = null;

View File

@ -10,5 +10,5 @@ public enum NetEventType {
DLVR, DLVR,
BCST, BCST,
CRSH, CRSH,
RCVR; RCVR
} }

View File

@ -0,0 +1,112 @@
package cs451.net.event.message;
import java.nio.ByteBuffer;
import java.util.HashMap;
/**
* Message abstraction class
*
* @author C. Hölzl
*/
public class Message extends NetMessageAbstract {
public final HashMap<Integer,Integer> dependencies;
private Message(Integer src, Integer id, TYPE tpe, HashMap<Integer,Integer> dependencies){
super(src, id, tpe);
this.dependencies = dependencies;
}
/**
* Create a message from a ByteBuffer
* @param b buffer
* @return Message
*/
public static Message FromBuffer(ByteBuffer b) {
if(b.remaining() < 14) return getBuilder().build();
Character tpe = b.getChar();
int id = b.getInt();
int src = b.getInt();
int depLen = b.getInt();
HashMap<Integer,Integer> newdep = new HashMap<>(depLen);
for (int i = 0; i < depLen; i++) {
if(b.remaining()<8) break;
newdep.put(b.getInt(),b.getInt());
}
return new Message(src, id, NetMessageInterface.CharacterToTpe(tpe), newdep);
}
/**
* Puts a message into a ByteBuffer
* @param b buffer
*/
public void ToBuffer(ByteBuffer b){
if(b.remaining() < 14 + 8*dependencies.keySet().size()) return;
b.putChar(tpe.c);
b.putInt(id);
b.putInt(src);
b.putInt(dependencies.keySet().size());
for (Integer i : dependencies.keySet()) {
b.putInt(i);
b.putInt(dependencies.get(i));
}
}
@Override
public String toString() {
return src + "-" + tpe.c + id + " - " +dependencies.toString();
}
public static final Message HRTB_MSG = Message.getBuilder().setId(0).setTpe(TYPE.HRTB).build();
public static MessageBuilder getBuilder(){
return new MessageBuilder();
}
public static MessageBuilder getBuilder(NetMessageAbstract m){
if (m instanceof Message)
return new MessageBuilder((Message) m);
return getBuilder();
}
public static class MessageBuilder{
private int id = -1;
private int src = -1;
private TYPE tpe = TYPE.NONE;
private HashMap<Integer,Integer> dependencies = new HashMap<>(0);
public MessageBuilder(){}
public MessageBuilder(Message m){
this.id = m.id;
this.src = m.src;
this.tpe = m.tpe;
this.dependencies = m.dependencies;
}
public Message build(){
return new Message(src,id,tpe,dependencies);
}
public MessageBuilder setTpe(TYPE tpe) {
this.tpe = tpe;
return this;
}
public MessageBuilder setId(int id) {
this.id = id;
return this;
}
public MessageBuilder setSrc(int src) {
this.src = src;
return this;
}
public MessageBuilder setDependencies(HashMap<Integer,Integer> dependencies) {
this.dependencies = dependencies;
return this;
}
}
}

View File

@ -0,0 +1,118 @@
package cs451.net.event.message;
import cs451.net.NetManager;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class MessageBundle extends NetMessageAbstract {
public final List<Message> messageList;
private MessageBundle(int src, int id, TYPE tpe, List<Message> messageList) {
super(src,id,tpe);
this.messageList = messageList;
}
public static NetMessageAbstract FromBuffer(ByteBuffer b) {
ArrayList<Message> msgl = new ArrayList<>();
int mlen = Math.min(NetManager.BUNDLE_SIZE+1, b.getInt());
for (int i = 0; i< mlen; i++) {
Message m = Message.FromBuffer(b);
if (m.tpe == TYPE.NONE)
break;
msgl.add(m);
}
return getBuilder().setMessageList(msgl).build();
}
@Override
public void ToBuffer(ByteBuffer b) {
if (b.remaining() < 4) return;
b.putInt(messageList.size());
for (Message message : messageList) {
message.ToBuffer(b);
}
}
@Override
public String toString() {
return tpe.c + "-" + Arrays.toString(messageList.toArray());
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MessageBundle m = (MessageBundle) obj;
return messageList.equals(m.messageList);
}
@Override
public int hashCode() {
return Objects.hash(messageList);
}
@Override
public int compareTo(NetMessageAbstract message) {
return 0;
}
public static MessageBundleBuilder getBuilder(){
return new MessageBundleBuilder();
}
public static MessageBundleBuilder getBuilder(NetMessageAbstract m){
if (m instanceof MessageBundle)
return new MessageBundleBuilder((MessageBundle) m);
return getBuilder();
}
public static class MessageBundleBuilder{
private int id = -1;
private int src = -1;
private TYPE tpe = TYPE.NONE;
private List<Message> messageList = new ArrayList<>(0);
public MessageBundleBuilder(){}
public MessageBundleBuilder(MessageBundle m){
this.id = m.id;
this.src = m.src;
this.tpe = m.tpe;
this.messageList = m.messageList;
}
public MessageBundle build(){
return new MessageBundle(src,id,tpe,messageList);
}
public MessageBundleBuilder setTpe(TYPE tpe) {
this.tpe = tpe;
return this;
}
public MessageBundleBuilder setId(int id) {
this.id = id;
return this;
}
public MessageBundleBuilder setSrc(int src) {
this.src = src;
return this;
}
public MessageBundleBuilder setMessageList(List<Message> dependencies) {
this.messageList = dependencies;
return this;
}
}
}

View File

@ -0,0 +1,89 @@
package cs451.net.event.message;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
public abstract class NetMessageAbstract implements NetMessageInterface {
public final TYPE tpe;
public final Integer src;
public final Integer id;
public NetMessageAbstract(Integer src,Integer id, TYPE tpe) {
this.src = src;
this.id = id;
this.tpe = tpe;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final NetMessageAbstract m = (NetMessageAbstract) obj;
return id.equals(m.id) && src.equals(m.src);
}
@Override
public int hashCode() {
return Objects.hash(id,src);
}
@Override
public int compareTo(NetMessageAbstract m) {
return id.compareTo(m.id);
}
public static NetMessageAbstract MSG(NetMessageAbstract m) {
if (m instanceof Message)
return Message.getBuilder(m).build();
if (m instanceof MessageBundle)
return MessageBundle.getBuilder(m).build();
return EMPTY();
}
/**
* Build a Ack Message
* @return Message
*/
public static NetMessageAbstract ACK(NetMessageAbstract m){
if(m instanceof Message)
return Message.getBuilder(m).setTpe(TYPE.ACK).setDependencies(new HashMap<>()).build();
return EMPTY();
}
/**
* Build a Heartbeat Message
* @return Message
*/
public static NetMessageAbstract HRTB(){
return MessageBundle.getBuilder().setMessageList(List.of(Message.HRTB_MSG)).setTpe(TYPE.HRTB).build();
}
public boolean isHeartBeat(){
if (this instanceof Message){
return this.tpe==TYPE.HRTB;
}else if (this instanceof MessageBundle){
return ((MessageBundle)this).messageList.size()>0 &&
((MessageBundle)this).messageList.get(0).tpe==TYPE.HRTB;
}else{
return false;
}
}
/**
* @return Empty Message with no type nor ID
*/
public static Message EMPTY(){
return Message.getBuilder().build();
}
}

View File

@ -0,0 +1,49 @@
package cs451.net.event.message;
import java.nio.ByteBuffer;
import java.util.Arrays;
public interface NetMessageInterface extends Comparable<NetMessageAbstract> {
/**
* Type of Message
*/
enum TYPE {
NONE("NONE", 'N'),
DATA("DATA", 'D'),
ACK("ACK", 'A'),
HRTB("HRTB",'H');
public final Character c;
public final String tag;
TYPE(String tag, Character c){
this.tag = tag;
this.c = c;
}
}
/**
* Converts a character to a given type
* @param c char to convert
* @return Type
*/
static TYPE CharacterToTpe(Character c){
return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE);
}
/**
* Create a message from a ByteBuffer
*
* @param b buffer
* @return Message
*/
static NetMessageAbstract FromBuffer(ByteBuffer b) {
return MessageBundle.FromBuffer(b);
}
/**
* Puts a message into a ByteBuffer
* @param b buffer
*/
void ToBuffer(ByteBuffer b);
}

View File

@ -5,6 +5,8 @@ import cs451.net.event.NetEvent;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* NetEventHandler abstraction class * NetEventHandler abstraction class
* *
@ -12,13 +14,14 @@ import cs451.parser.Parser;
*/ */
public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface { public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface {
/** /**
* Deliver & Broadcast Layers to handle resulting operation * Deliver & Broadcast Layers to handle resulting operation
*/ */
private final Class<? extends NetEventHandlerAbstract> deliverLayer; private final Class<? extends NetEventHandlerAbstract> deliverLayer;
private final Class<? extends NetEventHandlerAbstract> broadcastLayer; private final Class<? extends NetEventHandlerAbstract> broadcastLayer;
public final AtomicBoolean condbd = new AtomicBoolean(false);
/** /**
* Initialized the main NetEventHandler fields * Initialized the main NetEventHandler fields
@ -30,6 +33,12 @@ public abstract class NetEventHandlerAbstract implements NetEventHandlerInterfac
this.broadcastLayer = broadcastLayer; this.broadcastLayer = broadcastLayer;
} }
/**
* Requests a conditional Broadcast/Delivery
*/
public void requestCondBD(Boolean v){
condbd.set(v);
}
/** /**
* Delivers a NetEvent Synchronously * Delivers a NetEvent Synchronously

View File

@ -1,12 +1,10 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.NetManager;
import cs451.net.event.NetEvent; import cs451.net.event.NetEvent;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.List;
import java.util.stream.Collectors;
/** /**
* NetEventHandler for Best Effort Broadcast * NetEventHandler for Best Effort Broadcast
* *
@ -16,8 +14,7 @@ import java.util.stream.Collectors;
*/ */
public class NetHandlerBEB extends NetEventHandlerAbstract { public class NetHandlerBEB extends NetEventHandlerAbstract {
private List<Host> hosts; private static Host h;
private Host h;
public NetHandlerBEB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerBEB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
@ -26,14 +23,13 @@ public class NetHandlerBEB extends NetEventHandlerAbstract {
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
deliverNextAsync(NetEvent.Message(h,ne.message)); deliverNextAsync(NetEvent.Message(h,ne.message));
hosts.parallelStream().forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message))); NetManager.hosts.parallelStream().filter(ch->ch.getId()!=h.getId()).forEach(h -> broadcastNextSync(NetEvent.Message(h, ne.message)));
} }
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p); super.start(h,p);
this.h = h; NetHandlerBEB.h = h;
hosts = p.hosts().stream().filter(ch -> ch.getId()!=h.getId()).collect(Collectors.toList());
} }
} }

View File

@ -0,0 +1,68 @@
package cs451.net.handler;
import cs451.net.NetManager;
import cs451.net.event.NetEvent;
import cs451.net.event.message.Message;
import cs451.net.event.message.MessageBundle;
import cs451.parser.Host;
import cs451.parser.Parser;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
/**
* NetEventHandler for BLUNDLER
*
* Groups Multiple messages into larger packets (and de-groups them on receive
*
* @author C. Hölzl
*/
public class NetHandlerBNDL extends NetEventHandlerAbstract {
private static final ConcurrentHashMap<Integer, PriorityBlockingQueue<Message>> sending = new ConcurrentHashMap<>();
public NetHandlerBNDL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer, broadcastLayer);
}
@Override
public synchronized void broadcastIf() {
NetManager.hosts.forEach(h-> sending.computeIfPresent(h.getId(),(hid, mq)->{
while(mq.size()>0){
ArrayList<Message> msgl = new ArrayList<>(NetManager.BUNDLE_SIZE);
mq.drainTo(msgl, NetManager.BUNDLE_SIZE);
broadcastNextAsync(NetEvent.Message(h, MessageBundle.getBuilder().setMessageList(msgl).build()));
}
return mq;
}));
}
@Override
public void broadcast(NetEvent ne) {
sending.computeIfPresent(ne.peer.getId(),(h, q) -> {
if(ne.message instanceof Message) {
q.add((Message) ne.message);
requestCondBD(true);
}
return q;
});
}
@Override
public void deliver(NetEvent ne) {
if (ne.message instanceof MessageBundle) {
((MessageBundle) ne.message).messageList.forEach(msg -> deliverNextAsync(NetEvent.Message(ne.peer,msg)));
}
}
@Override
public boolean isDone() {
return sending.values().stream().allMatch(v->v.size()==0);
}
@Override
public void start(Host h, Parser p) {
p.hosts().stream().filter(ch->ch.getId()!=h.getId()).forEach(ch->sending.put(ch.getId(),new PriorityBlockingQueue<>()));
}
}

View File

@ -1,12 +1,10 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.NetManager; import cs451.net.NetManager;
import cs451.net.event.Message;
import cs451.net.event.NetEvent; import cs451.net.event.NetEvent;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -19,8 +17,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class NetHandlerFD extends NetEventHandlerAbstract { public class NetHandlerFD extends NetEventHandlerAbstract {
private List<Host> hosts;
private static final ConcurrentHashMap<Integer,Integer> alive = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Integer,Integer> alive = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Integer,Integer> should_I = new ConcurrentHashMap<>();
public NetHandlerFD(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerFD(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer, broadcastLayer); super(deliverLayer, broadcastLayer);
@ -28,20 +26,24 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
@Override @Override
public synchronized void beat() { public synchronized void beat() {
hosts.forEach(h-> { NetManager.hosts.forEach(h-> {
alive.computeIfPresent(h.getId(),(k, v) -> { should_I.computeIfPresent(h.getId(),(k, v) -> {
broadcastNextSync(NetEvent.MessageHRTB(h)); if(v>=0)
broadcastNextSync(NetEvent.MessageHRTB(h));
return v+1; return v+1;
}); });
alive.computeIfPresent(h.getId(), (k,v) -> v+1);
if(alive.getOrDefault(h.getId(),0) > NetManager.FD_MAX_TRIES){ if(alive.getOrDefault(h.getId(),0) > NetManager.FD_MAX_TRIES){
crashNextSync(NetEvent.Message(h, Message.EMPTY())); crashNextSync(NetEvent.MessageCRSH(h));
alive.remove(h.getId()); alive.remove(h.getId());
should_I.remove(h.getId());
} }
}); });
} }
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
should_I.computeIfPresent(ne.peer.getId(),(k, v)->-1);
if (alive.containsKey(ne.peer.getId())){ if (alive.containsKey(ne.peer.getId())){
broadcastNextSync(ne); broadcastNextSync(ne);
} }
@ -49,16 +51,17 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
alive.computeIfPresent(ne.peer.getId(),(k,v)->0); alive.computeIfPresent(ne.peer.getId(), (k,v) -> 0);
if (ne.message.tpe != Message.TYPE.HRTB) { if (!ne.message.isHeartBeat()) {
deliverNextSync(ne); deliverNextSync(ne);
} }
} }
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h, p); NetManager.hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> {
hosts = p.hosts(); alive.put(ch.getId(),-1);
hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),-1)); should_I.put(ch.getId(),-1);
});
} }
} }

View File

@ -1,13 +1,12 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.NetManager; import cs451.net.event.*;
import cs451.net.event.Message; import cs451.net.event.message.NetMessageAbstract;
import cs451.net.event.NetEvent;
import cs451.net.event.NetEventType;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.Map; import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -24,50 +23,53 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class NetHandlerFIFO extends NetEventHandlerAbstract { public class NetHandlerFIFO extends NetEventHandlerAbstract {
private final AtomicInteger sn = new AtomicInteger(1); private static final ConcurrentHashMap<Integer,AtomicInteger> rsn = new ConcurrentHashMap<>();
private final Map<Integer,Integer> rsn = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<Integer,PriorityBlockingQueue<NetMessageAbstract>> pending = new ConcurrentHashMap<>();
private final PriorityBlockingQueue<Message> pending = new PriorityBlockingQueue<>();
private Host me;
public NetHandlerFIFO(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerFIFO(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
} }
@Override
public synchronized void broadcast(NetEvent ne) {
Integer snv = sn.getAndIncrement();
NetManager.complete(NetEventType.BCST, NetEvent.Message(me, snv, Message.TYPE.BCST));
broadcastNextAsync(NetEvent.Message(me.getId(),snv));
}
@Override @Override
public synchronized void deliverIf(){ public synchronized void deliverIf(){
pending.removeIf(hmp -> { pending.forEach((hid, mq) -> {
Integer crsn = rsn.getOrDefault(hmp.src, 1); AtomicInteger crsn = rsn.get(hid);
if (hmp.id.equals(crsn)) { if(crsn == null) return;
NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp)); Collection<NetMessageAbstract> msgc = new ArrayList<>();
deliverNextSync(NetEvent.Message(hmp)); mq.drainTo(msgc);
rsn.put(hmp.src, crsn + 1); msgc.forEach(msg ->
return true; crsn.getAndUpdate(v -> {
} if (msg.id.equals(v)){
return false; deliverNextSync(NetEvent.Message(msg));
return v+1;
}else{
mq.add(msg);
return v;
}
})
);
}); });
} }
@Override @Override
public void deliver(NetEvent ne) { public void deliver(NetEvent ne) {
pending.add(ne.message); pending.computeIfPresent(ne.message.src, (h, mq) -> {
mq.add(ne.message);
requestCondBD(true);
return mq;
});
} }
@Override @Override
public boolean isDone() { public boolean isDone() {
return pending.stream().noneMatch(hmp -> hmp.id.equals(rsn.getOrDefault(hmp.src, 1))); return pending.values().stream().allMatch(hmp -> hmp.stream().noneMatch(msg-> msg.id.equals(rsn.getOrDefault(msg.src, new AtomicInteger(1)).get())));
} }
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p); for (Host host : p.hosts()) {
me = h; pending.put(host.getId(),new PriorityBlockingQueue<>());
rsn.put(host.getId(), new AtomicInteger(1));
}
} }
} }

View File

@ -0,0 +1,98 @@
package cs451.net.handler;
import cs451.net.event.NetEvent;
import cs451.net.event.message.Message;
import cs451.parser.Host;
import cs451.parser.Parser;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class NetHandlerLCB extends NetEventHandlerAbstract {
private static Map<Integer, List<Integer>> causality;
private static final ConcurrentHashMap<Integer, PriorityBlockingQueue<Message>> undelivered = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Integer, AtomicInteger> state = new ConcurrentHashMap<>();
public NetHandlerLCB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer, broadcastLayer);
}
@Override
public synchronized void deliverIf() {
AtomicBoolean deliveredOne = new AtomicBoolean(false);
do {
deliveredOne.set(false);
undelivered.forEach((hid, mq) -> {
AtomicInteger crsn = state.get(hid);
if(crsn == null) return;
Collection<Message> msgc = new ArrayList<>();
mq.drainTo(msgc);
msgc.forEach(msg ->
crsn.getAndUpdate(v -> {
if (msg.id.equals(v+1) && checkDependencies(msg.src,msg.dependencies)){
deliverNextSync(NetEvent.Message(msg));
deliveredOne.set(true);
return v+1;
}else{
mq.add(msg);
return v;
}
})
);
});
}while((deliveredOne.get()));
}
@Override
public void broadcast(NetEvent ne) {
broadcastNextSync(NetEvent.Message(Message.getBuilder(ne.message).setDependencies(getDependencies(ne.message.src)).build()));
}
@Override
public void deliver(NetEvent ne) {
undelivered.computeIfPresent(ne.message.src,(k, lv) -> {
lv.add((Message) ne.message);
return lv;
});
requestCondBD(true);
}
@Override
public void start(Host h, Parser p) {
super.start(h, p);
causality = p.causality();
for (Host host : p.hosts()) {
undelivered.put(host.getId(), new PriorityBlockingQueue<>());
state.put(host.getId(), new AtomicInteger(0));
}
}
@Override
public boolean isDone() {
return undelivered.values().stream().allMatch(AbstractCollection::isEmpty);
}
private HashMap<Integer,Integer> getDependencies(Integer hid){
HashMap<Integer,Integer> dependencies = new HashMap<>();
List<Integer> hc = causality.getOrDefault(hid, Collections.emptyList());
for (Integer hci : hc) {
dependencies.put(hci, state.get(hci).get());
}
return dependencies;
}
private Boolean checkDependencies(Integer hid, HashMap<Integer,Integer> md){
HashMap<Integer,Integer> cd = getDependencies(hid);
for(Integer k : cd.keySet()){
if(cd.get(k) < md.get(k)){
return false;
}
}
return true;
}
}

View File

@ -1,6 +1,7 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.event.*; import cs451.net.event.*;
import cs451.net.event.message.NetMessageInterface;
import cs451.tools.Pair; import cs451.tools.Pair;
import java.util.Set; import java.util.Set;
@ -15,7 +16,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class NetHandlerPL extends NetEventHandlerAbstract { public class NetHandlerPL extends NetEventHandlerAbstract {
private final Set<Pair<Integer,Message>> delivered = ConcurrentHashMap.newKeySet(); private static final Set<Pair<Integer, NetMessageInterface>> delivered = ConcurrentHashMap.newKeySet();
public NetHandlerPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerPL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);

View File

@ -1,15 +1,14 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.event.Message;
import cs451.net.NetManager; import cs451.net.NetManager;
import cs451.net.event.*; import cs451.net.event.*;
import cs451.net.event.message.NetMessageInterface;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -24,14 +23,10 @@ import java.util.concurrent.Executors;
*/ */
public class NetHandlerSCKT extends NetEventHandlerAbstract { public class NetHandlerSCKT extends NetEventHandlerAbstract {
private static final Integer BUFF_SIZE = 128; private static final Integer BUFF_SIZE = 2048;
private static final ExecutorService tex = Executors.newSingleThreadExecutor(); private static final ExecutorService tex = Executors.newSingleThreadExecutor();
private static DatagramSocket socket = null;
private List<Host> hosts;
private DatagramSocket socket = null;
public NetHandlerSCKT(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerSCKT(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
@ -60,15 +55,15 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract {
return; return;
} }
byte[] buff = new byte[BUFF_SIZE]; byte[] b = new byte[BUFF_SIZE];
DatagramPacket datagram = new DatagramPacket(buff, buff.length); DatagramPacket datagram = new DatagramPacket(b, b.length);
try { try {
socket.receive(datagram); socket.receive(datagram);
Optional<Integer> rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst(); Optional<Integer> rhid = NetManager.hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst();
deliverNextAsync(NetEvent.Message( deliverNextAsync(NetEvent.Message(
new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)), new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)),
Message.FromBuffer(ByteBuffer.wrap(datagram.getData())) NetMessageInterface.FromBuffer(ByteBuffer.wrap(datagram.getData()))
)); ));
} catch (IOException e) { } catch (IOException e) {
NetManager.error(NetEventType.DLVR,e); NetManager.error(NetEventType.DLVR,e);
@ -78,17 +73,13 @@ public class NetHandlerSCKT extends NetEventHandlerAbstract {
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p);
hosts = p.hosts();
try { try {
this.socket = new DatagramSocket(h.getPort()); socket = new DatagramSocket(h.getPort());
} catch (SocketException e) { } catch (SocketException e) {
throw new Error(e); throw new Error(e);
} }
tex.execute(()->{ tex.execute(()->{
while (true) { while (!socket.isClosed()) {
deliver(NetEvent.EMPTY()); deliver(NetEvent.EMPTY());
} }
}); });

View File

@ -1,10 +1,8 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.NetManager; import cs451.net.NetManager;
import cs451.net.event.Message;
import cs451.net.event.*; import cs451.net.event.*;
import cs451.parser.Host; import cs451.net.event.message.NetMessageAbstract;
import cs451.parser.Parser;
import cs451.tools.Pair; import cs451.tools.Pair;
import java.util.*; import java.util.*;
@ -23,10 +21,9 @@ import java.util.concurrent.*;
*/ */
public class NetHandlerSL extends NetEventHandlerAbstract { public class NetHandlerSL extends NetEventHandlerAbstract {
private List<Host> hosts; private static final Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet();
Set<Integer> hasTimeout = ConcurrentHashMap.newKeySet();
PriorityBlockingQueue<Pair<Integer,Message>> sending = new PriorityBlockingQueue<>(); private static final PriorityBlockingQueue<Pair<Integer, NetMessageAbstract>> sending = new PriorityBlockingQueue<>();
public NetHandlerSL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerSL(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
@ -35,14 +32,14 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
@Override @Override
public synchronized void beat(){ public synchronized void beat(){
sending.removeIf(ppm -> hasTimeout.contains(ppm.first())); sending.removeIf(ppm -> hasTimeout.contains(ppm.first()));
sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst() sending.stream().limit(NetManager.WINDOW_WIDTH).forEach(ppm -> NetManager.hosts.stream().filter(hl->hl.getId()==(ppm.first())).findFirst()
.ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second())))); .ifPresent(host -> broadcastNextSync(NetEvent.Message(host, ppm.second()))));
} }
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
if (!hasTimeout.contains(ne.peer.getId())) if (!hasTimeout.contains(ne.peer.getId()))
sending.add(new Pair<>(ne.peer.getId(),Message.MSG(ne.message))); sending.add(new Pair<>(ne.peer.getId(), ne.message));
broadcastNextSync(ne); broadcastNextSync(ne);
} }
@ -51,7 +48,7 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
switch (ne.message.tpe){ switch (ne.message.tpe){
case ACK: case ACK:
sending.removeIf(ppm -> (ne.peer.getId()==ppm.first() && sending.removeIf(ppm -> (ne.peer.getId()==ppm.first() &&
ppm.second().equals(ne.message))); ne.message.equals(ppm.second())));
break; break;
case DATA: case DATA:
broadcastNextSync(NetEvent.MessageACK(ne.peer,ne.message)); broadcastNextSync(NetEvent.MessageACK(ne.peer,ne.message));
@ -72,10 +69,4 @@ public class NetHandlerSL extends NetEventHandlerAbstract {
public boolean isDone() { public boolean isDone() {
return sending.isEmpty(); return sending.isEmpty();
} }
@Override
public void start(Host h, Parser p) {
this.hosts = p.hosts();
}
} }

View File

@ -2,10 +2,12 @@ package cs451.net.handler;
import cs451.net.NetManager; import cs451.net.NetManager;
import cs451.net.event.NetEvent; import cs451.net.event.NetEvent;
import cs451.net.event.NetEventType;
import cs451.net.event.message.Message;
import cs451.net.event.message.NetMessageInterface;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -20,11 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class NetHandlerTOPL extends NetEventHandlerAbstract { public class NetHandlerTOPL extends NetEventHandlerAbstract {
private final AtomicInteger toSend = new AtomicInteger(0); private static final AtomicInteger sn = new AtomicInteger(1);
private final AtomicInteger delivered = new AtomicInteger(0);
private final AtomicInteger waiting = new AtomicInteger(0); private static final AtomicInteger toSend = new AtomicInteger(0);
private static final AtomicInteger delivered = new AtomicInteger(0);
private static final AtomicInteger waiting = new AtomicInteger(0);
private final AtomicBoolean status = new AtomicBoolean(false);
private Host me; private Host me;
@ -34,25 +37,29 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
@Override @Override
public synchronized void broadcastIf() { public synchronized void broadcastIf() {
if( !status.get() ) return;
while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) { while (toSend.get() > 0 && waiting.get() < NetManager.WINDOW_WIDTH) {
toSend.decrementAndGet(); toSend.decrementAndGet();
waiting.incrementAndGet(); waiting.incrementAndGet();
broadcastNextSync(NetEvent.EMPTY());
Message newMess = Message.getBuilder().setSrc(me.getId()).setId(sn.getAndIncrement()).setTpe(NetMessageInterface.TYPE.DATA).build();
NetManager.complete(NetEventType.BCST, NetEvent.Message(me, newMess));
broadcastNextSync(NetEvent.Message(newMess));
} }
} }
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
status.set(true); requestCondBD(true);
} }
@Override @Override
public synchronized void deliver(NetEvent ne) { public synchronized void deliver(NetEvent ne) {
NetManager.complete(NetEventType.DLVR,ne);
if (ne.message.src == me.getId()) { if (ne.message.src == me.getId()) {
delivered.incrementAndGet(); delivered.incrementAndGet();
waiting.decrementAndGet(); waiting.decrementAndGet();
requestCondBD(true);
} }
} }
@ -63,7 +70,6 @@ public class NetHandlerTOPL extends NetEventHandlerAbstract {
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p);
me = h; me = h;
toSend.set(p.messageCount()); toSend.set(p.messageCount());
} }

View File

@ -1,6 +1,7 @@
package cs451.net.handler; package cs451.net.handler;
import cs451.net.event.NetEvent; import cs451.net.event.NetEvent;
import cs451.net.event.message.NetMessageAbstract;
import cs451.parser.Host; import cs451.parser.Host;
import cs451.parser.Parser; import cs451.parser.Parser;
import cs451.tools.Pair; import cs451.tools.Pair;
@ -22,14 +23,10 @@ import java.util.concurrent.*;
*/ */
public class NetHandlerURB extends NetEventHandlerAbstract { public class NetHandlerURB extends NetEventHandlerAbstract {
private static final Set<Integer> correct = ConcurrentHashMap.newKeySet();
private final Set<Integer> correct = ConcurrentHashMap.newKeySet(); private static final Map<Pair<Integer,Integer>, Set<Integer>> ack = new ConcurrentHashMap<>();
private final Map<Pair<Integer,Integer>, Set<Integer>> ack = new ConcurrentHashMap<>(); private static final Set<Pair<Integer, Integer>> delivered = ConcurrentHashMap.newKeySet();
private final Set<Pair<Integer,Integer>> delivered = ConcurrentHashMap.newKeySet(); private static final Map<Pair<Integer, Integer>, NetMessageAbstract> pending = new ConcurrentHashMap<>();
private final Set<Pair<Integer,Integer>> pending = ConcurrentHashMap.newKeySet();
private Integer myId;
public NetHandlerURB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) { public NetHandlerURB(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
super(deliverLayer,broadcastLayer); super(deliverLayer,broadcastLayer);
@ -37,15 +34,16 @@ public class NetHandlerURB extends NetEventHandlerAbstract {
@Override @Override
public synchronized void deliverIf(){ public synchronized void deliverIf(){
pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). pending.keySet().stream().filter(smp ->
ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)).
filter(delivered::add). filter(delivered::add).
forEach(smp -> deliverNextAsync(NetEvent.Message(smp.first(),smp.second()))); forEach(smp -> deliverNextAsync(NetEvent.Message(pending.get(smp))));
} }
@Override @Override
public void broadcast(NetEvent ne) { public void broadcast(NetEvent ne) {
pending.add(new Pair<>(myId, ne.message.id)); pending.put(new Pair<>(ne.message.src, ne.message.id),NetMessageAbstract.MSG(ne.message));
ne.message.src = myId; requestCondBD(true);
broadcastNextSync(ne); broadcastNextSync(ne);
} }
@ -59,25 +57,25 @@ public class NetHandlerURB extends NetEventHandlerAbstract {
v.add(ne.peer.getId()); v.add(ne.peer.getId());
return v; return v;
}); });
if(pending.add(smp)){ if(pending.putIfAbsent(smp, NetMessageAbstract.MSG(ne.message))==null){
broadcastNextAsync(ne); broadcastNextAsync(ne);
} }
requestCondBD(true);
} }
@Override @Override
public void crash(NetEvent ne) { public void crash(NetEvent ne) {
correct.remove(ne.peer.getId()); correct.remove(ne.peer.getId());
requestCondBD(true);
} }
@Override @Override
public boolean isDone() { public boolean isDone() {
return delivered.containsAll(pending); return delivered.containsAll(pending.keySet());
} }
@Override @Override
public void start(Host h, Parser p) { public void start(Host h, Parser p) {
super.start(h,p);
p.hosts().forEach(ch-> correct.add(ch.getId())); p.hosts().forEach(ch-> correct.add(ch.getId()));
myId = h.getId();
} }
} }

View File

@ -4,12 +4,23 @@ import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ConfigParser { public class ConfigParser {
private String path; private String path;
private int messages; private int messages;
private final ConcurrentHashMap<Integer,List<Integer>> causality = new ConcurrentHashMap<>();
public ConfigParser(List<Host> hosts) {
for (Host host : hosts) {
causality.put(host.getId(),new ArrayList<>());
}
}
public boolean populate(String value) { public boolean populate(String value) {
@ -17,14 +28,22 @@ public class ConfigParser {
path = file.getPath(); path = file.getPath();
try (BufferedReader br = new BufferedReader(new FileReader(path))) { try (BufferedReader br = new BufferedReader(new FileReader(path))) {
int lineNum = 1; int lineNum = 0;
for (String line; (line = br.readLine()) != null; lineNum++) { for (String line; (line = br.readLine()) != null; lineNum++) {
switch(lineNum){ System.out.println(line);
case 1: String[] lparts = line.trim().split(" ");
messages = Integer.parseInt(line); if (lineNum == 0){
break; messages = Integer.parseInt(lparts[0]);
default: }else if (lineNum>=1 && lineNum<=causality.keySet().size()){
break; Integer hid = Integer.parseInt(lparts[0]);
List<Integer> vl = causality.get(hid);
if(vl != null){
for(int i = 1; i < lparts.length; ++i){
vl.add(Integer.parseInt(lparts[i]));
}
}
}else{
break;
} }
} }
} catch (IOException e) { } catch (IOException e) {
@ -41,4 +60,8 @@ public class ConfigParser {
public int getMessages() { public int getMessages() {
return messages; return messages;
} }
public Map<Integer, List<Integer>> getCausality() {
return causality;
}
} }

View File

@ -1,6 +1,7 @@
package cs451.parser; package cs451.parser;
import java.util.List; import java.util.List;
import java.util.Map;
public class Parser { public class Parser {
@ -57,8 +58,9 @@ public class Parser {
} }
if (argsNum == Constants.ARG_LIMIT_CONFIG) { if (argsNum == Constants.ARG_LIMIT_CONFIG) {
configParser = new ConfigParser(); configParser = new ConfigParser(hosts());
if (!configParser.populate(args[Constants.CONFIG_VALUE])) { if (!configParser.populate(args[Constants.CONFIG_VALUE])) {
help();
} }
} }
} }
@ -108,4 +110,8 @@ public class Parser {
return configParser.getMessages(); return configParser.getMessages();
} }
public Map<Integer, List<Integer>> causality(){
return configParser.getCausality();
}
} }

View File

@ -30,8 +30,8 @@ public abstract class ParamDetector {
int messages = p.messageCount(); int messages = p.messageCount();
int processCount = p.hosts().size(); int processCount = p.hosts().size();
int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(4-(processCount/4.0))))); int windowWidthMult = Math.toIntExact(Math.round(Math.pow(2.0,(2-(processCount/2.0)))));
int windowWidth = bound(coresPerProcess*windowWidthMult,4,messages); int windowWidth = bound(coresPerProcess*windowWidthMult,2,messages);
System.out.println("Process expected to broadcast "+messages+" messages."); System.out.println("Process expected to broadcast "+messages+" messages.");
System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+")."); System.out.println("Starting Process with WindowWidth of "+windowWidth+" (~ x"+windowWidthMult+").");
@ -42,5 +42,8 @@ public abstract class ParamDetector {
//We might want to PingPong To set Custom Timing Limitations.... //We might want to PingPong To set Custom Timing Limitations....
NetManager.FD_MAX_TRIES = 10; NetManager.FD_MAX_TRIES = 10;
NetManager.FD_WAIT = 1000; NetManager.FD_WAIT = 1000;
NetManager.BUNDLE_SIZE = 2;
NetManager.WINDOW_WIDTH *= NetManager.BUNDLE_SIZE;
} }
} }

8
bnr.sh
View File

@ -1,12 +1,12 @@
#!/bin/bash #!/bin/bash
if [ $# -eq 2 ]; then if [ $# -eq 3 ]; then
export _JAVA_OPTIONS="-Xmx16G" export _JAVA_OPTIONS="-Xmx16G"
./257844/build.sh ./257844/build.sh
sudo echo 1 sudo echo 1
echo "Running with $1 processes and $2 messages" echo "Running $1 with $2 processes and $3 messages"
yes "" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2 yes "" | ./validate.py -r 257844/run.sh -b $1 -l 257844/bin/logs -p $2 -m $3
else else
echo "Missing Arguments ..." echo "Missing Arguments ..."
echo "Usage: $0 process_count message_count" echo "Usage: $0 fifo|lcausal process_count message_count"
fi fi

View File

@ -201,14 +201,144 @@ class FifoBroadcastValidation(Validation):
return True return True
class LCausalBroadcastValidation(Validation): class LCausalBroadcastValidation(Validation):
def __init__(self, processes, outputDir, causalRelationships):
super().__init__(processes, outputDir)
def generateConfig(self): def generateConfig(self):
raise NotImplementedError() hosts = tempfile.NamedTemporaryFile(mode='w')
config = tempfile.NamedTemporaryFile(mode='w')
def checkProcess(self, pid): # Hosts file
raise NotImplementedError() for i in range(1, self.processes + 1):
hosts.write("{} localhost {}\n".format(i, PROCESSES_BASE_IP+i))
hosts.flush()
self.lcausalDeps = defaultdict(list)
# # of messages
config.write("{}\n".format(self.messages))
# Config file; Create some random locality. Each process may have
# up to N processes / 2 locally dependent processes.
for thisproc in range(1, self.processes + 1):
config.write(str(thisproc) + " ")
# Add random number of dependencies
deps = [i for i in range(1, self.processes + 1) if i != thisproc]
for j in range(0, random.randint(0, int(self.processes/2))):
depidx = random.randint(0, len(deps) - 1)
otherproc = deps[depidx]
config.write(str(otherproc) + " ")
self.lcausalDeps[thisproc].append(otherproc)
del deps[depidx]
config.write("\n")
config.flush()
return (hosts, config)
def filePathForPID(self, pid):
return os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))
def verifyCausality(self, fromPid, broadcastSeq, toPid, fromHistory):
ok = True
filename = self.filePathForPID(toPid)
toPidFile = open(filename)
toHistory = defaultdict(lambda: 0)
for lineNumber, line in enumerate(toPidFile):
tokens = line.split()
if tokens[0] == 'd':
# Record most-recently received value from process. This is
# sufficient due to LCausal also adhering to FIFO.
if tokens[1] in fromHistory:
toHistory[tokens[1]] = int(tokens[2])
if tokens[1] == fromPid and tokens[2] == broadcastSeq:
# Found the dependent broadcast, ensure that history is consistent
# up to this point
for pid, seq in fromHistory.items():
if toHistory[pid] < seq:
print("File {}, Line {}: \n\tDelivered dependent message {}:{} with unresolved dependency\n"
"\t Message was dependent on {}:{} but only delivered up to {}:{}".format(filename, lineNumber + 1,fromPid, broadcastSeq, pid,seq, pid, toHistory[pid]))
ok = False
return ok
def checkLCausal(self, pid):
if pid not in self.lcausalDeps:
return True
depHistory = defaultdict(lambda: 0)
pidFile = open(self.filePathForPID(pid))
for line in pidFile.readlines():
ltokens = line.split()
# If token is broadcast, for each other process in the system,
# verify that that token was delivered with adherance to the
# causality as recorded in the @var depHistory up to this point
if ltokens[0] == 'b':
for i in range(1, self.processes+1):
if i == pid:
continue
if not self.verifyCausality(
fromPid=str(pid),
broadcastSeq=ltokens[1],
toPid=i,
fromHistory=depHistory
):
return False
# If token is deliver and is in the set of causally dependent processes,
# record it as being part of the dependent history up to this point.
if ltokens[0] == 'd' and int(ltokens[1]) in self.lcausalDeps[pid]:
depHistory[ltokens[1]] = int(ltokens[2])
return True
def checkFIFO(self, pid):
filePath = os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))
i = 1
nextMessage = defaultdict(lambda : 1)
filename = os.path.basename(filePath)
with open(filePath) as f:
for lineNumber, line in enumerate(f):
tokens = line.split()
# Check broadcast
if tokens[0] == 'b':
msg = int(tokens[1])
if msg != i:
print("File {}, Line {}: Messages broadcast out of order. Expected message {} but broadcast message {}".format(filename, lineNumber, i, msg))
return False
i += 1
# Check delivery
if tokens[0] == 'd':
sender = int(tokens[1])
msg = int(tokens[2])
if msg != nextMessage[sender]:
print("File {}, Line {}: Message delivered out of order. Expected message {}, but delivered message {}".format(filename, lineNumber, nextMessage[sender], msg))
return False
else:
nextMessage[sender] = msg + 1
return True
def checkAll(self, continueOnError=True):
print("LCausal verification:")
for pid in range(1, self.processes+1):
ret = self.checkFIFO(pid)
ret &= self.checkLCausal(pid)
if not ret and not continueOnError:
return False
return True
class StressTest: class StressTest:
def __init__(self, procs, concurrency, attempts, attemptsRatio): def __init__(self, procs, concurrency, attempts, attemptsRatio):
@ -359,7 +489,7 @@ def main(processes, messages, runscript, broadcastType, logsDir, testConfig):
if broadcastType == "fifo": if broadcastType == "fifo":
validation = FifoBroadcastValidation(processes, messages, logsDir) validation = FifoBroadcastValidation(processes, messages, logsDir)
else: else:
validation = LCausalBroadcastValidation(processes, messages, logsDir, None) validation = LCausalBroadcastValidation(processes, messages, logsDir)
hostsFile, configFile = validation.generateConfig() hostsFile, configFile = validation.generateConfig()