From a6cc233ca37827ac07ad6d9688f508dbec9013f0 Mon Sep 17 00:00:00 2001 From: Athanasios Xygkis Date: Mon, 21 Sep 2020 12:47:54 +0200 Subject: [PATCH] Add signal parameter to java --- .../src/main/java/cs451/BarrierParser.java | 18 ----- .../src/main/java/cs451/Constants.java | 14 ++-- .../src/main/java/cs451/Coordinator.java | 75 +++++++++++++++++++ template_java/src/main/java/cs451/Main.java | 19 ++++- template_java/src/main/java/cs451/Parser.java | 16 +++- .../src/main/java/cs451/SignalParser.java | 58 ++++++++++++++ 6 files changed, 174 insertions(+), 26 deletions(-) create mode 100644 template_java/src/main/java/cs451/Coordinator.java create mode 100644 template_java/src/main/java/cs451/SignalParser.java diff --git a/template_java/src/main/java/cs451/BarrierParser.java b/template_java/src/main/java/cs451/BarrierParser.java index 9f6245c..4af8ad9 100644 --- a/template_java/src/main/java/cs451/BarrierParser.java +++ b/template_java/src/main/java/cs451/BarrierParser.java @@ -55,22 +55,4 @@ public class BarrierParser { public int getPort() { return port; } - - public static class Barrier { - - public static void waitOnBarrier() { - try (Socket socket = new Socket(ip, port)) { - InputStream input = socket.getInputStream(); - InputStreamReader reader = new InputStreamReader(input); - System.out.println("Accessing barrier..."); - - int character; - while ((character = reader.read()) != -1) {} - } catch (IOException ex) { - System.out.println("I/O error: " + ex.getMessage()); - } - } - - } - } diff --git a/template_java/src/main/java/cs451/Constants.java b/template_java/src/main/java/cs451/Constants.java index bbba87d..94d3c82 100644 --- a/template_java/src/main/java/cs451/Constants.java +++ b/template_java/src/main/java/cs451/Constants.java @@ -2,8 +2,8 @@ package cs451; public class Constants { - public static final int ARG_LIMIT_NO_CONFIG = 8; - public static final int ARG_LIMIT_CONFIG = 9; + public static final int ARG_LIMIT_NO_CONFIG = 10; + public static final int ARG_LIMIT_CONFIG = 11; // indexes for id public static final int ID_KEY = 0; @@ -17,10 +17,14 @@ public class Constants { public static final int BARRIER_KEY = 4; public static final int BARRIER_VALUE = 5; + // indexes for signal + public static final int SIGNAL_KEY = 6; + public static final int SIGNAL_VALUE = 7; + // indexes for output - public static final int OUTPUT_KEY = 6; - public static final int OUTPUT_VALUE = 7; + public static final int OUTPUT_KEY = 8; + public static final int OUTPUT_VALUE = 9; // indexes for config - public static final int CONFIG_VALUE = 8; + public static final int CONFIG_VALUE = 10; } diff --git a/template_java/src/main/java/cs451/Coordinator.java b/template_java/src/main/java/cs451/Coordinator.java new file mode 100644 index 0000000..9a874a9 --- /dev/null +++ b/template_java/src/main/java/cs451/Coordinator.java @@ -0,0 +1,75 @@ +package cs451; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.DataOutputStream; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.net.Socket; +import java.net.UnknownHostException; + +public class Coordinator { + + private int pid; + + private String barrierIp; + private int barrierPort; + + private String signalIp; + private int signalPort; + + private Socket signalSocket = null; + + public Coordinator(int pid, String barrierIp, int barrierPort, String signalIp, int signalPort) { + this.pid = pid; + this.barrierIp = barrierIp; + this.barrierPort = barrierPort; + this.signalIp = signalIp; + this.signalPort = signalPort; + + signalSocket = connectToHost(this.signalIp, this.signalPort); + } + + public void waitOnBarrier() { + try { + Socket socket = connectToHost(barrierIp, barrierPort); + InputStream input = socket.getInputStream(); + InputStreamReader reader = new InputStreamReader(input); + System.out.println("Accessing barrier..."); + int character; + while ((character = reader.read()) != -1) {} + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + } + + public void finishedBroadcasting() { + try { + signalSocket.close(); + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + } + + private Socket connectToHost(String ip, int port) { + Socket socket = null; + try { + socket = new Socket(ip, port); + OutputStream output = socket.getOutputStream(); + DataOutputStream writer = new DataOutputStream(output); + + ByteBuffer bb = ByteBuffer.allocate(8); + bb.order(ByteOrder.BIG_ENDIAN); + bb.putLong((long) pid); + + writer.write(bb.array(), 0, 8); + } catch (IOException ex) { + System.out.println("I/O error: " + ex.getMessage()); + } + + return socket; + } +} diff --git a/template_java/src/main/java/cs451/Main.java b/template_java/src/main/java/cs451/Main.java index 394c8fa..f5b4708 100644 --- a/template_java/src/main/java/cs451/Main.java +++ b/template_java/src/main/java/cs451/Main.java @@ -24,7 +24,7 @@ public class Main { }); } - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { Parser parser = new Parser(args); parser.parse(); @@ -42,12 +42,27 @@ public class Main { } System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort()); + System.out.println("Signal: " + parser.signalIp() + ":" + parser.signalPort()); System.out.println("Output: " + parser.output()); // if config is defined; always check before parser.config() if (parser.hasConfig()) { System.out.println("Config: " + parser.config()); } - BarrierParser.Barrier.waitOnBarrier(); + + Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort()); + + System.out.println("Waiting for all processes for finish initialization"); + coordinator.waitOnBarrier(); + + System.out.println("Broadcasting messages..."); + + System.out.println("Signaling end of broadcasting messages"); + coordinator.finishedBroadcasting(); + + while (true) { + // Sleep for 1 hour + Thread.sleep(60 * 60 * 1000); + } } } diff --git a/template_java/src/main/java/cs451/Parser.java b/template_java/src/main/java/cs451/Parser.java index cff51cf..93c86e7 100644 --- a/template_java/src/main/java/cs451/Parser.java +++ b/template_java/src/main/java/cs451/Parser.java @@ -9,6 +9,7 @@ public class Parser { private IdParser idParser; private HostsParser hostsParser; private BarrierParser barrierParser; + private SignalParser signalParser; private OutputParser outputParser; private ConfigParser configParser; @@ -22,6 +23,7 @@ public class Parser { idParser = new IdParser(); hostsParser = new HostsParser(); barrierParser = new BarrierParser(); + signalParser = new SignalParser(); outputParser = new OutputParser(); configParser = null; @@ -46,6 +48,10 @@ public class Parser { help(); } + if (!signalParser.populate(args[Constants.SIGNAL_KEY], args[Constants.SIGNAL_VALUE])) { + help(); + } + if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) { help(); } @@ -58,7 +64,7 @@ public class Parser { } private void help() { - System.err.println("Usage: --id ID --hosts HOSTS --barier NAME:PORT --output OUTPUT [config]"); + System.err.println("Usage: ./run.sh --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT [config]"); System.exit(1); } @@ -78,6 +84,14 @@ public class Parser { return barrierParser.getPort(); } + public String signalIp() { + return signalParser.getIp(); + } + + public int signalPort() { + return signalParser.getPort(); + } + public String output() { return outputParser.getPath(); } diff --git a/template_java/src/main/java/cs451/SignalParser.java b/template_java/src/main/java/cs451/SignalParser.java new file mode 100644 index 0000000..9da96b1 --- /dev/null +++ b/template_java/src/main/java/cs451/SignalParser.java @@ -0,0 +1,58 @@ +package cs451; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; + +public class SignalParser { + + private static final String SIGNAL_KEY = "--signal"; + + private static final int SIGNAL_ARGS_NUM = 2; + private static final String COLON_REGEX = ":"; + private static final String IP_START_REGEX = "/"; + + private static String ip; + private static int port; + + public boolean populate(String key, String value) { + if (!key.equals(SIGNAL_KEY)) { + return false; + } + + String[] signal = value.split(COLON_REGEX); + if (signal.length != SIGNAL_ARGS_NUM) { + return false; + } + + try { + String ipTest = InetAddress.getByName(signal[0]).toString(); + if (ipTest.startsWith(IP_START_REGEX)) { + ip = ipTest.substring(1); + } else { + ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress(); + } + + port = Integer.parseInt(signal[1]); + if (port <= 0) { + System.err.println("Signal port must be a positive number!"); + return false; + } + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return true; + } + + public String getIp() { + return ip; + } + + public int getPort() { + return port; + } +}