diff --git a/257844/.gitignore b/257844/.gitignore
new file mode 100644
index 0000000..c35584b
--- /dev/null
+++ b/257844/.gitignore
@@ -0,0 +1,32 @@
+# Created by https://www.toptal.com/developers/gitignore/api/java
+# Edit at https://www.toptal.com/developers/gitignore?templates=java
+
+target/
+bin/da_proc.jar
+
+### Java ###
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+# End of https://www.toptal.com/developers/gitignore/api/java
diff --git a/257844/257844.iml b/257844/257844.iml
new file mode 100644
index 0000000..f76df2b
--- /dev/null
+++ b/257844/257844.iml
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/257844/bin/README b/257844/bin/README
new file mode 100644
index 0000000..ced5c3b
--- /dev/null
+++ b/257844/bin/README
@@ -0,0 +1 @@
+This is a reserved directory name! Store the binary generated by `build.sh` in this directory
diff --git a/257844/bin/deploy/README b/257844/bin/deploy/README
new file mode 100644
index 0000000..1a2f94d
--- /dev/null
+++ b/257844/bin/deploy/README
@@ -0,0 +1 @@
+This is a reserved directory name, do not delete or use in your application!
diff --git a/257844/build.sh b/257844/build.sh
new file mode 100755
index 0000000..28a73eb
--- /dev/null
+++ b/257844/build.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+set -e
+
+# Change the current working directory to the location of the present file
+cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+mvn clean compile assembly:single
+mv target/da_project-1.0-SNAPSHOT-jar-with-dependencies.jar bin/da_proc.jar
diff --git a/257844/cleanup.sh b/257844/cleanup.sh
new file mode 100755
index 0000000..51b3a48
--- /dev/null
+++ b/257844/cleanup.sh
@@ -0,0 +1,7 @@
+#!/bin/bash
+
+# Change the current working directory to the location of the present file
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+rm -f "$DIR"/bin/da_proc.jar
+rm -rf "$DIR"/target
diff --git a/257844/pom.xml b/257844/pom.xml
new file mode 100644
index 0000000..1e24080
--- /dev/null
+++ b/257844/pom.xml
@@ -0,0 +1,62 @@
+
+
+
+ 4.0.0
+
+ cs451
+ da_project
+ 1.0-SNAPSHOT
+
+ DA_Project
+ https://www.helcel.net
+
+
+ UTF-8
+ 1.11
+ 1.11
+
+
+
+
+
+
+
+ maven-clean-plugin
+ 3.1.0
+
+ 11
+
+
+
+
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+
+
+
+ maven-jar-plugin
+ 3.2.0
+
+ 11
+
+
+
+ maven-assembly-plugin
+
+
+
+ cs451.Main
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
+
diff --git a/257844/run.sh b/257844/run.sh
new file mode 100755
index 0000000..4f300c7
--- /dev/null
+++ b/257844/run.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+# Change the current working directory to the location of the present file
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+ret=0
+exec 3>&1; $(java -jar "$DIR"/bin/da_proc.jar "$@" >&3); ret=$?; exec 3>&-
+
+exit $ret
diff --git a/257844/src/main/java/cs451/Main.java b/257844/src/main/java/cs451/Main.java
new file mode 100644
index 0000000..b94aa11
--- /dev/null
+++ b/257844/src/main/java/cs451/Main.java
@@ -0,0 +1,139 @@
+package cs451;
+
+import cs451.net.event.Message;
+import cs451.net.NetManager;
+import cs451.net.event.NetEventType;
+import cs451.parser.Coordinator;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+import cs451.tools.Logger;
+import cs451.tools.Pair;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class Main {
+
+ private static String formatOutput(Integer id, Integer host, Message.TYPE tpe){
+ switch (tpe){
+ case DATA:
+ return "d "+host+" "+id+"\n";
+ case ACK:
+ return "d "+host+" "+id+"\n";
+ case BCST:
+ return "b "+id+"\n";
+ default:
+ return "";
+ }
+ }
+
+ private static void writeOutput(String filepath) throws IOException {
+ FileWriter writer = new FileWriter(filepath);
+
+ for(Pair, Message.TYPE> mhpt : mh_tpe){
+ try {
+ writer.write(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ writer.close();
+ }
+
+ private static void handleSignal() {
+ //immediately stop network packet processing
+ System.out.println("Immediately stopping network packet processing.");
+
+ NetManager.stop();
+
+ //write/flush output file if necessary
+ System.out.println("Writing output.");
+
+ try {
+ writeOutput(parser.output());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private static void initSignalHandlers() {
+ Runtime.getRuntime().addShutdownHook(new Thread(Main::handleSignal));
+ }
+
+ private static Host me = null;
+ private static final Queue,Message.TYPE>> mh_tpe = new ConcurrentLinkedQueue<>();
+
+ private static Parser parser = null;
+
+
+ public static void main(String[] args) throws InterruptedException {
+ parser = new Parser(args);
+ parser.parse();
+
+ initSignalHandlers();
+
+ // example
+ long pid = ProcessHandle.current().pid();
+ System.out.println("My PID is " + pid + ".");
+ System.out.println("Use 'kill -SIGINT " + pid + " ' or 'kill -SIGTERM " + pid + " ' to stop processing packets.");
+
+ System.out.println("My id is " + parser.myId() + ".");
+ System.out.println("List of hosts is:");
+ for (Host host : parser.hosts()) {
+ if (host.getId() == parser.myId()) {
+ me = host;
+ }
+ System.out.println(host.getId() + ", " + host.getIp() + ", " + host.getPort());
+ }
+
+ System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort());
+ System.out.println("Signal: " + parser.signalIp() + ":" + parser.signalPort());
+ System.out.println("Output: " + parser.output());
+ // if config is defined; always check before parser.config()
+ if (parser.hasConfig()) {
+ System.out.println("Config: " + parser.config());
+ }
+ Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort());
+
+ NetManager.start(me,parser,(t,ne) -> {
+ Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString());
+ if(t == NetEventType.DLVR){
+ mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe));
+ }else if(t== NetEventType.SEND){
+ mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe));
+ }
+ }, (t,ne) -> {
+ Logger.error("ERR"+t+" - "+ne.getMessage());
+ });
+
+ System.out.println("Waiting for all processes for finish initialization");
+ coordinator.waitOnBarrier();
+
+ System.out.println("Broadcasting messages...");
+ NetManager.send();
+
+ while(!NetManager.isDone()) {
+ Thread.sleep(500);
+ }
+
+ System.out.println("Stopping NetManager");
+ NetManager.stop();
+
+ System.out.println("Signaling end of broadcasting messages");
+ coordinator.finishedBroadcasting();
+
+ try {
+ writeOutput(parser.output());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ while (true) {
+ // Sleep for 1 hour
+ Thread.sleep(60 * 60 * 1000);
+ }
+ }
+}
diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java
new file mode 100644
index 0000000..4ee7b28
--- /dev/null
+++ b/257844/src/main/java/cs451/net/NetManager.java
@@ -0,0 +1,138 @@
+package cs451.net;
+
+import cs451.net.event.*;
+
+import cs451.net.handler.*;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+import cs451.tools.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.function.BiConsumer;
+
+
+
+public abstract class NetManager {
+ private static final Integer THREAD_COUNT = 8;
+
+ private static boolean isStopped = false;
+
+ private static final Map, NetEventHandler> nm_listeners = new HashMap<>();
+
+ private static final ThreadPoolExecutor ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
+
+ private static BiConsumer onCompleteHandler;
+ private static BiConsumer onErrorHandler;
+
+
+ private static void registerNetHandler(NetEventHandler handler){
+ nm_listeners.put(handler.getClass(),handler);
+ }
+
+ public static void start(Host h, Parser p, BiConsumer och, BiConsumer oeh) {
+ onCompleteHandler = och;
+ onErrorHandler = oeh;
+
+ registerNetHandler(new NetHandlerSCKT( NetHandlerSL.class, NetHandlerDFLT.class));
+ registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerSCKT.class));
+ registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class));
+ registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class));
+ registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class));
+ registerNetHandler(new NetHandlerFIFO( NetHandlerTOPL.class, NetHandlerURB.class));
+ registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerFIFO.class));
+ registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class));
+
+ nm_listeners.values().forEach(neh -> neh.start(h,p));
+ ex.prestartAllCoreThreads();
+ }
+
+ public static void stop() {
+ isStopped = true;
+ nm_listeners.values().forEach(NetEventHandler::stop);
+ ex.shutdown();
+ System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run.");
+ }
+
+ public static boolean isDone() {
+ return isStopped || nm_listeners.values().stream().map(NetEventHandler::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2);
+ }
+
+
+//=================================================================================================================
+//=================================================================================================================
+
+ public static void handle(Class extends NetEventHandler> layer, NetEventType event, NetEvent ne){
+ try {
+ ex.getQueue().add(new NetEventRunner(ne, layer, event));
+ }catch(IllegalStateException ise){
+ error(event,ise);
+ }
+ }
+
+ public static void complete(NetEventType t,NetEvent ne){
+ onCompleteHandler.accept(t,ne);
+ }
+
+ public static void error(NetEventType t,Exception e){
+ onErrorHandler.accept(t,e);
+ }
+
+//=================================================================================================================
+//=================================================================================================================
+
+ public static void deliver(Class extends NetEventHandler> layer, NetEvent ne) {
+ handle(layer,NetEventType.DLVR,ne);
+ }
+
+ public static void send(Class extends NetEventHandler> layer, NetEvent ne) {
+ handle(layer,NetEventType.SEND,ne);
+ }
+
+ public static void crash(Class extends NetEventHandler> layer, NetEvent ne) {
+ handle(layer,NetEventType.CRSH,ne);
+ }
+
+ public static void send() {
+ new NetEventRunner(NetEvent.EMPTY(),NetHandlerTOPL.class,NetEventType.SEND).run();
+ }
+
+ public static void deliverSync(Class extends NetEventHandler> layer, NetEvent ne) {
+ new NetEventRunner(ne,layer,NetEventType.DLVR).run();
+ }
+ public static void sendSync(Class extends NetEventHandler> layer, NetEvent ne) {
+ new NetEventRunner(ne,layer,NetEventType.SEND).run();
+ }
+
+//=================================================================================================================
+//=================================================================================================================
+
+ private static class NetEventRunner implements Runnable {
+
+ private final NetEvent ne;
+ private final Class extends NetEventHandler> lt;
+ private final NetEventType net;
+
+ private NetEventRunner(NetEvent ne, Class extends NetEventHandler> lt, NetEventType net) {
+ this.ne = ne;
+ this.lt = lt;
+ this.net = net;
+ }
+
+ @Override
+ public void run() {
+ Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+"");
+
+ NetEventHandler nl = nm_listeners.getOrDefault(this.lt, null);
+ if (nl != null) {
+ nl.onEvent(this.net,ne);
+ }else{
+ error(this.net,new UnsupportedOperationException("No Handler for "+this.lt));
+ }
+ }
+ }
+
+}
+
+
diff --git a/257844/src/main/java/cs451/net/event/Message.java b/257844/src/main/java/cs451/net/event/Message.java
new file mode 100644
index 0000000..2f66ebb
--- /dev/null
+++ b/257844/src/main/java/cs451/net/event/Message.java
@@ -0,0 +1,110 @@
+package cs451.net.event;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class Message implements Comparable {
+
+ public enum TYPE {
+ NONE("NONE", 'N'),
+ ERR("ERR", 'E'),
+ DATA("DATA", ' '),
+ ACK("ACK", 'A'),
+ BCST("BCST", 'B');
+
+ public final Character c;
+ public final String tag;
+ TYPE(String tag, Character c){
+ this.tag = tag;
+ this.c = c;
+ }
+
+ }
+
+ public int src = -1;
+ public final int id;
+ public TYPE tpe;
+
+ public static Message DMSG(Integer id){
+ return new Message(id,TYPE.DATA);
+ }
+ public static Message DMSG(Integer id, Integer src) {
+ return new Message(id,TYPE.DATA,src);
+ }
+
+ public static Message TMSG(Integer mess_id, TYPE tpe) {
+ return new Message(mess_id, tpe);
+ }
+
+ public static Message EMPTY(){
+ return new Message(-1, TYPE.NONE);
+ }
+
+ private static TYPE CharacterToTpe(Character c){
+ return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE);
+ }
+ public static Message FromBuffer(ByteBuffer b) {
+ Character tpe = b.getChar();
+ Integer id = b.getInt();
+ Integer src = b.getInt();
+ return new Message(id,CharacterToTpe(tpe),src);
+ }
+
+ public void ToBuffer(ByteBuffer b){
+ b.putChar(tpe.c);
+ b.putInt(id);
+ b.putInt(src);
+ }
+
+ public Message(Message m) {
+ this.id = m.id;
+ this.tpe = m.tpe;
+ this.src = m.src;
+ }
+ private Message(Integer id, TYPE tpe){
+ this.id = id;
+ this.tpe = tpe;
+ }
+ private Message(Integer id, TYPE tpe, Integer src){
+ this.id = id;
+ this.tpe = tpe;
+ this.src = src;
+ }
+
+ @Override
+ public String toString() {
+ switch(tpe) {
+ case DATA:
+ return src+"-"+ id;
+ default:
+ return src+"-"+tpe.c + id;
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final Message m = (Message) obj;
+ return id == m.id && src == m.src;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id,src);
+ }
+
+ @Override
+ public int compareTo(Object obj) {
+ final Message m = (Message) obj;
+ return id-m.id;
+ }
+}
diff --git a/257844/src/main/java/cs451/net/event/NetEvent.java b/257844/src/main/java/cs451/net/event/NetEvent.java
new file mode 100644
index 0000000..5c90aae
--- /dev/null
+++ b/257844/src/main/java/cs451/net/event/NetEvent.java
@@ -0,0 +1,52 @@
+package cs451.net.event;
+
+import cs451.parser.Host;
+
+public class NetEvent {
+
+ public final Host peer;
+ public final Message message;
+
+ private NetEvent(Host peer, Message message) {
+ this.peer = peer;
+ this.message = message;
+ }
+
+
+ public static NetEvent Message(Host peer, Integer mess_id){
+ return new NetEvent(peer, Message.DMSG(mess_id));
+ }
+
+ public static NetEvent Message(Host peer, Integer mess_id, Message.TYPE tpe){
+ return new NetEvent(peer, Message.TMSG(mess_id,tpe));
+ }
+
+ public static NetEvent Message(Host peer, Message m){
+ return new NetEvent(peer,new Message(m));
+ }
+
+ public static NetEvent Message(Message m){
+ return new NetEvent(NO_PEER,new Message(m));
+ }
+
+ public static NetEvent Message(Integer mess_id){
+ return new NetEvent(NO_PEER, Message.DMSG(mess_id));
+ }
+
+ public static NetEvent Message(Integer src, Integer mess_id) {
+ return new NetEvent(NO_PEER, Message.DMSG(mess_id,src));
+ }
+
+ public static NetEvent MessageACK(Host peer, Message message) {
+ NetEvent ne = NetEvent.Message(peer,message);
+ ne.message.tpe = Message.TYPE.ACK;
+ return ne;
+ }
+
+
+ public static NetEvent EMPTY(){
+ return new NetEvent(NO_PEER, Message.EMPTY());
+ }
+
+ public static final Host NO_PEER = null;
+}
\ No newline at end of file
diff --git a/257844/src/main/java/cs451/net/event/NetEventType.java b/257844/src/main/java/cs451/net/event/NetEventType.java
new file mode 100644
index 0000000..3ff2590
--- /dev/null
+++ b/257844/src/main/java/cs451/net/event/NetEventType.java
@@ -0,0 +1,8 @@
+package cs451.net.event;
+
+public enum NetEventType {
+ DLVR,
+ SEND,
+ CRSH,
+ RCVR;
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandler.java b/257844/src/main/java/cs451/net/handler/NetEventHandler.java
new file mode 100644
index 0000000..d37b5e1
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetEventHandler.java
@@ -0,0 +1,84 @@
+package cs451.net.handler;
+
+import cs451.net.NetManager;
+import cs451.net.event.NetEvent;
+import cs451.net.event.NetEventType;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+import cs451.tools.Logger;
+
+public abstract class NetEventHandler {
+
+ /*
+ * 1 - 50ms - 50ms
+ * 2 - 400ms - 450ms
+ * 3 - 1350ms - 1800ms
+ * 4 - 3200ms - 5000ms
+ */
+ public static final int MS_WAIT = 50;
+ public static final int MAX_TRIES = 5;
+
+ public static final Integer SEND_WINDOW = 3;
+
+
+
+ private final Class extends NetEventHandler> deliverLayer;
+ private final Class extends NetEventHandler> broadcastLayer;
+
+
+ NetEventHandler(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ this.deliverLayer = deliverLayer;
+ this.broadcastLayer = broadcastLayer;
+ }
+
+ public void deliverNextSync(NetEvent ne){
+ NetManager.deliverSync(deliverLayer,ne);
+ }
+ public void sendNextSync(NetEvent ne){
+ NetManager.sendSync(broadcastLayer,ne);
+ }
+
+ public void deliverNext(NetEvent ne){
+ NetManager.deliver(deliverLayer,ne);
+ }
+ public void sendNext(NetEvent ne){
+ NetManager.send(broadcastLayer,ne);
+ }
+
+ public void onEvent(NetEventType et, NetEvent ne){
+ switch (et){
+ case DLVR: deliver(ne);
+ break;
+ case SEND: send(ne);
+ break;
+ case CRSH: crash(ne);
+ break;
+ case RCVR: recover(ne);
+ break;
+ default:
+ Logger.error("Unhandled EventType:"+et);
+ }
+ }
+
+ public void send(NetEvent ne){
+ NetManager.error(NetEventType.SEND,new Exception("Default Handler"));
+ }
+
+ public void deliver(NetEvent ne){
+ NetManager.error(NetEventType.DLVR,new Exception("Default Handler"));
+ }
+
+ public void crash(NetEvent ne){
+ NetManager.error(NetEventType.CRSH,new Exception("Default Handler"));
+ }
+
+ public void recover(NetEvent ne){
+ NetManager.error(NetEventType.RCVR,new Exception("Default Handler"));
+ }
+
+ public void start(Host h, Parser p) {}
+
+ public void stop() {}
+
+ public boolean isDone(){ return true;}
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java
new file mode 100644
index 0000000..1016704
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java
@@ -0,0 +1,40 @@
+package cs451.net.handler;
+
+import cs451.net.event.NetEvent;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+
+import java.util.List;
+
+public class NetHandlerBEB extends NetEventHandler {
+
+ private List hosts;
+ private Host h;
+
+ public NetHandlerBEB(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+
+ @Override
+ public void send(NetEvent ne) {
+ for(Host h : hosts){
+ if(this.h.getId() != h.getId()) {
+ sendNext(NetEvent.Message(h, ne.message));
+ }else{
+ deliverNext(NetEvent.Message(h,ne.message));
+ }
+ }
+ }
+
+ @Override
+ public void deliver(NetEvent ne) {
+ deliverNext(ne);
+ }
+
+ public void start(Host h, Parser p) {
+ this.h = h;
+ hosts = p.hosts();
+ }
+
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java
new file mode 100644
index 0000000..385b516
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java
@@ -0,0 +1,8 @@
+package cs451.net.handler;
+
+public class NetHandlerDFLT extends NetEventHandler {
+
+ public NetHandlerDFLT(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java
new file mode 100644
index 0000000..4364a1c
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java
@@ -0,0 +1,59 @@
+package cs451.net.handler;
+
+import cs451.net.NetManager;
+import cs451.net.event.Message;
+import cs451.net.event.NetEvent;
+import cs451.net.event.NetEventType;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NetHandlerFIFO extends NetEventHandler {
+
+ private final AtomicInteger sn = new AtomicInteger(1);
+ private final Map rsn = new ConcurrentHashMap<>();
+ private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>();
+
+ private Host me;
+
+ public NetHandlerFIFO(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+ @Override
+ public void send(NetEvent ne) {
+ Integer snv = sn.getAndIncrement();
+ NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST));
+ sendNext(NetEvent.Message(me.getId(),snv));
+ }
+
+ private void deliverIf(){
+ synchronized (pending) {
+ pending.removeIf(hmp -> {
+ Integer crsn = rsn.getOrDefault(hmp.src, 1);
+ if (hmp.id == crsn) {
+ deliverNextSync(NetEvent.Message(hmp));
+ NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp));
+ rsn.put(hmp.src, crsn + 1);
+ return true;
+ }
+ return false;
+ });
+ }
+ }
+
+ @Override
+ public void deliver(NetEvent ne) {
+ pending.add(ne.message);
+ deliverIf();
+ }
+
+ @Override
+ public void start(Host h, Parser p) {
+ me = h;
+ }
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java
new file mode 100644
index 0000000..88344a7
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java
@@ -0,0 +1,30 @@
+package cs451.net.handler;
+
+import cs451.net.event.*;
+import cs451.tools.Pair;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class NetHandlerPL extends NetEventHandler {
+
+ private final Set> delivered = ConcurrentHashMap.newKeySet();
+
+ public NetHandlerPL(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+ @Override
+ public void send(NetEvent event) {
+ sendNext(event);
+ }
+
+ @Override
+ public void deliver(NetEvent event) {
+ if(delivered.add(new Pair<>(event.peer.getId(),event.message))){
+ deliverNext(event);
+ }
+ }
+
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java
new file mode 100644
index 0000000..698f421
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java
@@ -0,0 +1,94 @@
+package cs451.net.handler;
+
+import cs451.net.event.Message;
+import cs451.net.NetManager;
+import cs451.net.event.*;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+
+import java.io.IOException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class NetHandlerSCKT extends NetEventHandler {
+
+ private static final Integer BUFF_SIZE = 128;
+
+ private List hosts;
+
+ private DatagramSocket socket = null;
+ private final AtomicBoolean receiving = new AtomicBoolean(true);
+
+
+ public NetHandlerSCKT(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+ @Override
+ public void send(NetEvent event) {
+ if(socket.isClosed()){
+ NetManager.error(NetEventType.SEND,new SocketException("Socket Closed"));
+ return;
+ }
+ ByteBuffer b = ByteBuffer.allocate(BUFF_SIZE);
+ event.message.ToBuffer(b);
+ DatagramPacket datagram = new DatagramPacket(b.array(), b.position(), event.peer.getAddr());
+ try {
+ socket.send(datagram);
+ } catch (IOException e) {
+ NetManager.error(NetEventType.SEND,e);
+ }
+ }
+
+ @Override
+ public void deliver(NetEvent event) {
+ if(socket.isClosed()){
+ NetManager.error(NetEventType.DLVR,new SocketException("Socket Closed"));
+ return;
+ }
+
+ byte[] buff = new byte[BUFF_SIZE];
+ DatagramPacket datagram = new DatagramPacket(buff, buff.length);
+ try {
+ socket.receive(datagram);
+ Optional rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst();
+
+ deliverNext(NetEvent.Message(
+ new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)),
+ Message.FromBuffer(ByteBuffer.wrap(datagram.getData()))
+ ));
+ } catch (IOException e) {
+ NetManager.error(NetEventType.DLVR,e);
+ }
+ }
+
+
+ @Override
+ public void start(Host h, Parser p) {
+ Objects.requireNonNull(h);
+ Objects.requireNonNull(p);
+ hosts = p.hosts();
+
+ try {
+ this.socket = new DatagramSocket(h.getPort());
+ } catch (SocketException e) {
+ throw new Error(e);
+ }
+ receiving.set(true);
+ new Thread(() -> {
+ while (receiving.get()) {
+ this.deliver(NetEvent.EMPTY());
+ }
+ }).start();
+ }
+
+ @Override
+ public void stop() {
+ socket.close();
+ receiving.set(false);
+ }
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java
new file mode 100644
index 0000000..8fa598d
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java
@@ -0,0 +1,89 @@
+package cs451.net.handler;
+
+import cs451.net.event.Message;
+import cs451.net.NetManager;
+import cs451.net.event.*;
+import cs451.tools.Pair;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class NetHandlerSL extends NetEventHandler {
+
+ private static final Set EMPTY_SET = ConcurrentHashMap.newKeySet();
+
+ Set hasTimeout = ConcurrentHashMap.newKeySet();
+ Map> hasAck = new ConcurrentHashMap<>();
+ Map, Integer> retryCounter = new ConcurrentHashMap<>();
+
+ public NetHandlerSL(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+
+ private void handleTimeout(NetEvent ne) {
+ if (hasAck.getOrDefault(ne.peer.getId(),EMPTY_SET).contains(ne.message)) {
+ return;
+ }
+ if (retryCounter.compute(new Pair<>(ne.peer.getId(),ne.message), (k, ov) -> (ov == null) ? 2 : ov + 1) >= MAX_TRIES) {
+ timeout(ne);
+ } else {
+ send(ne);
+ }
+ }
+
+ private void checkTimeout(NetEvent ne) {
+ new Thread(()->{
+ Integer t = retryCounter.getOrDefault(new Pair<>(ne.peer.getId(), ne.message), 1);
+ new Timer().schedule(
+ new TimerTask() {
+ @Override
+ public void run () {
+ handleTimeout(ne);
+ }
+ }, MS_WAIT*t*t*t);
+ }).start();
+ }
+
+ @Override
+ public void send(NetEvent ne) {
+ if (hasTimeout.contains(ne.peer.getId())) {
+ return;
+ }
+ sendNext(ne);
+ checkTimeout(ne);
+ }
+
+ @Override
+ public void deliver(NetEvent ne) {
+ if (hasTimeout.contains(ne.peer.getId())) {
+ return;
+ }
+ switch (ne.message.tpe){
+ case ACK:
+ hasAck.compute(ne.peer.getId(),(k, s) -> {
+ if(s == null) s = ConcurrentHashMap.newKeySet();
+ s.add(ne.message);
+ return s;
+ });
+ break;
+ case DATA:
+ sendNext(NetEvent.MessageACK(ne.peer,ne.message));
+ deliverNext(ne);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void timeout(NetEvent event) {
+ if(hasTimeout.add(event.peer.getId())) {
+ NetManager.crash(NetHandlerURB.class, event);
+ }
+ }
+
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java
new file mode 100644
index 0000000..68aaed4
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java
@@ -0,0 +1,58 @@
+package cs451.net.handler;
+
+import cs451.net.event.NetEvent;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NetHandlerTOPL extends NetEventHandler {
+
+ private final AtomicInteger toSend = new AtomicInteger(0);
+ private final AtomicInteger delivered = new AtomicInteger(0);
+ private final AtomicInteger waiting = new AtomicInteger(0);
+
+ private Host me;
+
+ public NetHandlerTOPL(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer, broadcastLayer);
+ }
+
+
+ private void sendIf(){
+ synchronized (waiting) {
+ while (waiting.get() < SEND_WINDOW && toSend.get() > 0) {
+ toSend.getAndDecrement();
+ waiting.getAndIncrement();
+ sendNextSync(NetEvent.EMPTY());
+ }
+ }
+ }
+
+ @Override
+ public void send(NetEvent ne) {
+ sendIf();
+ }
+
+ @Override
+ public void deliver(NetEvent ne) {
+ if (ne.message.src == me.getId()) {
+ synchronized (waiting) {
+ delivered.getAndIncrement();
+ waiting.getAndDecrement();
+ }
+ sendIf();
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ return toSend.get()==0 && waiting.get()==0;
+ }
+
+ @Override
+ public void start(Host h, Parser p) {
+ me = h;
+ toSend.set(p.messageCount());
+ }
+}
diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java
new file mode 100644
index 0000000..fa876cc
--- /dev/null
+++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java
@@ -0,0 +1,65 @@
+package cs451.net.handler;
+
+import cs451.net.event.NetEvent;
+import cs451.parser.Host;
+import cs451.parser.Parser;
+import cs451.tools.Pair;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.*;
+
+
+public class NetHandlerURB extends NetEventHandler {
+
+
+ private final Set correct = ConcurrentHashMap.newKeySet();
+ private final Map, Set> ack = new ConcurrentHashMap<>();
+ private final Set> delivered = ConcurrentHashMap.newKeySet();
+ private final Set> pending = ConcurrentHashMap.newKeySet();
+
+ private Integer myId;
+
+
+ public NetHandlerURB(Class extends NetEventHandler> deliverLayer, Class extends NetEventHandler> broadcastLayer) {
+ super(deliverLayer,broadcastLayer);
+ }
+
+ private void deliverIf(){
+ pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)).
+ filter(delivered::add).
+ forEach(smp -> deliverNext(NetEvent.Message(smp.first(),smp.second())));
+ }
+
+ @Override
+ public void send(NetEvent ne) {
+ pending.add(new Pair<>(myId, ne.message.id));
+ ne.message.src = myId;
+ sendNext(ne);
+ }
+
+ @Override
+ public void deliver(NetEvent ne) {
+ ack.compute(new Pair<>(ne.message.src,ne.message.id),(k, v) -> {
+ if(v == null){
+ v = ConcurrentHashMap.newKeySet();
+ }
+ v.add(ne.peer.getId());
+ return v;
+ });
+ if(pending.add(new Pair<>(ne.message.src,ne.message.id))){
+ sendNext(ne);
+ }
+ deliverIf();
+ }
+
+ public void crash(NetEvent ne) {
+ correct.remove(ne.peer.getId());
+ deliverIf();
+ }
+
+ public void start(Host h, Parser p) {
+ p.hosts().forEach(ch-> correct.add(ch.getId()));
+ myId = h.getId();
+ }
+}
\ No newline at end of file
diff --git a/257844/src/main/java/cs451/parser/BarrierParser.java b/257844/src/main/java/cs451/parser/BarrierParser.java
new file mode 100644
index 0000000..852d37e
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/BarrierParser.java
@@ -0,0 +1,54 @@
+package cs451.parser;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class BarrierParser {
+
+ private static final String BARRIER_KEY = "--barrier";
+
+ private static final int BARRIER_ARGS_NUM = 2;
+ private static final String COLON_REGEX = ":";
+ private static final String IP_START_REGEX = "/";
+
+ private static String ip;
+ private static int port;
+
+ public boolean populate(String key, String value) {
+ if (!key.equals(BARRIER_KEY)) {
+ return false;
+ }
+
+ String[] barrier = value.split(COLON_REGEX);
+ if (barrier.length != BARRIER_ARGS_NUM) {
+ return false;
+ }
+
+ try {
+ String ipTest = InetAddress.getByName(barrier[0]).toString();
+ if (ipTest.startsWith(IP_START_REGEX)) {
+ ip = ipTest.substring(1);
+ } else {
+ ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
+ }
+
+ port = Integer.parseInt(barrier[1]);
+ if (port <= 0) {
+ System.err.println("Barrier port must be a positive number!");
+ return false;
+ }
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ return true;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/257844/src/main/java/cs451/parser/ConfigParser.java b/257844/src/main/java/cs451/parser/ConfigParser.java
new file mode 100644
index 0000000..5837cbe
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/ConfigParser.java
@@ -0,0 +1,44 @@
+package cs451.parser;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+public class ConfigParser {
+
+ private String path;
+
+ private int messages;
+
+
+ public boolean populate(String value) {
+ File file = new File(value);
+ path = file.getPath();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(path))) {
+ int lineNum = 1;
+ for (String line; (line = br.readLine()) != null; lineNum++) {
+ switch(lineNum){
+ case 1:
+ messages = Integer.parseInt(line);
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (IOException e) {
+ System.err.println("Problem with the hosts file!");
+ return false;
+ }
+ return true;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public int getMessages() {
+ return messages;
+ }
+}
diff --git a/257844/src/main/java/cs451/parser/Constants.java b/257844/src/main/java/cs451/parser/Constants.java
new file mode 100644
index 0000000..f855908
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/Constants.java
@@ -0,0 +1,30 @@
+package cs451.parser;
+
+public class Constants {
+
+ public static final int ARG_LIMIT_NO_CONFIG = 10;
+ public static final int ARG_LIMIT_CONFIG = 11;
+
+ // indexes for id
+ public static final int ID_KEY = 0;
+ public static final int ID_VALUE = 1;
+
+ // indexes for hosts
+ public static final int HOSTS_KEY = 2;
+ public static final int HOSTS_VALUE = 3;
+
+ // indexes for barrier
+ public static final int BARRIER_KEY = 4;
+ public static final int BARRIER_VALUE = 5;
+
+ // indexes for signal
+ public static final int SIGNAL_KEY = 6;
+ public static final int SIGNAL_VALUE = 7;
+
+ // indexes for output
+ public static final int OUTPUT_KEY = 8;
+ public static final int OUTPUT_VALUE = 9;
+
+ // indexes for config
+ public static final int CONFIG_VALUE = 10;
+}
diff --git a/257844/src/main/java/cs451/parser/Coordinator.java b/257844/src/main/java/cs451/parser/Coordinator.java
new file mode 100644
index 0000000..57f6823
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/Coordinator.java
@@ -0,0 +1,74 @@
+package cs451.parser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.net.Socket;
+
+public class Coordinator {
+
+ private final int pid;
+
+ private final String barrierIp;
+ private final int barrierPort;
+
+ private final String signalIp;
+ private final int signalPort;
+
+ private final Socket signalSocket;
+
+ public Coordinator(int pid, String barrierIp, int barrierPort, String signalIp, int signalPort) {
+ this.pid = pid;
+ this.barrierIp = barrierIp;
+ this.barrierPort = barrierPort;
+ this.signalIp = signalIp;
+ this.signalPort = signalPort;
+
+ signalSocket = connectToHost(this.signalIp, this.signalPort);
+ }
+
+ public void waitOnBarrier() {
+ try {
+ Socket socket = connectToHost(barrierIp, barrierPort);
+ InputStream input = socket.getInputStream();
+ InputStreamReader reader = new InputStreamReader(input);
+ System.out.println("Accessing barrier...");
+
+ while (reader.read() != -1) {
+ }
+ } catch (IOException ex) {
+ System.out.println("I/O error: " + ex.getMessage());
+ }
+ }
+
+ public void finishedBroadcasting() {
+ try {
+ signalSocket.close();
+ } catch (IOException ex) {
+ System.out.println("I/O error: " + ex.getMessage());
+ }
+ }
+
+ private Socket connectToHost(String ip, int port) {
+ Socket socket = null;
+ try {
+ socket = new Socket(ip, port);
+ OutputStream output = socket.getOutputStream();
+ DataOutputStream writer = new DataOutputStream(output);
+
+ ByteBuffer bb = ByteBuffer.allocate(8);
+ bb.order(ByteOrder.BIG_ENDIAN);
+ bb.putLong(pid);
+
+ writer.write(bb.array(), 0, 8);
+ } catch (IOException ex) {
+ System.out.println("I/O error: " + ex.getMessage());
+ }
+
+ return socket;
+ }
+}
diff --git a/257844/src/main/java/cs451/parser/Host.java b/257844/src/main/java/cs451/parser/Host.java
new file mode 100644
index 0000000..03a1401
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/Host.java
@@ -0,0 +1,78 @@
+package cs451.parser;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+
+public class Host {
+
+ private static final String IP_START_REGEX = "/";
+
+ private int id = -1;
+ private String ip = null;
+ private int port = -1;
+ private SocketAddress addr = null;
+
+ public Host(InetSocketAddress addr, Integer id){
+ this.addr = addr;
+ this.ip = addr.getHostString();
+ this.port = addr.getPort();
+ this.id = id;
+ }
+
+ public Host(){}
+
+ public Host(Integer id){
+ this.id = id;
+ }
+
+ public boolean populate(String idString, String ipString, String portString) {
+ try {
+ id = Integer.parseInt(idString);
+
+ String ipTest = InetAddress.getByName(ipString).toString();
+ if (ipTest.startsWith(IP_START_REGEX)) {
+ ip = ipTest.substring(1);
+ } else {
+ ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
+ }
+
+ port = Integer.parseInt(portString);
+ if (port <= 0) {
+ System.err.println("Port in the hosts file must be a positive number!");
+ return false;
+ }
+ } catch (NumberFormatException e) {
+ if (port == -1) {
+ System.err.println("Id in the hosts file must be a number!");
+ } else {
+ System.err.println("Port in the hosts file must be a number!");
+ }
+ return false;
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ this.addr = new InetSocketAddress(ip, port);
+
+ return true;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public SocketAddress getAddr() {
+ return addr;
+ }
+
+}
diff --git a/257844/src/main/java/cs451/parser/HostsParser.java b/257844/src/main/java/cs451/parser/HostsParser.java
new file mode 100644
index 0000000..76f3ce7
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/HostsParser.java
@@ -0,0 +1,87 @@
+package cs451.parser;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+public class HostsParser {
+
+ private static final String HOSTS_KEY = "--hosts";
+ private static final String SPACES_REGEX = "\\s+";
+
+ private String filename;
+ private final List hosts = new ArrayList<>();
+
+ public boolean populate(String key, String filename) {
+ if (!key.equals(HOSTS_KEY)) {
+ return false;
+ }
+
+ this.filename = filename;
+ try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
+ int lineNum = 1;
+ for (String line; (line = br.readLine()) != null; lineNum++) {
+ if (line.isBlank()) {
+ continue;
+ }
+
+ String[] splits = line.split(SPACES_REGEX);
+ if (splits.length != 3) {
+ System.err.println("Problem with the line " + lineNum + " in the hosts file!");
+ return false;
+ }
+
+ Host newHost = new Host();
+ if (!newHost.populate(splits[0], splits[1], splits[2])) {
+ return false;
+ }
+
+ hosts.add(newHost);
+ }
+ } catch (IOException e) {
+ System.err.println("Problem with the hosts file!");
+ return false;
+ }
+
+ if (!checkIdRange()) {
+ System.err.println("Hosts ids are not within the range!");
+ return false;
+ }
+
+ // sort by id
+ hosts.sort(new HostsComparator());
+ return true;
+ }
+
+ private boolean checkIdRange() {
+ int num = hosts.size();
+ for (Host host : hosts) {
+ if (host.getId() < 1 || host.getId() > num) {
+ System.err.println("Id of a host is not in the right range!");
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public boolean inRange(int id) {
+ return id <= hosts.size();
+ }
+
+ public List getHosts() {
+ return hosts;
+ }
+
+ static class HostsComparator implements Comparator {
+
+ public int compare(Host a, Host b) {
+ return a.getId() - b.getId();
+ }
+
+ }
+
+}
diff --git a/257844/src/main/java/cs451/parser/IdParser.java b/257844/src/main/java/cs451/parser/IdParser.java
new file mode 100644
index 0000000..47bca2f
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/IdParser.java
@@ -0,0 +1,31 @@
+package cs451.parser;
+
+public class IdParser {
+
+ private static final String ID_KEY = "--id";
+
+ private int id;
+
+ public boolean populate(String key, String value) {
+ if (!key.equals(ID_KEY)) {
+ return false;
+ }
+
+ try {
+ id = Integer.parseInt(value);
+ if (id <= 0) {
+ System.err.println("Id must be a positive number!");
+ }
+ } catch (NumberFormatException e) {
+ System.err.println("Id must be a number!");
+ return false;
+ }
+
+ return true;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+}
diff --git a/257844/src/main/java/cs451/parser/OutputParser.java b/257844/src/main/java/cs451/parser/OutputParser.java
new file mode 100644
index 0000000..9324dd9
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/OutputParser.java
@@ -0,0 +1,25 @@
+package cs451.parser;
+
+import java.io.File;
+
+public class OutputParser {
+
+ private static final String OUTPUT_KEY = "--output";
+
+ private String path;
+
+ public boolean populate(String key, String value) {
+ if (!key.equals(OUTPUT_KEY)) {
+ return false;
+ }
+
+ File file = new File(value);
+ path = file.getPath();
+ return true;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+}
diff --git a/257844/src/main/java/cs451/parser/Parser.java b/257844/src/main/java/cs451/parser/Parser.java
new file mode 100644
index 0000000..3b6552a
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/Parser.java
@@ -0,0 +1,111 @@
+package cs451.parser;
+
+import java.util.List;
+
+public class Parser {
+
+ private final String[] args;
+ private long pid;
+ private IdParser idParser;
+ private HostsParser hostsParser;
+ private BarrierParser barrierParser;
+ private SignalParser signalParser;
+ private OutputParser outputParser;
+ private ConfigParser configParser;
+
+ public Parser(String[] args) {
+ this.args = args;
+ }
+
+ public void parse() {
+ pid = ProcessHandle.current().pid();
+
+ idParser = new IdParser();
+ hostsParser = new HostsParser();
+ barrierParser = new BarrierParser();
+ signalParser = new SignalParser();
+ outputParser = new OutputParser();
+ configParser = null;
+
+ int argsNum = args.length;
+ if (argsNum != Constants.ARG_LIMIT_NO_CONFIG && argsNum != Constants.ARG_LIMIT_CONFIG) {
+ help();
+ }
+
+ if (!idParser.populate(args[Constants.ID_KEY], args[Constants.ID_VALUE])) {
+ help();
+ }
+
+ if (!hostsParser.populate(args[Constants.HOSTS_KEY], args[Constants.HOSTS_VALUE])) {
+ help();
+ }
+
+ if (!hostsParser.inRange(idParser.getId())) {
+ help();
+ }
+
+ if (!barrierParser.populate(args[Constants.BARRIER_KEY], args[Constants.BARRIER_VALUE])) {
+ help();
+ }
+
+ if (!signalParser.populate(args[Constants.SIGNAL_KEY], args[Constants.SIGNAL_VALUE])) {
+ help();
+ }
+
+ if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) {
+ help();
+ }
+
+ if (argsNum == Constants.ARG_LIMIT_CONFIG) {
+ configParser = new ConfigParser();
+ if (!configParser.populate(args[Constants.CONFIG_VALUE])) {
+ }
+ }
+ }
+
+ private void help() {
+ System.err.println("Usage: ./run.sh --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT [config]");
+ System.exit(1);
+ }
+
+ public int myId() {
+ return idParser.getId();
+ }
+
+ public List hosts() {
+ return hostsParser.getHosts();
+ }
+
+ public String barrierIp() {
+ return barrierParser.getIp();
+ }
+
+ public int barrierPort() {
+ return barrierParser.getPort();
+ }
+
+ public String signalIp() {
+ return signalParser.getIp();
+ }
+
+ public int signalPort() {
+ return signalParser.getPort();
+ }
+
+ public String output() {
+ return outputParser.getPath();
+ }
+
+ public boolean hasConfig() {
+ return configParser != null;
+ }
+
+ public String config() {
+ return configParser.getPath();
+ }
+
+ public int messageCount(){
+ return configParser.getMessages();
+ }
+
+}
diff --git a/257844/src/main/java/cs451/parser/SignalParser.java b/257844/src/main/java/cs451/parser/SignalParser.java
new file mode 100644
index 0000000..f6e937f
--- /dev/null
+++ b/257844/src/main/java/cs451/parser/SignalParser.java
@@ -0,0 +1,54 @@
+package cs451.parser;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class SignalParser {
+
+ private static final String SIGNAL_KEY = "--signal";
+
+ private static final int SIGNAL_ARGS_NUM = 2;
+ private static final String COLON_REGEX = ":";
+ private static final String IP_START_REGEX = "/";
+
+ private static String ip;
+ private static int port;
+
+ public boolean populate(String key, String value) {
+ if (!key.equals(SIGNAL_KEY)) {
+ return false;
+ }
+
+ String[] signal = value.split(COLON_REGEX);
+ if (signal.length != SIGNAL_ARGS_NUM) {
+ return false;
+ }
+
+ try {
+ String ipTest = InetAddress.getByName(signal[0]).toString();
+ if (ipTest.startsWith(IP_START_REGEX)) {
+ ip = ipTest.substring(1);
+ } else {
+ ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
+ }
+
+ port = Integer.parseInt(signal[1]);
+ if (port <= 0) {
+ System.err.println("Signal port must be a positive number!");
+ return false;
+ }
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ }
+
+ return true;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+}
diff --git a/257844/src/main/java/cs451/tools/Logger.java b/257844/src/main/java/cs451/tools/Logger.java
new file mode 100644
index 0000000..cb8c202
--- /dev/null
+++ b/257844/src/main/java/cs451/tools/Logger.java
@@ -0,0 +1,42 @@
+package cs451.tools;
+
+public abstract class Logger {
+
+ public enum LOG_LEVEL{
+ ERR(-1),
+ INFO(0),
+ DEBUG(1),
+ ALL(99);
+
+ public final Integer v;
+ LOG_LEVEL(Integer v){
+ this.v = v;
+ }
+ public boolean isEnabled(){
+ return this.v <= LVL.v;
+ }
+ }
+
+ private static final LOG_LEVEL LVL = LOG_LEVEL.ALL;
+
+ public static void info(String msg){
+ if(LOG_LEVEL.INFO.isEnabled()){
+ System.out.println(msg);
+ }
+ }
+
+ public static void debug(String msg){
+ if(LOG_LEVEL.DEBUG.isEnabled()){
+ System.err.println(msg);
+ }
+ }
+
+ public static void error(String msg){
+ if(LOG_LEVEL.ERR.isEnabled()){
+ System.err.println(msg);
+ }
+ }
+
+
+
+}
diff --git a/257844/src/main/java/cs451/tools/Pair.java b/257844/src/main/java/cs451/tools/Pair.java
new file mode 100644
index 0000000..2f28ee1
--- /dev/null
+++ b/257844/src/main/java/cs451/tools/Pair.java
@@ -0,0 +1,58 @@
+package cs451.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Pair implements Comparable {
+ private final S x;
+ private final T y;
+ private final List