1
0

Add signal parameter to java

This commit is contained in:
Athanasios Xygkis 2020-09-21 12:47:54 +02:00
parent 8f315b4b09
commit a6cc233ca3
6 changed files with 174 additions and 26 deletions

View File

@ -55,22 +55,4 @@ public class BarrierParser {
public int getPort() { public int getPort() {
return port; return port;
} }
public static class Barrier {
public static void waitOnBarrier() {
try (Socket socket = new Socket(ip, port)) {
InputStream input = socket.getInputStream();
InputStreamReader reader = new InputStreamReader(input);
System.out.println("Accessing barrier...");
int character;
while ((character = reader.read()) != -1) {}
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
}
}
} }

View File

@ -2,8 +2,8 @@ package cs451;
public class Constants { public class Constants {
public static final int ARG_LIMIT_NO_CONFIG = 8; public static final int ARG_LIMIT_NO_CONFIG = 10;
public static final int ARG_LIMIT_CONFIG = 9; public static final int ARG_LIMIT_CONFIG = 11;
// indexes for id // indexes for id
public static final int ID_KEY = 0; public static final int ID_KEY = 0;
@ -17,10 +17,14 @@ public class Constants {
public static final int BARRIER_KEY = 4; public static final int BARRIER_KEY = 4;
public static final int BARRIER_VALUE = 5; public static final int BARRIER_VALUE = 5;
// indexes for signal
public static final int SIGNAL_KEY = 6;
public static final int SIGNAL_VALUE = 7;
// indexes for output // indexes for output
public static final int OUTPUT_KEY = 6; public static final int OUTPUT_KEY = 8;
public static final int OUTPUT_VALUE = 7; public static final int OUTPUT_VALUE = 9;
// indexes for config // indexes for config
public static final int CONFIG_VALUE = 8; public static final int CONFIG_VALUE = 10;
} }

View File

@ -0,0 +1,75 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.DataOutputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.net.Socket;
import java.net.UnknownHostException;
public class Coordinator {
private int pid;
private String barrierIp;
private int barrierPort;
private String signalIp;
private int signalPort;
private Socket signalSocket = null;
public Coordinator(int pid, String barrierIp, int barrierPort, String signalIp, int signalPort) {
this.pid = pid;
this.barrierIp = barrierIp;
this.barrierPort = barrierPort;
this.signalIp = signalIp;
this.signalPort = signalPort;
signalSocket = connectToHost(this.signalIp, this.signalPort);
}
public void waitOnBarrier() {
try {
Socket socket = connectToHost(barrierIp, barrierPort);
InputStream input = socket.getInputStream();
InputStreamReader reader = new InputStreamReader(input);
System.out.println("Accessing barrier...");
int character;
while ((character = reader.read()) != -1) {}
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
}
public void finishedBroadcasting() {
try {
signalSocket.close();
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
}
private Socket connectToHost(String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
OutputStream output = socket.getOutputStream();
DataOutputStream writer = new DataOutputStream(output);
ByteBuffer bb = ByteBuffer.allocate(8);
bb.order(ByteOrder.BIG_ENDIAN);
bb.putLong((long) pid);
writer.write(bb.array(), 0, 8);
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
return socket;
}
}

View File

@ -24,7 +24,7 @@ public class Main {
}); });
} }
public static void main(String[] args) { public static void main(String[] args) throws InterruptedException {
Parser parser = new Parser(args); Parser parser = new Parser(args);
parser.parse(); parser.parse();
@ -42,12 +42,27 @@ public class Main {
} }
System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort()); System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort());
System.out.println("Signal: " + parser.signalIp() + ":" + parser.signalPort());
System.out.println("Output: " + parser.output()); System.out.println("Output: " + parser.output());
// if config is defined; always check before parser.config() // if config is defined; always check before parser.config()
if (parser.hasConfig()) { if (parser.hasConfig()) {
System.out.println("Config: " + parser.config()); System.out.println("Config: " + parser.config());
} }
BarrierParser.Barrier.waitOnBarrier();
Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort());
System.out.println("Waiting for all processes for finish initialization");
coordinator.waitOnBarrier();
System.out.println("Broadcasting messages...");
System.out.println("Signaling end of broadcasting messages");
coordinator.finishedBroadcasting();
while (true) {
// Sleep for 1 hour
Thread.sleep(60 * 60 * 1000);
}
} }
} }

View File

@ -9,6 +9,7 @@ public class Parser {
private IdParser idParser; private IdParser idParser;
private HostsParser hostsParser; private HostsParser hostsParser;
private BarrierParser barrierParser; private BarrierParser barrierParser;
private SignalParser signalParser;
private OutputParser outputParser; private OutputParser outputParser;
private ConfigParser configParser; private ConfigParser configParser;
@ -22,6 +23,7 @@ public class Parser {
idParser = new IdParser(); idParser = new IdParser();
hostsParser = new HostsParser(); hostsParser = new HostsParser();
barrierParser = new BarrierParser(); barrierParser = new BarrierParser();
signalParser = new SignalParser();
outputParser = new OutputParser(); outputParser = new OutputParser();
configParser = null; configParser = null;
@ -46,6 +48,10 @@ public class Parser {
help(); help();
} }
if (!signalParser.populate(args[Constants.SIGNAL_KEY], args[Constants.SIGNAL_VALUE])) {
help();
}
if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) { if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) {
help(); help();
} }
@ -58,7 +64,7 @@ public class Parser {
} }
private void help() { private void help() {
System.err.println("Usage: --id ID --hosts HOSTS --barier NAME:PORT --output OUTPUT [config]"); System.err.println("Usage: ./run.sh --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT [config]");
System.exit(1); System.exit(1);
} }
@ -78,6 +84,14 @@ public class Parser {
return barrierParser.getPort(); return barrierParser.getPort();
} }
public String signalIp() {
return signalParser.getIp();
}
public int signalPort() {
return signalParser.getPort();
}
public String output() { public String output() {
return outputParser.getPath(); return outputParser.getPath();
} }

View File

@ -0,0 +1,58 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
public class SignalParser {
private static final String SIGNAL_KEY = "--signal";
private static final int SIGNAL_ARGS_NUM = 2;
private static final String COLON_REGEX = ":";
private static final String IP_START_REGEX = "/";
private static String ip;
private static int port;
public boolean populate(String key, String value) {
if (!key.equals(SIGNAL_KEY)) {
return false;
}
String[] signal = value.split(COLON_REGEX);
if (signal.length != SIGNAL_ARGS_NUM) {
return false;
}
try {
String ipTest = InetAddress.getByName(signal[0]).toString();
if (ipTest.startsWith(IP_START_REGEX)) {
ip = ipTest.substring(1);
} else {
ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
}
port = Integer.parseInt(signal[1]);
if (port <= 0) {
System.err.println("Signal port must be a positive number!");
return false;
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return true;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
}