From 9e4e8190ace576f556265ac3cab722a3306836e3 Mon Sep 17 00:00:00 2001 From: choelzl Date: Sun, 1 Nov 2020 20:55:20 +0100 Subject: [PATCH] implemented fifo --- 257844/.gitignore | 32 ++++ 257844/257844.iml | 11 ++ 257844/bin/README | 1 + 257844/bin/deploy/README | 1 + 257844/build.sh | 9 ++ 257844/cleanup.sh | 7 + 257844/pom.xml | 62 ++++++++ 257844/run.sh | 9 ++ 257844/src/main/java/cs451/Main.java | 139 ++++++++++++++++++ .../src/main/java/cs451/net/NetManager.java | 138 +++++++++++++++++ .../main/java/cs451/net/event/Message.java | 110 ++++++++++++++ .../main/java/cs451/net/event/NetEvent.java | 52 +++++++ .../java/cs451/net/event/NetEventType.java | 8 + .../cs451/net/handler/NetEventHandler.java | 84 +++++++++++ .../java/cs451/net/handler/NetHandlerBEB.java | 40 +++++ .../cs451/net/handler/NetHandlerDFLT.java | 8 + .../cs451/net/handler/NetHandlerFIFO.java | 59 ++++++++ .../java/cs451/net/handler/NetHandlerPL.java | 30 ++++ .../cs451/net/handler/NetHandlerSCKT.java | 94 ++++++++++++ .../java/cs451/net/handler/NetHandlerSL.java | 89 +++++++++++ .../cs451/net/handler/NetHandlerTOPL.java | 58 ++++++++ .../java/cs451/net/handler/NetHandlerURB.java | 65 ++++++++ .../main/java/cs451/parser/BarrierParser.java | 54 +++++++ .../main/java/cs451/parser/ConfigParser.java | 44 ++++++ .../src/main/java/cs451/parser/Constants.java | 30 ++++ .../main/java/cs451/parser/Coordinator.java | 74 ++++++++++ 257844/src/main/java/cs451/parser/Host.java | 78 ++++++++++ .../main/java/cs451/parser/HostsParser.java | 87 +++++++++++ .../src/main/java/cs451/parser/IdParser.java | 31 ++++ .../main/java/cs451/parser/OutputParser.java | 25 ++++ 257844/src/main/java/cs451/parser/Parser.java | 111 ++++++++++++++ .../main/java/cs451/parser/SignalParser.java | 54 +++++++ 257844/src/main/java/cs451/tools/Logger.java | 42 ++++++ 257844/src/main/java/cs451/tools/Pair.java | 58 ++++++++ bnr.sh | 6 + 35 files changed, 1800 insertions(+) create mode 100644 257844/.gitignore create mode 100644 257844/257844.iml create mode 100644 257844/bin/README create mode 100644 257844/bin/deploy/README create mode 100755 257844/build.sh create mode 100755 257844/cleanup.sh create mode 100644 257844/pom.xml create mode 100755 257844/run.sh create mode 100644 257844/src/main/java/cs451/Main.java create mode 100644 257844/src/main/java/cs451/net/NetManager.java create mode 100644 257844/src/main/java/cs451/net/event/Message.java create mode 100644 257844/src/main/java/cs451/net/event/NetEvent.java create mode 100644 257844/src/main/java/cs451/net/event/NetEventType.java create mode 100644 257844/src/main/java/cs451/net/handler/NetEventHandler.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerBEB.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerPL.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerSL.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java create mode 100644 257844/src/main/java/cs451/net/handler/NetHandlerURB.java create mode 100644 257844/src/main/java/cs451/parser/BarrierParser.java create mode 100644 257844/src/main/java/cs451/parser/ConfigParser.java create mode 100644 257844/src/main/java/cs451/parser/Constants.java create mode 100644 257844/src/main/java/cs451/parser/Coordinator.java create mode 100644 257844/src/main/java/cs451/parser/Host.java create mode 100644 257844/src/main/java/cs451/parser/HostsParser.java create mode 100644 257844/src/main/java/cs451/parser/IdParser.java create mode 100644 257844/src/main/java/cs451/parser/OutputParser.java create mode 100644 257844/src/main/java/cs451/parser/Parser.java create mode 100644 257844/src/main/java/cs451/parser/SignalParser.java create mode 100644 257844/src/main/java/cs451/tools/Logger.java create mode 100644 257844/src/main/java/cs451/tools/Pair.java create mode 100755 bnr.sh diff --git a/257844/.gitignore b/257844/.gitignore new file mode 100644 index 0000000..c35584b --- /dev/null +++ b/257844/.gitignore @@ -0,0 +1,32 @@ +# Created by https://www.toptal.com/developers/gitignore/api/java +# Edit at https://www.toptal.com/developers/gitignore?templates=java + +target/ +bin/da_proc.jar + +### Java ### +# Compiled class file +*.class + +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.nar +*.ear +*.zip +*.tar.gz +*.rar + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +# End of https://www.toptal.com/developers/gitignore/api/java diff --git a/257844/257844.iml b/257844/257844.iml new file mode 100644 index 0000000..f76df2b --- /dev/null +++ b/257844/257844.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/257844/bin/README b/257844/bin/README new file mode 100644 index 0000000..ced5c3b --- /dev/null +++ b/257844/bin/README @@ -0,0 +1 @@ +This is a reserved directory name! Store the binary generated by `build.sh` in this directory diff --git a/257844/bin/deploy/README b/257844/bin/deploy/README new file mode 100644 index 0000000..1a2f94d --- /dev/null +++ b/257844/bin/deploy/README @@ -0,0 +1 @@ +This is a reserved directory name, do not delete or use in your application! diff --git a/257844/build.sh b/257844/build.sh new file mode 100755 index 0000000..28a73eb --- /dev/null +++ b/257844/build.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +set -e + +# Change the current working directory to the location of the present file +cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" + +mvn clean compile assembly:single +mv target/da_project-1.0-SNAPSHOT-jar-with-dependencies.jar bin/da_proc.jar diff --git a/257844/cleanup.sh b/257844/cleanup.sh new file mode 100755 index 0000000..51b3a48 --- /dev/null +++ b/257844/cleanup.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# Change the current working directory to the location of the present file +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" + +rm -f "$DIR"/bin/da_proc.jar +rm -rf "$DIR"/target diff --git a/257844/pom.xml b/257844/pom.xml new file mode 100644 index 0000000..1e24080 --- /dev/null +++ b/257844/pom.xml @@ -0,0 +1,62 @@ + + + + 4.0.0 + + cs451 + da_project + 1.0-SNAPSHOT + + DA_Project + https://www.helcel.net + + + UTF-8 + 1.11 + 1.11 + + + + + + + + maven-clean-plugin + 3.1.0 + + 11 + + + + + maven-compiler-plugin + 3.8.1 + + 11 + + + + maven-jar-plugin + 3.2.0 + + 11 + + + + maven-assembly-plugin + + + + cs451.Main + + + + jar-with-dependencies + + + + + + + diff --git a/257844/run.sh b/257844/run.sh new file mode 100755 index 0000000..4f300c7 --- /dev/null +++ b/257844/run.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +# Change the current working directory to the location of the present file +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" + +ret=0 +exec 3>&1; $(java -jar "$DIR"/bin/da_proc.jar "$@" >&3); ret=$?; exec 3>&- + +exit $ret diff --git a/257844/src/main/java/cs451/Main.java b/257844/src/main/java/cs451/Main.java new file mode 100644 index 0000000..b94aa11 --- /dev/null +++ b/257844/src/main/java/cs451/Main.java @@ -0,0 +1,139 @@ +package cs451; + +import cs451.net.event.Message; +import cs451.net.NetManager; +import cs451.net.event.NetEventType; +import cs451.parser.Coordinator; +import cs451.parser.Host; +import cs451.parser.Parser; +import cs451.tools.Logger; +import cs451.tools.Pair; + +import java.io.FileWriter; +import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class Main { + + private static String formatOutput(Integer id, Integer host, Message.TYPE tpe){ + switch (tpe){ + case DATA: + return "d "+host+" "+id+"\n"; + case ACK: + return "d "+host+" "+id+"\n"; + case BCST: + return "b "+id+"\n"; + default: + return ""; + } + } + + private static void writeOutput(String filepath) throws IOException { + FileWriter writer = new FileWriter(filepath); + + for(Pair, Message.TYPE> mhpt : mh_tpe){ + try { + writer.write(formatOutput(mhpt.first().first(),mhpt.first().second(),mhpt.second())); + } catch (IOException e) { + e.printStackTrace(); + } + } + + writer.close(); + } + + private static void handleSignal() { + //immediately stop network packet processing + System.out.println("Immediately stopping network packet processing."); + + NetManager.stop(); + + //write/flush output file if necessary + System.out.println("Writing output."); + + try { + writeOutput(parser.output()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void initSignalHandlers() { + Runtime.getRuntime().addShutdownHook(new Thread(Main::handleSignal)); + } + + private static Host me = null; + private static final Queue,Message.TYPE>> mh_tpe = new ConcurrentLinkedQueue<>(); + + private static Parser parser = null; + + + public static void main(String[] args) throws InterruptedException { + parser = new Parser(args); + parser.parse(); + + initSignalHandlers(); + + // example + long pid = ProcessHandle.current().pid(); + System.out.println("My PID is " + pid + "."); + System.out.println("Use 'kill -SIGINT " + pid + " ' or 'kill -SIGTERM " + pid + " ' to stop processing packets."); + + System.out.println("My id is " + parser.myId() + "."); + System.out.println("List of hosts is:"); + for (Host host : parser.hosts()) { + if (host.getId() == parser.myId()) { + me = host; + } + System.out.println(host.getId() + ", " + host.getIp() + ", " + host.getPort()); + } + + System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort()); + System.out.println("Signal: " + parser.signalIp() + ":" + parser.signalPort()); + System.out.println("Output: " + parser.output()); + // if config is defined; always check before parser.config() + if (parser.hasConfig()) { + System.out.println("Config: " + parser.config()); + } + Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort()); + + NetManager.start(me,parser,(t,ne) -> { + Logger.error(t+"(" + ne.message.src + "):" + ne.message.toString()); + if(t == NetEventType.DLVR){ + mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); + }else if(t== NetEventType.SEND){ + mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe)); + } + }, (t,ne) -> { + Logger.error("ERR"+t+" - "+ne.getMessage()); + }); + + System.out.println("Waiting for all processes for finish initialization"); + coordinator.waitOnBarrier(); + + System.out.println("Broadcasting messages..."); + NetManager.send(); + + while(!NetManager.isDone()) { + Thread.sleep(500); + } + + System.out.println("Stopping NetManager"); + NetManager.stop(); + + System.out.println("Signaling end of broadcasting messages"); + coordinator.finishedBroadcasting(); + + try { + writeOutput(parser.output()); + } catch (IOException e) { + e.printStackTrace(); + } + + while (true) { + // Sleep for 1 hour + Thread.sleep(60 * 60 * 1000); + } + } +} diff --git a/257844/src/main/java/cs451/net/NetManager.java b/257844/src/main/java/cs451/net/NetManager.java new file mode 100644 index 0000000..4ee7b28 --- /dev/null +++ b/257844/src/main/java/cs451/net/NetManager.java @@ -0,0 +1,138 @@ +package cs451.net; + +import cs451.net.event.*; + +import cs451.net.handler.*; +import cs451.parser.Host; +import cs451.parser.Parser; +import cs451.tools.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.BiConsumer; + + + +public abstract class NetManager { + private static final Integer THREAD_COUNT = 8; + + private static boolean isStopped = false; + + private static final Map, NetEventHandler> nm_listeners = new HashMap<>(); + + private static final ThreadPoolExecutor ex = new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60,TimeUnit.SECONDS,new LinkedBlockingQueue<>()); + + private static BiConsumer onCompleteHandler; + private static BiConsumer onErrorHandler; + + + private static void registerNetHandler(NetEventHandler handler){ + nm_listeners.put(handler.getClass(),handler); + } + + public static void start(Host h, Parser p, BiConsumer och, BiConsumer oeh) { + onCompleteHandler = och; + onErrorHandler = oeh; + + registerNetHandler(new NetHandlerSCKT( NetHandlerSL.class, NetHandlerDFLT.class)); + registerNetHandler(new NetHandlerSL( NetHandlerPL.class, NetHandlerSCKT.class)); + registerNetHandler(new NetHandlerPL( NetHandlerBEB.class, NetHandlerSL.class)); + registerNetHandler(new NetHandlerBEB( NetHandlerURB.class, NetHandlerPL.class)); + registerNetHandler(new NetHandlerURB( NetHandlerFIFO.class, NetHandlerBEB.class)); + registerNetHandler(new NetHandlerFIFO( NetHandlerTOPL.class, NetHandlerURB.class)); + registerNetHandler(new NetHandlerTOPL( NetHandlerDFLT.class, NetHandlerFIFO.class)); + registerNetHandler(new NetHandlerDFLT( NetHandlerDFLT.class, NetHandlerDFLT.class)); + + nm_listeners.values().forEach(neh -> neh.start(h,p)); + ex.prestartAllCoreThreads(); + } + + public static void stop() { + isStopped = true; + nm_listeners.values().forEach(NetEventHandler::stop); + ex.shutdown(); + System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run."); + } + + public static boolean isDone() { + return isStopped || nm_listeners.values().stream().map(NetEventHandler::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2); + } + + +//================================================================================================================= +//================================================================================================================= + + public static void handle(Class layer, NetEventType event, NetEvent ne){ + try { + ex.getQueue().add(new NetEventRunner(ne, layer, event)); + }catch(IllegalStateException ise){ + error(event,ise); + } + } + + public static void complete(NetEventType t,NetEvent ne){ + onCompleteHandler.accept(t,ne); + } + + public static void error(NetEventType t,Exception e){ + onErrorHandler.accept(t,e); + } + +//================================================================================================================= +//================================================================================================================= + + public static void deliver(Class layer, NetEvent ne) { + handle(layer,NetEventType.DLVR,ne); + } + + public static void send(Class layer, NetEvent ne) { + handle(layer,NetEventType.SEND,ne); + } + + public static void crash(Class layer, NetEvent ne) { + handle(layer,NetEventType.CRSH,ne); + } + + public static void send() { + new NetEventRunner(NetEvent.EMPTY(),NetHandlerTOPL.class,NetEventType.SEND).run(); + } + + public static void deliverSync(Class layer, NetEvent ne) { + new NetEventRunner(ne,layer,NetEventType.DLVR).run(); + } + public static void sendSync(Class layer, NetEvent ne) { + new NetEventRunner(ne,layer,NetEventType.SEND).run(); + } + +//================================================================================================================= +//================================================================================================================= + + private static class NetEventRunner implements Runnable { + + private final NetEvent ne; + private final Class lt; + private final NetEventType net; + + private NetEventRunner(NetEvent ne, Class lt, NetEventType net) { + this.ne = ne; + this.lt = lt; + this.net = net; + } + + @Override + public void run() { + Logger.debug(this.lt.getSimpleName()+"_"+this.net+ ": " + ((ne.peer==null)?"?":ne.peer.getId()) + " - "+ne.message.toString()+""); + + NetEventHandler nl = nm_listeners.getOrDefault(this.lt, null); + if (nl != null) { + nl.onEvent(this.net,ne); + }else{ + error(this.net,new UnsupportedOperationException("No Handler for "+this.lt)); + } + } + } + +} + + diff --git a/257844/src/main/java/cs451/net/event/Message.java b/257844/src/main/java/cs451/net/event/Message.java new file mode 100644 index 0000000..2f66ebb --- /dev/null +++ b/257844/src/main/java/cs451/net/event/Message.java @@ -0,0 +1,110 @@ +package cs451.net.event; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Objects; + +public class Message implements Comparable { + + public enum TYPE { + NONE("NONE", 'N'), + ERR("ERR", 'E'), + DATA("DATA", ' '), + ACK("ACK", 'A'), + BCST("BCST", 'B'); + + public final Character c; + public final String tag; + TYPE(String tag, Character c){ + this.tag = tag; + this.c = c; + } + + } + + public int src = -1; + public final int id; + public TYPE tpe; + + public static Message DMSG(Integer id){ + return new Message(id,TYPE.DATA); + } + public static Message DMSG(Integer id, Integer src) { + return new Message(id,TYPE.DATA,src); + } + + public static Message TMSG(Integer mess_id, TYPE tpe) { + return new Message(mess_id, tpe); + } + + public static Message EMPTY(){ + return new Message(-1, TYPE.NONE); + } + + private static TYPE CharacterToTpe(Character c){ + return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE); + } + public static Message FromBuffer(ByteBuffer b) { + Character tpe = b.getChar(); + Integer id = b.getInt(); + Integer src = b.getInt(); + return new Message(id,CharacterToTpe(tpe),src); + } + + public void ToBuffer(ByteBuffer b){ + b.putChar(tpe.c); + b.putInt(id); + b.putInt(src); + } + + public Message(Message m) { + this.id = m.id; + this.tpe = m.tpe; + this.src = m.src; + } + private Message(Integer id, TYPE tpe){ + this.id = id; + this.tpe = tpe; + } + private Message(Integer id, TYPE tpe, Integer src){ + this.id = id; + this.tpe = tpe; + this.src = src; + } + + @Override + public String toString() { + switch(tpe) { + case DATA: + return src+"-"+ id; + default: + return src+"-"+tpe.c + id; + } + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final Message m = (Message) obj; + return id == m.id && src == m.src; + } + + @Override + public int hashCode() { + return Objects.hash(id,src); + } + + @Override + public int compareTo(Object obj) { + final Message m = (Message) obj; + return id-m.id; + } +} diff --git a/257844/src/main/java/cs451/net/event/NetEvent.java b/257844/src/main/java/cs451/net/event/NetEvent.java new file mode 100644 index 0000000..5c90aae --- /dev/null +++ b/257844/src/main/java/cs451/net/event/NetEvent.java @@ -0,0 +1,52 @@ +package cs451.net.event; + +import cs451.parser.Host; + +public class NetEvent { + + public final Host peer; + public final Message message; + + private NetEvent(Host peer, Message message) { + this.peer = peer; + this.message = message; + } + + + public static NetEvent Message(Host peer, Integer mess_id){ + return new NetEvent(peer, Message.DMSG(mess_id)); + } + + public static NetEvent Message(Host peer, Integer mess_id, Message.TYPE tpe){ + return new NetEvent(peer, Message.TMSG(mess_id,tpe)); + } + + public static NetEvent Message(Host peer, Message m){ + return new NetEvent(peer,new Message(m)); + } + + public static NetEvent Message(Message m){ + return new NetEvent(NO_PEER,new Message(m)); + } + + public static NetEvent Message(Integer mess_id){ + return new NetEvent(NO_PEER, Message.DMSG(mess_id)); + } + + public static NetEvent Message(Integer src, Integer mess_id) { + return new NetEvent(NO_PEER, Message.DMSG(mess_id,src)); + } + + public static NetEvent MessageACK(Host peer, Message message) { + NetEvent ne = NetEvent.Message(peer,message); + ne.message.tpe = Message.TYPE.ACK; + return ne; + } + + + public static NetEvent EMPTY(){ + return new NetEvent(NO_PEER, Message.EMPTY()); + } + + public static final Host NO_PEER = null; +} \ No newline at end of file diff --git a/257844/src/main/java/cs451/net/event/NetEventType.java b/257844/src/main/java/cs451/net/event/NetEventType.java new file mode 100644 index 0000000..3ff2590 --- /dev/null +++ b/257844/src/main/java/cs451/net/event/NetEventType.java @@ -0,0 +1,8 @@ +package cs451.net.event; + +public enum NetEventType { + DLVR, + SEND, + CRSH, + RCVR; +} diff --git a/257844/src/main/java/cs451/net/handler/NetEventHandler.java b/257844/src/main/java/cs451/net/handler/NetEventHandler.java new file mode 100644 index 0000000..d37b5e1 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetEventHandler.java @@ -0,0 +1,84 @@ +package cs451.net.handler; + +import cs451.net.NetManager; +import cs451.net.event.NetEvent; +import cs451.net.event.NetEventType; +import cs451.parser.Host; +import cs451.parser.Parser; +import cs451.tools.Logger; + +public abstract class NetEventHandler { + + /* + * 1 - 50ms - 50ms + * 2 - 400ms - 450ms + * 3 - 1350ms - 1800ms + * 4 - 3200ms - 5000ms + */ + public static final int MS_WAIT = 50; + public static final int MAX_TRIES = 5; + + public static final Integer SEND_WINDOW = 3; + + + + private final Class deliverLayer; + private final Class broadcastLayer; + + + NetEventHandler(Class deliverLayer, Class broadcastLayer) { + this.deliverLayer = deliverLayer; + this.broadcastLayer = broadcastLayer; + } + + public void deliverNextSync(NetEvent ne){ + NetManager.deliverSync(deliverLayer,ne); + } + public void sendNextSync(NetEvent ne){ + NetManager.sendSync(broadcastLayer,ne); + } + + public void deliverNext(NetEvent ne){ + NetManager.deliver(deliverLayer,ne); + } + public void sendNext(NetEvent ne){ + NetManager.send(broadcastLayer,ne); + } + + public void onEvent(NetEventType et, NetEvent ne){ + switch (et){ + case DLVR: deliver(ne); + break; + case SEND: send(ne); + break; + case CRSH: crash(ne); + break; + case RCVR: recover(ne); + break; + default: + Logger.error("Unhandled EventType:"+et); + } + } + + public void send(NetEvent ne){ + NetManager.error(NetEventType.SEND,new Exception("Default Handler")); + } + + public void deliver(NetEvent ne){ + NetManager.error(NetEventType.DLVR,new Exception("Default Handler")); + } + + public void crash(NetEvent ne){ + NetManager.error(NetEventType.CRSH,new Exception("Default Handler")); + } + + public void recover(NetEvent ne){ + NetManager.error(NetEventType.RCVR,new Exception("Default Handler")); + } + + public void start(Host h, Parser p) {} + + public void stop() {} + + public boolean isDone(){ return true;} +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java new file mode 100644 index 0000000..1016704 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerBEB.java @@ -0,0 +1,40 @@ +package cs451.net.handler; + +import cs451.net.event.NetEvent; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.List; + +public class NetHandlerBEB extends NetEventHandler { + + private List hosts; + private Host h; + + public NetHandlerBEB(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + + @Override + public void send(NetEvent ne) { + for(Host h : hosts){ + if(this.h.getId() != h.getId()) { + sendNext(NetEvent.Message(h, ne.message)); + }else{ + deliverNext(NetEvent.Message(h,ne.message)); + } + } + } + + @Override + public void deliver(NetEvent ne) { + deliverNext(ne); + } + + public void start(Host h, Parser p) { + this.h = h; + hosts = p.hosts(); + } + +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java new file mode 100644 index 0000000..385b516 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerDFLT.java @@ -0,0 +1,8 @@ +package cs451.net.handler; + +public class NetHandlerDFLT extends NetEventHandler { + + public NetHandlerDFLT(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java new file mode 100644 index 0000000..4364a1c --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerFIFO.java @@ -0,0 +1,59 @@ +package cs451.net.handler; + +import cs451.net.NetManager; +import cs451.net.event.Message; +import cs451.net.event.NetEvent; +import cs451.net.event.NetEventType; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +public class NetHandlerFIFO extends NetEventHandler { + + private final AtomicInteger sn = new AtomicInteger(1); + private final Map rsn = new ConcurrentHashMap<>(); + private final PriorityBlockingQueue pending = new PriorityBlockingQueue<>(); + + private Host me; + + public NetHandlerFIFO(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + @Override + public void send(NetEvent ne) { + Integer snv = sn.getAndIncrement(); + NetManager.complete(NetEventType.SEND, NetEvent.Message(me, snv, Message.TYPE.BCST)); + sendNext(NetEvent.Message(me.getId(),snv)); + } + + private void deliverIf(){ + synchronized (pending) { + pending.removeIf(hmp -> { + Integer crsn = rsn.getOrDefault(hmp.src, 1); + if (hmp.id == crsn) { + deliverNextSync(NetEvent.Message(hmp)); + NetManager.complete(NetEventType.DLVR,NetEvent.Message(hmp)); + rsn.put(hmp.src, crsn + 1); + return true; + } + return false; + }); + } + } + + @Override + public void deliver(NetEvent ne) { + pending.add(ne.message); + deliverIf(); + } + + @Override + public void start(Host h, Parser p) { + me = h; + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java new file mode 100644 index 0000000..88344a7 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerPL.java @@ -0,0 +1,30 @@ +package cs451.net.handler; + +import cs451.net.event.*; +import cs451.tools.Pair; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + + +public class NetHandlerPL extends NetEventHandler { + + private final Set> delivered = ConcurrentHashMap.newKeySet(); + + public NetHandlerPL(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + @Override + public void send(NetEvent event) { + sendNext(event); + } + + @Override + public void deliver(NetEvent event) { + if(delivered.add(new Pair<>(event.peer.getId(),event.message))){ + deliverNext(event); + } + } + +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java new file mode 100644 index 0000000..698f421 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSCKT.java @@ -0,0 +1,94 @@ +package cs451.net.handler; + +import cs451.net.event.Message; +import cs451.net.NetManager; +import cs451.net.event.*; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +public class NetHandlerSCKT extends NetEventHandler { + + private static final Integer BUFF_SIZE = 128; + + private List hosts; + + private DatagramSocket socket = null; + private final AtomicBoolean receiving = new AtomicBoolean(true); + + + public NetHandlerSCKT(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + @Override + public void send(NetEvent event) { + if(socket.isClosed()){ + NetManager.error(NetEventType.SEND,new SocketException("Socket Closed")); + return; + } + ByteBuffer b = ByteBuffer.allocate(BUFF_SIZE); + event.message.ToBuffer(b); + DatagramPacket datagram = new DatagramPacket(b.array(), b.position(), event.peer.getAddr()); + try { + socket.send(datagram); + } catch (IOException e) { + NetManager.error(NetEventType.SEND,e); + } + } + + @Override + public void deliver(NetEvent event) { + if(socket.isClosed()){ + NetManager.error(NetEventType.DLVR,new SocketException("Socket Closed")); + return; + } + + byte[] buff = new byte[BUFF_SIZE]; + DatagramPacket datagram = new DatagramPacket(buff, buff.length); + try { + socket.receive(datagram); + Optional rhid = hosts.stream().filter(h -> h.getAddr().equals(datagram.getSocketAddress())).map(Host::getId).findFirst(); + + deliverNext(NetEvent.Message( + new Host((InetSocketAddress) datagram.getSocketAddress(), rhid.orElse(-1)), + Message.FromBuffer(ByteBuffer.wrap(datagram.getData())) + )); + } catch (IOException e) { + NetManager.error(NetEventType.DLVR,e); + } + } + + + @Override + public void start(Host h, Parser p) { + Objects.requireNonNull(h); + Objects.requireNonNull(p); + hosts = p.hosts(); + + try { + this.socket = new DatagramSocket(h.getPort()); + } catch (SocketException e) { + throw new Error(e); + } + receiving.set(true); + new Thread(() -> { + while (receiving.get()) { + this.deliver(NetEvent.EMPTY()); + } + }).start(); + } + + @Override + public void stop() { + socket.close(); + receiving.set(false); + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerSL.java b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java new file mode 100644 index 0000000..8fa598d --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerSL.java @@ -0,0 +1,89 @@ +package cs451.net.handler; + +import cs451.net.event.Message; +import cs451.net.NetManager; +import cs451.net.event.*; +import cs451.tools.Pair; + +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + + +public class NetHandlerSL extends NetEventHandler { + + private static final Set EMPTY_SET = ConcurrentHashMap.newKeySet(); + + Set hasTimeout = ConcurrentHashMap.newKeySet(); + Map> hasAck = new ConcurrentHashMap<>(); + Map, Integer> retryCounter = new ConcurrentHashMap<>(); + + public NetHandlerSL(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + + private void handleTimeout(NetEvent ne) { + if (hasAck.getOrDefault(ne.peer.getId(),EMPTY_SET).contains(ne.message)) { + return; + } + if (retryCounter.compute(new Pair<>(ne.peer.getId(),ne.message), (k, ov) -> (ov == null) ? 2 : ov + 1) >= MAX_TRIES) { + timeout(ne); + } else { + send(ne); + } + } + + private void checkTimeout(NetEvent ne) { + new Thread(()->{ + Integer t = retryCounter.getOrDefault(new Pair<>(ne.peer.getId(), ne.message), 1); + new Timer().schedule( + new TimerTask() { + @Override + public void run () { + handleTimeout(ne); + } + }, MS_WAIT*t*t*t); + }).start(); + } + + @Override + public void send(NetEvent ne) { + if (hasTimeout.contains(ne.peer.getId())) { + return; + } + sendNext(ne); + checkTimeout(ne); + } + + @Override + public void deliver(NetEvent ne) { + if (hasTimeout.contains(ne.peer.getId())) { + return; + } + switch (ne.message.tpe){ + case ACK: + hasAck.compute(ne.peer.getId(),(k, s) -> { + if(s == null) s = ConcurrentHashMap.newKeySet(); + s.add(ne.message); + return s; + }); + break; + case DATA: + sendNext(NetEvent.MessageACK(ne.peer,ne.message)); + deliverNext(ne); + break; + default: + break; + } + } + + private void timeout(NetEvent event) { + if(hasTimeout.add(event.peer.getId())) { + NetManager.crash(NetHandlerURB.class, event); + } + } + +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java new file mode 100644 index 0000000..68aaed4 --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerTOPL.java @@ -0,0 +1,58 @@ +package cs451.net.handler; + +import cs451.net.event.NetEvent; +import cs451.parser.Host; +import cs451.parser.Parser; + +import java.util.concurrent.atomic.AtomicInteger; + +public class NetHandlerTOPL extends NetEventHandler { + + private final AtomicInteger toSend = new AtomicInteger(0); + private final AtomicInteger delivered = new AtomicInteger(0); + private final AtomicInteger waiting = new AtomicInteger(0); + + private Host me; + + public NetHandlerTOPL(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer, broadcastLayer); + } + + + private void sendIf(){ + synchronized (waiting) { + while (waiting.get() < SEND_WINDOW && toSend.get() > 0) { + toSend.getAndDecrement(); + waiting.getAndIncrement(); + sendNextSync(NetEvent.EMPTY()); + } + } + } + + @Override + public void send(NetEvent ne) { + sendIf(); + } + + @Override + public void deliver(NetEvent ne) { + if (ne.message.src == me.getId()) { + synchronized (waiting) { + delivered.getAndIncrement(); + waiting.getAndDecrement(); + } + sendIf(); + } + } + + @Override + public boolean isDone() { + return toSend.get()==0 && waiting.get()==0; + } + + @Override + public void start(Host h, Parser p) { + me = h; + toSend.set(p.messageCount()); + } +} diff --git a/257844/src/main/java/cs451/net/handler/NetHandlerURB.java b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java new file mode 100644 index 0000000..fa876cc --- /dev/null +++ b/257844/src/main/java/cs451/net/handler/NetHandlerURB.java @@ -0,0 +1,65 @@ +package cs451.net.handler; + +import cs451.net.event.NetEvent; +import cs451.parser.Host; +import cs451.parser.Parser; +import cs451.tools.Pair; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + + +public class NetHandlerURB extends NetEventHandler { + + + private final Set correct = ConcurrentHashMap.newKeySet(); + private final Map, Set> ack = new ConcurrentHashMap<>(); + private final Set> delivered = ConcurrentHashMap.newKeySet(); + private final Set> pending = ConcurrentHashMap.newKeySet(); + + private Integer myId; + + + public NetHandlerURB(Class deliverLayer, Class broadcastLayer) { + super(deliverLayer,broadcastLayer); + } + + private void deliverIf(){ + pending.stream().filter(smp -> ack.getOrDefault(smp,ConcurrentHashMap.newKeySet()).containsAll(correct)). + filter(delivered::add). + forEach(smp -> deliverNext(NetEvent.Message(smp.first(),smp.second()))); + } + + @Override + public void send(NetEvent ne) { + pending.add(new Pair<>(myId, ne.message.id)); + ne.message.src = myId; + sendNext(ne); + } + + @Override + public void deliver(NetEvent ne) { + ack.compute(new Pair<>(ne.message.src,ne.message.id),(k, v) -> { + if(v == null){ + v = ConcurrentHashMap.newKeySet(); + } + v.add(ne.peer.getId()); + return v; + }); + if(pending.add(new Pair<>(ne.message.src,ne.message.id))){ + sendNext(ne); + } + deliverIf(); + } + + public void crash(NetEvent ne) { + correct.remove(ne.peer.getId()); + deliverIf(); + } + + public void start(Host h, Parser p) { + p.hosts().forEach(ch-> correct.add(ch.getId())); + myId = h.getId(); + } +} \ No newline at end of file diff --git a/257844/src/main/java/cs451/parser/BarrierParser.java b/257844/src/main/java/cs451/parser/BarrierParser.java new file mode 100644 index 0000000..852d37e --- /dev/null +++ b/257844/src/main/java/cs451/parser/BarrierParser.java @@ -0,0 +1,54 @@ +package cs451.parser; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class BarrierParser { + + private static final String BARRIER_KEY = "--barrier"; + + private static final int BARRIER_ARGS_NUM = 2; + private static final String COLON_REGEX = ":"; + private static final String IP_START_REGEX = "/"; + + private static String ip; + private static int port; + + public boolean populate(String key, String value) { + if (!key.equals(BARRIER_KEY)) { + return false; + } + + String[] barrier = value.split(COLON_REGEX); + if (barrier.length != BARRIER_ARGS_NUM) { + return false; + } + + try { + String ipTest = InetAddress.getByName(barrier[0]).toString(); + if (ipTest.startsWith(IP_START_REGEX)) { + ip = ipTest.substring(1); + } else { + ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress(); + } + + port = Integer.parseInt(barrier[1]); + if (port <= 0) { + System.err.println("Barrier port must be a positive number!"); + return false; + } + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return true; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +} diff --git a/257844/src/main/java/cs451/parser/ConfigParser.java b/257844/src/main/java/cs451/parser/ConfigParser.java new file mode 100644 index 0000000..5837cbe --- /dev/null +++ b/257844/src/main/java/cs451/parser/ConfigParser.java @@ -0,0 +1,44 @@ +package cs451.parser; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; + +public class ConfigParser { + + private String path; + + private int messages; + + + public boolean populate(String value) { + File file = new File(value); + path = file.getPath(); + + try (BufferedReader br = new BufferedReader(new FileReader(path))) { + int lineNum = 1; + for (String line; (line = br.readLine()) != null; lineNum++) { + switch(lineNum){ + case 1: + messages = Integer.parseInt(line); + break; + default: + break; + } + } + } catch (IOException e) { + System.err.println("Problem with the hosts file!"); + return false; + } + return true; + } + + public String getPath() { + return path; + } + + public int getMessages() { + return messages; + } +} diff --git a/257844/src/main/java/cs451/parser/Constants.java b/257844/src/main/java/cs451/parser/Constants.java new file mode 100644 index 0000000..f855908 --- /dev/null +++ b/257844/src/main/java/cs451/parser/Constants.java @@ -0,0 +1,30 @@ +package cs451.parser; + +public class Constants { + + public static final int ARG_LIMIT_NO_CONFIG = 10; + public static final int ARG_LIMIT_CONFIG = 11; + + // indexes for id + public static final int ID_KEY = 0; + public static final int ID_VALUE = 1; + + // indexes for hosts + public static final int HOSTS_KEY = 2; + public static final int HOSTS_VALUE = 3; + + // indexes for barrier + public static final int BARRIER_KEY = 4; + public static final int BARRIER_VALUE = 5; + + // indexes for signal + public static final int SIGNAL_KEY = 6; + public static final int SIGNAL_VALUE = 7; + + // indexes for output + public static final int OUTPUT_KEY = 8; + public static final int OUTPUT_VALUE = 9; + + // indexes for config + public static final int CONFIG_VALUE = 10; +} diff --git a/257844/src/main/java/cs451/parser/Coordinator.java b/257844/src/main/java/cs451/parser/Coordinator.java new file mode 100644 index 0000000..57f6823 --- /dev/null +++ b/257844/src/main/java/cs451/parser/Coordinator.java @@ -0,0 +1,74 @@ +package cs451.parser; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.net.Socket; + +public class Coordinator { + + private final int pid; + + private final String barrierIp; + private final int barrierPort; + + private final String signalIp; + private final int signalPort; + + private final Socket signalSocket; + + public Coordinator(int pid, String barrierIp, int barrierPort, String signalIp, int signalPort) { + this.pid = pid; + this.barrierIp = barrierIp; + this.barrierPort = barrierPort; + this.signalIp = signalIp; + this.signalPort = signalPort; + + signalSocket = connectToHost(this.signalIp, this.signalPort); + } + + public void waitOnBarrier() { + try { + Socket socket = connectToHost(barrierIp, barrierPort); + InputStream input = socket.getInputStream(); + InputStreamReader reader = new InputStreamReader(input); + System.out.println("Accessing barrier..."); + + while (reader.read() != -1) { + } + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + } + + public void finishedBroadcasting() { + try { + signalSocket.close(); + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + } + + private Socket connectToHost(String ip, int port) { + Socket socket = null; + try { + socket = new Socket(ip, port); + OutputStream output = socket.getOutputStream(); + DataOutputStream writer = new DataOutputStream(output); + + ByteBuffer bb = ByteBuffer.allocate(8); + bb.order(ByteOrder.BIG_ENDIAN); + bb.putLong(pid); + + writer.write(bb.array(), 0, 8); + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + + return socket; + } +} diff --git a/257844/src/main/java/cs451/parser/Host.java b/257844/src/main/java/cs451/parser/Host.java new file mode 100644 index 0000000..03a1401 --- /dev/null +++ b/257844/src/main/java/cs451/parser/Host.java @@ -0,0 +1,78 @@ +package cs451.parser; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; + +public class Host { + + private static final String IP_START_REGEX = "/"; + + private int id = -1; + private String ip = null; + private int port = -1; + private SocketAddress addr = null; + + public Host(InetSocketAddress addr, Integer id){ + this.addr = addr; + this.ip = addr.getHostString(); + this.port = addr.getPort(); + this.id = id; + } + + public Host(){} + + public Host(Integer id){ + this.id = id; + } + + public boolean populate(String idString, String ipString, String portString) { + try { + id = Integer.parseInt(idString); + + String ipTest = InetAddress.getByName(ipString).toString(); + if (ipTest.startsWith(IP_START_REGEX)) { + ip = ipTest.substring(1); + } else { + ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress(); + } + + port = Integer.parseInt(portString); + if (port <= 0) { + System.err.println("Port in the hosts file must be a positive number!"); + return false; + } + } catch (NumberFormatException e) { + if (port == -1) { + System.err.println("Id in the hosts file must be a number!"); + } else { + System.err.println("Port in the hosts file must be a number!"); + } + return false; + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + this.addr = new InetSocketAddress(ip, port); + + return true; + } + + public int getId() { + return id; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } + + public SocketAddress getAddr() { + return addr; + } + +} diff --git a/257844/src/main/java/cs451/parser/HostsParser.java b/257844/src/main/java/cs451/parser/HostsParser.java new file mode 100644 index 0000000..76f3ce7 --- /dev/null +++ b/257844/src/main/java/cs451/parser/HostsParser.java @@ -0,0 +1,87 @@ +package cs451.parser; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class HostsParser { + + private static final String HOSTS_KEY = "--hosts"; + private static final String SPACES_REGEX = "\\s+"; + + private String filename; + private final List hosts = new ArrayList<>(); + + public boolean populate(String key, String filename) { + if (!key.equals(HOSTS_KEY)) { + return false; + } + + this.filename = filename; + try (BufferedReader br = new BufferedReader(new FileReader(filename))) { + int lineNum = 1; + for (String line; (line = br.readLine()) != null; lineNum++) { + if (line.isBlank()) { + continue; + } + + String[] splits = line.split(SPACES_REGEX); + if (splits.length != 3) { + System.err.println("Problem with the line " + lineNum + " in the hosts file!"); + return false; + } + + Host newHost = new Host(); + if (!newHost.populate(splits[0], splits[1], splits[2])) { + return false; + } + + hosts.add(newHost); + } + } catch (IOException e) { + System.err.println("Problem with the hosts file!"); + return false; + } + + if (!checkIdRange()) { + System.err.println("Hosts ids are not within the range!"); + return false; + } + + // sort by id + hosts.sort(new HostsComparator()); + return true; + } + + private boolean checkIdRange() { + int num = hosts.size(); + for (Host host : hosts) { + if (host.getId() < 1 || host.getId() > num) { + System.err.println("Id of a host is not in the right range!"); + return false; + } + } + + return true; + } + + public boolean inRange(int id) { + return id <= hosts.size(); + } + + public List getHosts() { + return hosts; + } + + static class HostsComparator implements Comparator { + + public int compare(Host a, Host b) { + return a.getId() - b.getId(); + } + + } + +} diff --git a/257844/src/main/java/cs451/parser/IdParser.java b/257844/src/main/java/cs451/parser/IdParser.java new file mode 100644 index 0000000..47bca2f --- /dev/null +++ b/257844/src/main/java/cs451/parser/IdParser.java @@ -0,0 +1,31 @@ +package cs451.parser; + +public class IdParser { + + private static final String ID_KEY = "--id"; + + private int id; + + public boolean populate(String key, String value) { + if (!key.equals(ID_KEY)) { + return false; + } + + try { + id = Integer.parseInt(value); + if (id <= 0) { + System.err.println("Id must be a positive number!"); + } + } catch (NumberFormatException e) { + System.err.println("Id must be a number!"); + return false; + } + + return true; + } + + public int getId() { + return id; + } + +} diff --git a/257844/src/main/java/cs451/parser/OutputParser.java b/257844/src/main/java/cs451/parser/OutputParser.java new file mode 100644 index 0000000..9324dd9 --- /dev/null +++ b/257844/src/main/java/cs451/parser/OutputParser.java @@ -0,0 +1,25 @@ +package cs451.parser; + +import java.io.File; + +public class OutputParser { + + private static final String OUTPUT_KEY = "--output"; + + private String path; + + public boolean populate(String key, String value) { + if (!key.equals(OUTPUT_KEY)) { + return false; + } + + File file = new File(value); + path = file.getPath(); + return true; + } + + public String getPath() { + return path; + } + +} diff --git a/257844/src/main/java/cs451/parser/Parser.java b/257844/src/main/java/cs451/parser/Parser.java new file mode 100644 index 0000000..3b6552a --- /dev/null +++ b/257844/src/main/java/cs451/parser/Parser.java @@ -0,0 +1,111 @@ +package cs451.parser; + +import java.util.List; + +public class Parser { + + private final String[] args; + private long pid; + private IdParser idParser; + private HostsParser hostsParser; + private BarrierParser barrierParser; + private SignalParser signalParser; + private OutputParser outputParser; + private ConfigParser configParser; + + public Parser(String[] args) { + this.args = args; + } + + public void parse() { + pid = ProcessHandle.current().pid(); + + idParser = new IdParser(); + hostsParser = new HostsParser(); + barrierParser = new BarrierParser(); + signalParser = new SignalParser(); + outputParser = new OutputParser(); + configParser = null; + + int argsNum = args.length; + if (argsNum != Constants.ARG_LIMIT_NO_CONFIG && argsNum != Constants.ARG_LIMIT_CONFIG) { + help(); + } + + if (!idParser.populate(args[Constants.ID_KEY], args[Constants.ID_VALUE])) { + help(); + } + + if (!hostsParser.populate(args[Constants.HOSTS_KEY], args[Constants.HOSTS_VALUE])) { + help(); + } + + if (!hostsParser.inRange(idParser.getId())) { + help(); + } + + if (!barrierParser.populate(args[Constants.BARRIER_KEY], args[Constants.BARRIER_VALUE])) { + help(); + } + + if (!signalParser.populate(args[Constants.SIGNAL_KEY], args[Constants.SIGNAL_VALUE])) { + help(); + } + + if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) { + help(); + } + + if (argsNum == Constants.ARG_LIMIT_CONFIG) { + configParser = new ConfigParser(); + if (!configParser.populate(args[Constants.CONFIG_VALUE])) { + } + } + } + + private void help() { + System.err.println("Usage: ./run.sh --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT [config]"); + System.exit(1); + } + + public int myId() { + return idParser.getId(); + } + + public List hosts() { + return hostsParser.getHosts(); + } + + public String barrierIp() { + return barrierParser.getIp(); + } + + public int barrierPort() { + return barrierParser.getPort(); + } + + public String signalIp() { + return signalParser.getIp(); + } + + public int signalPort() { + return signalParser.getPort(); + } + + public String output() { + return outputParser.getPath(); + } + + public boolean hasConfig() { + return configParser != null; + } + + public String config() { + return configParser.getPath(); + } + + public int messageCount(){ + return configParser.getMessages(); + } + +} diff --git a/257844/src/main/java/cs451/parser/SignalParser.java b/257844/src/main/java/cs451/parser/SignalParser.java new file mode 100644 index 0000000..f6e937f --- /dev/null +++ b/257844/src/main/java/cs451/parser/SignalParser.java @@ -0,0 +1,54 @@ +package cs451.parser; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class SignalParser { + + private static final String SIGNAL_KEY = "--signal"; + + private static final int SIGNAL_ARGS_NUM = 2; + private static final String COLON_REGEX = ":"; + private static final String IP_START_REGEX = "/"; + + private static String ip; + private static int port; + + public boolean populate(String key, String value) { + if (!key.equals(SIGNAL_KEY)) { + return false; + } + + String[] signal = value.split(COLON_REGEX); + if (signal.length != SIGNAL_ARGS_NUM) { + return false; + } + + try { + String ipTest = InetAddress.getByName(signal[0]).toString(); + if (ipTest.startsWith(IP_START_REGEX)) { + ip = ipTest.substring(1); + } else { + ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress(); + } + + port = Integer.parseInt(signal[1]); + if (port <= 0) { + System.err.println("Signal port must be a positive number!"); + return false; + } + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return true; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +} diff --git a/257844/src/main/java/cs451/tools/Logger.java b/257844/src/main/java/cs451/tools/Logger.java new file mode 100644 index 0000000..cb8c202 --- /dev/null +++ b/257844/src/main/java/cs451/tools/Logger.java @@ -0,0 +1,42 @@ +package cs451.tools; + +public abstract class Logger { + + public enum LOG_LEVEL{ + ERR(-1), + INFO(0), + DEBUG(1), + ALL(99); + + public final Integer v; + LOG_LEVEL(Integer v){ + this.v = v; + } + public boolean isEnabled(){ + return this.v <= LVL.v; + } + } + + private static final LOG_LEVEL LVL = LOG_LEVEL.ALL; + + public static void info(String msg){ + if(LOG_LEVEL.INFO.isEnabled()){ + System.out.println(msg); + } + } + + public static void debug(String msg){ + if(LOG_LEVEL.DEBUG.isEnabled()){ + System.err.println(msg); + } + } + + public static void error(String msg){ + if(LOG_LEVEL.ERR.isEnabled()){ + System.err.println(msg); + } + } + + + +} diff --git a/257844/src/main/java/cs451/tools/Pair.java b/257844/src/main/java/cs451/tools/Pair.java new file mode 100644 index 0000000..2f28ee1 --- /dev/null +++ b/257844/src/main/java/cs451/tools/Pair.java @@ -0,0 +1,58 @@ +package cs451.tools; + +import java.util.ArrayList; +import java.util.List; + +public class Pair implements Comparable { + private final S x; + private final T y; + private final List vl = new ArrayList<>(); + + public Pair(S x, T y) { + this.x = x; + this.y = y; + vl.add(x.toString()); + vl.add(y.toString()); + } + + public S first(){ + return this.x; + } + + public T second(){ + return this.y; + } + + + @Override + public final String toString() { + return this.vl.toString(); + } + + @Override + public int hashCode() { + return this.vl.hashCode(); + } + + @Override + public final boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final Pair other = (Pair) obj; + return this.vl.equals(other.vl); + } + + @Override + public int compareTo(Object o) { + Pair co = (Pair)o; + + return ((int)co.second())- ((int)second()); + } +} diff --git a/bnr.sh b/bnr.sh new file mode 100755 index 0000000..efddb13 --- /dev/null +++ b/bnr.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +export _JAVA_OPTIONS="-Xmx16G" +./257844/build.sh +sudo echo 1 +echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p 10 -m 200