1
0
This commit is contained in:
choelzl 2020-11-01 20:56:41 +01:00
parent 9e4e8190ac
commit 88e8903832
Signed by: sora
GPG Key ID: A362EA0491E2EEA0
33 changed files with 0 additions and 1547 deletions

View File

@ -1,82 +0,0 @@
# Created by https://www.toptal.com/developers/gitignore/api/c,c++
# Edit at https://www.toptal.com/developers/gitignore?templates=c,c++
#
bin/da_proc
target/
### C ###
# Prerequisites
*.d
# Object files
*.o
*.ko
*.obj
*.elf
# Linker output
*.ilk
*.map
*.exp
# Precompiled Headers
*.gch
*.pch
# Libraries
*.lib
*.a
*.la
*.lo
# Shared objects (inc. Windows DLLs)
*.dll
*.so
*.so.*
*.dylib
# Executables
*.exe
*.out
*.app
*.i*86
*.x86_64
*.hex
# Debug files
*.dSYM/
*.su
*.idb
*.pdb
# Kernel Module Compile Results
*.mod*
*.cmd
.tmp_versions/
modules.order
Module.symvers
Mkfile.old
dkms.conf
### C++ ###
# Prerequisites
# Compiled Object files
*.slo
# Precompiled Headers
# Compiled Dynamic libraries
# Fortran module files
*.mod
*.smod
# Compiled Static libraries
*.lai
# Executables
# End of https://www.toptal.com/developers/gitignore/api/c,c++

View File

@ -1,72 +0,0 @@
cmake_minimum_required(VERSION 3.9)
project(da_project)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED ON)
set(CMAKE_C_EXTENSIONS OFF)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
string(CONCAT CMAKE_CXX_FLAGS_COMMON_STR ""
"-Werror -Wall -Wconversion -Wfloat-equal "
"-Wpedantic -Wpointer-arith -Wswitch-default "
"-Wpacked -Wextra -Winvalid-pch "
"-Wmissing-field-initializers "
"-Wunreachable-code -Wcast-align -Wcast-qual "
"-Wdisabled-optimization -Wformat=2 "
"-Wformat-nonliteral -Wuninitialized "
"-Wformat-security -Wformat-y2k -Winit-self "
"-Wmissing-declarations -Wmissing-include-dirs "
"-Wredundant-decls -Wstrict-overflow=5 -Wundef "
"-Wno-unused -Wctor-dtor-privacy -Wsign-promo "
"-Woverloaded-virtual -Wold-style-cast")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
string(CONCAT CMAKE_CXX_FLAGS_STR "${CMAKE_CXX_FLAGS_COMMON_STR} "
"-Wlogical-op -Wstrict-null-sentinel -Wnoexcept")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_STR}")
elseif (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
string(CONCAT CMAKE_CXX_FLAGS_STR "${CMAKE_CXX_FLAGS_COMMON_STR} ")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_STR}")
endif()
string(CONCAT CMAKE_C_FLAGS_COMMON_STR ""
"-Werror -Wall -Wconversion -Wfloat-equal "
"-Wpedantic -Wpointer-arith -Wswitch-default "
"-Wpacked -Wextra -Winvalid-pch "
"-Wmissing-field-initializers -Wunreachable-code "
"-Wcast-align -Wcast-qual -Wdisabled-optimization "
"-Wformat=2 -Wformat-nonliteral -Wuninitialized "
"-Wformat-security -Wformat-y2k -Winit-self "
"-Wmissing-declarations -Wmissing-include-dirs "
"-Wredundant-decls -Wstrict-overflow=5 "
"-Wundef -Wno-unused")
if (CMAKE_C_COMPILER_ID STREQUAL "GNU")
string(CONCAT CMAKE_C_FLAGS_STR "${CMAKE_C_FLAGS_COMMON_STR} "
"-Wlogical-op")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS_STR}")
elseif (CMAKE_C_COMPILER_ID MATCHES "Clang")
string(CONCAT CMAKE_C_FLAGS_STR "${CMAKE_C_FLAGS_COMMON_STR} ")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS_STR}")
endif()
set(CMAKE_C_FLAGS_DEBUG "-Winline -g")
set(CMAKE_C_FLAGS_RELEASE "-O3 -DNDEBUG")
set(CMAKE_C_FLAGS_RELWITHDEBINFO "-O2 -g -DNDEBUG")
set(CMAKE_CXX_FLAGS_MINSIZEREL "-Os -DNDEBUG")
set(CMAKE_CXX_FLAGS_DEBUG "-Winline -g")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g -DNDEBUG")
set(CMAKE_CXX_FLAGS_MINSIZEREL "-Os -DNDEBUG")
# MESSAGE( STATUS "CMAKE_C_FLAGS: " ${CMAKE_C_FLAGS} )
# MESSAGE( STATUS "CMAKE_CXX_FLAGS: " ${CMAKE_CXX_FLAGS} )
# MESSAGE( STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE} )
add_subdirectory(src)

View File

@ -1 +0,0 @@
This is a reserved directory name! Store the binary generated by `build.sh` in this directory

View File

@ -1 +0,0 @@
This is a reserved directory name, do not delete or use in your application!

View File

@ -1 +0,0 @@
This is a reserved directory name, do not delete or use in your application!

View File

@ -1,13 +0,0 @@
#!/bin/bash
set -e
# Change the current working directory to the location of the present file
cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
rm -rf target
mkdir target
cd target
cmake -DCMAKE_BUILD_TYPE=Release ..
cmake --build .
mv src/da_proc ../bin

View File

@ -1,7 +0,0 @@
#!/bin/bash
# Change the current working directory to the location of the present file
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
rm -f "$DIR"/bin/da_proc
rm -rf "$DIR"/target

View File

@ -1,9 +0,0 @@
#!/bin/bash
# Change the current working directory to the location of the present file
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
ret=0
exec 3>&1; $("$DIR"/bin/da_proc "$@" >&3); ret=$?; exec 3>&-
exit $ret

View File

@ -1,9 +0,0 @@
# DO NAME THE SYMBOLIC VARIABLE `SOURCES`
include_directories(include)
set(SOURCES src/main.cpp src/hello.c)
# DO NOT EDIT THE FOLLOWING LINE
find_package(Threads)
add_executable(da_proc ${SOURCES})
target_link_libraries(da_proc ${CMAKE_THREAD_LIBS_INIT})

View File

@ -1,90 +0,0 @@
#pragma once
#include "parser.hpp"
#include <endian.h>
#include <algorithm>
class Coordinator {
public:
Coordinator(unsigned long id, Parser::Host barrier, Parser::Host signal)
: id_{id}, barrier_{barrier}, signal_{signal}
{
signalFd_ = connectToHost(signal_, "signal");
}
void waitOnBarrier() {
int fd = connectToHost(barrier_, "barrier");
char dummy;
if (recv(fd, &dummy, sizeof(dummy), 0) < 0) {
throw std::runtime_error("Could not read from the barrier socket: " +
std::string(std::strerror(errno)));
}
close(fd);
}
void finishedBroadcasting() {
close(signalFd_);
}
private:
int connectToHost(Parser::Host const &host, std::string const &reason) {
struct sockaddr_in server;
std::memset(&server, 0, sizeof(server));
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
throw std::runtime_error("Could not create the " + reason + " socket: " +
std::string(std::strerror(errno)));
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = host.ip;
server.sin_port = host.port;
if (connect(fd, reinterpret_cast<struct sockaddr *>(&server),
sizeof(server)) < 0) {
throw std::runtime_error("Could not connect to the " + reason + ": " +
std::string(std::strerror(errno)));
}
auto id = htonT(static_cast<uint64_t>(id_));
if (writeWithRetry(fd, reinterpret_cast<uint8_t *>(&id), sizeof(id))) {
throw std::runtime_error("Could not send my LogicalPID to the " + reason + ": " +
std::string(std::strerror(errno)));
}
return fd;
}
// From https://stackoverflow.com/questions/32683086/handling-incomplete-write-calls
ssize_t writeWithRetry (int fd, uint8_t const* buf, size_t size) {
ssize_t ret;
while (size > 0) {
do
{
ret = write(fd, buf, size);
} while ((ret < 0) && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
if (ret < 0)
return ret;
size -= ret;
buf += ret;
}
return 0;
}
template <typename T>
constexpr T htonT (T value) noexcept
{
#if __BYTE_ORDER == __LITTLE_ENDIAN
char* ptr = reinterpret_cast<char*>(&value);
std::reverse(ptr, ptr + sizeof(T));
#endif
return value;
}
private:
unsigned long id_;
Parser::Host barrier_;
Parser::Host signal_;
int signalFd_;
};

View File

@ -1,13 +0,0 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
void hello();
#ifdef __cplusplus
}
#endif

View File

@ -1,390 +0,0 @@
#pragma once
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <algorithm>
#include <cctype>
#include <locale>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
class Parser {
public:
struct Host {
Host() {}
Host(size_t id, std::string &ip_or_hostname, unsigned short port)
: id{id}, port{htons(port)} {
if (isValidIpAddress(ip_or_hostname.c_str())) {
ip = inet_addr(ip_or_hostname.c_str());
} else {
ip = ipLookup(ip_or_hostname.c_str());
}
}
std::string ipReadable() const {
in_addr tmp_ip;
tmp_ip.s_addr = ip;
return std::string(inet_ntoa(tmp_ip));
}
unsigned short portReadable() const { return ntohs(port); }
unsigned long id;
in_addr_t ip;
unsigned short port;
private:
bool isValidIpAddress(const char *ipAddress) {
struct sockaddr_in sa;
int result = inet_pton(AF_INET, ipAddress, &(sa.sin_addr));
return result != 0;
}
in_addr_t ipLookup(const char *host) {
struct addrinfo hints, *res;
char addrstr[128];
void *ptr;
memset(&hints, 0, sizeof(hints));
hints.ai_family = PF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags |= AI_CANONNAME;
if (getaddrinfo(host, NULL, &hints, &res) != 0) {
throw std::runtime_error(
"Could not resolve host `" + std::string(host) +
"` to IP: " + std::string(std::strerror(errno)));
}
while (res) {
inet_ntop(res->ai_family, res->ai_addr->sa_data, addrstr, 128);
switch (res->ai_family) {
case AF_INET:
ptr =
&(reinterpret_cast<struct sockaddr_in *>(res->ai_addr))->sin_addr;
inet_ntop(res->ai_family, ptr, addrstr, 128);
return inet_addr(addrstr);
break;
// case AF_INET6:
// ptr = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
// break;
default:
break;
}
res = res->ai_next;
}
throw std::runtime_error("No host resolves to IPv4");
}
};
public:
Parser(const int argc, char const *const *argv, bool withConfig)
: argc{argc}, argv{argv}, withConfig{withConfig}, parsed{false} {}
void parse() {
if (!parseInternal()) {
help(argc, argv);
}
parsed = true;
}
unsigned long id() const {
checkParsed();
return id_;
}
const char *hostsPath() const {
checkParsed();
return hostsPath_.c_str();
}
Host barrier() const {
checkParsed();
return barrier_;
}
Host signal() const {
checkParsed();
return signal_;
}
const char *outputPath() const {
checkParsed();
return outputPath_.c_str();
}
const char *configPath() const {
checkParsed();
if (!withConfig) {
throw std::runtime_error("Parser is configure to ignore the config path");
}
return configPath_.c_str();
}
std::vector<Host> hosts() {
std::ifstream hostsFile(hostsPath());
std::vector<Host> hosts;
if (!hostsFile.is_open()) {
std::ostringstream os;
os << "`" << hostsPath() << "` does not exist.";
throw std::invalid_argument(os.str());
}
std::string line;
int lineNum = 0;
while (std::getline(hostsFile, line)) {
lineNum += 1;
std::istringstream iss(line);
trim(line);
if (line.empty()) {
continue;
}
unsigned long id;
std::string ip;
unsigned short port;
if (!(iss >> id >> ip >> port)) {
std::ostringstream os;
os << "Parsing for `" << hostsPath() << "` failed at line " << lineNum;
throw std::invalid_argument(os.str());
}
hosts.push_back(Host(id, ip, port));
}
if (hosts.size() < 2UL) {
std::ostringstream os;
os << "`" << hostsPath() << "` must contain at least two hosts";
throw std::invalid_argument(os.str());
}
auto comp = [](const Host &x, const Host &y) { return x.id < y.id; };
auto result = std::minmax_element(hosts.begin(), hosts.end(), comp);
size_t minID = (*result.first).id;
size_t maxID = (*result.second).id;
if (minID != 1UL || maxID != static_cast<unsigned long>(hosts.size())) {
std::ostringstream os;
os << "In `" << hostsPath()
<< "` IDs of processes have to start from 1 and be compact";
throw std::invalid_argument(os.str());
}
std::sort(hosts.begin(), hosts.end(),
[](const Host &a, const Host &b) -> bool { return a.id < b.id; });
return hosts;
}
private:
bool parseInternal() {
if (!parseID()) {
return false;
}
if (!parseHostPath()) {
return false;
}
if (!parseBarrier()) {
return false;
}
if (!parseSignal()) {
return false;
}
if (!parseOutputPath()) {
return false;
}
if (!parseConfigPath()) {
return false;
}
return true;
}
void help(const int, char const *const *argv) {
auto configStr = "CONFIG";
std::cerr << "Usage: " << argv[0]
<< " --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT";
if (!withConfig) {
std::cerr << "\n";
} else {
std::cerr << " CONFIG\n";
}
exit(EXIT_FAILURE);
}
bool parseID() {
if (argc < 3) {
return false;
}
if (std::strcmp(argv[1], "--id") == 0) {
if (isPositiveNumber(argv[2])) {
try {
id_ = std::stoul(argv[2]);
} catch (std::invalid_argument const &e) {
return false;
} catch (std::out_of_range const &e) {
return false;
}
return true;
}
}
return false;
}
bool parseHostPath() {
if (argc < 5) {
return false;
}
if (std::strcmp(argv[3], "--hosts") == 0) {
hostsPath_ = std::string(argv[4]);
return true;
}
return false;
}
bool parseBarrier() {
if (argc < 7) {
return false;
}
if (std::strcmp(argv[5], "--barrier") == 0) {
std::string barrier_addr = argv[6];
std::replace(barrier_addr.begin(), barrier_addr.end(), ':', ' ');
std::stringstream ss(barrier_addr);
std::string barrier_name;
unsigned short barrier_port;
ss >> barrier_name;
ss >> barrier_port;
barrier_ = Host(0, barrier_name, barrier_port);
return true;
}
return false;
}
bool parseSignal() {
if (argc < 9) {
return false;
}
if (std::strcmp(argv[7], "--signal") == 0) {
std::string signal_addr = argv[8];
std::replace(signal_addr.begin(), signal_addr.end(), ':', ' ');
std::stringstream ss(signal_addr);
std::string signal_name;
unsigned short signal_port;
ss >> signal_name;
ss >> signal_port;
signal_ = Host(0, signal_name, signal_port);
return true;
}
return false;
}
bool parseOutputPath() {
if (argc < 11) {
return false;
}
if (std::strcmp(argv[9], "--output") == 0) {
outputPath_ = std::string(argv[10]);
return true;
}
return false;
}
bool parseConfigPath() {
if (!withConfig) {
return true;
}
if (argc < 12) {
return false;
}
configPath_ = std::string(argv[11]);
return true;
}
bool isPositiveNumber(const std::string &s) const {
return !s.empty() && std::find_if(s.begin(), s.end(), [](unsigned char c) {
return !std::isdigit(c);
}) == s.end();
}
void checkParsed() const {
if (!parsed) {
throw std::runtime_error("Invoke parse() first");
}
}
void ltrim(std::string &s) {
s.erase(s.begin(), std::find_if(s.begin(), s.end(),
[](int ch) { return !std::isspace(ch); }));
}
void rtrim(std::string &s) {
s.erase(std::find_if(s.rbegin(), s.rend(),
[](int ch) { return !std::isspace(ch); })
.base(),
s.end());
}
void trim(std::string &s) {
ltrim(s);
rtrim(s);
}
private:
const int argc;
char const *const *argv;
bool withConfig;
bool parsed;
unsigned long id_;
std::string hostsPath_;
Host barrier_;
Host signal_;
std::string outputPath_;
std::string configPath_;
};

View File

@ -1,11 +0,0 @@
#include "hello.h"
void hello() {
printf("Source file that end in .c will be compiled automatically with gcc,\n"
"while files that end in .cpp will be compiled automatically with g++.\n"
"For those that prefer C, they can either use the provided C++\n"
"argument parser and write everything else in C, or delete the\n"
"provided main.cpp and start from scratch.\n"
"Make sure to have compatible ABI when calling C functions from cpp:\n"
"See `hello.h` on how to do this.\n");
}

View File

@ -1,109 +0,0 @@
#include <chrono>
#include <iostream>
#include <thread>
#include "barrier.hpp"
#include "parser.hpp"
#include "hello.h"
#include <signal.h>
static void stop(int) {
// reset signal handlers to default
signal(SIGTERM, SIG_DFL);
signal(SIGINT, SIG_DFL);
// immediately stop network packet processing
std::cout << "Immediately stopping network packet processing.\n";
// write/flush output file if necessary
std::cout << "Writing output.\n";
// exit directly from signal handler
exit(0);
}
int main(int argc, char **argv) {
signal(SIGTERM, stop);
signal(SIGINT, stop);
// `true` means that a config file is required.
// Call with `false` if no config file is necessary.
bool requireConfig = true;
Parser parser(argc, argv, requireConfig);
parser.parse();
hello();
std::cout << std::endl;
std::cout << "My PID: " << getpid() << "\n";
std::cout << "Use `kill -SIGINT " << getpid() << "` or `kill -SIGTERM "
<< getpid() << "` to stop processing packets\n\n";
std::cout << "My ID: " << parser.id() << "\n\n";
std::cout << "Path to hosts:\n";
std::cout << "==============\n";
std::cout << parser.hostsPath() << "\n\n";
std::cout << "List of resolved hosts is:\n";
std::cout << "==========================\n";
auto hosts = parser.hosts();
for (auto &host : hosts) {
std::cout << host.id << "\n";
std::cout << "Human-readable IP: " << host.ipReadable() << "\n";
std::cout << "Machine-readable IP: " << host.ip << "\n";
std::cout << "Human-readbale Port: " << host.portReadable() << "\n";
std::cout << "Machine-readbale Port: " << host.port << "\n";
std::cout << "\n";
}
std::cout << "\n";
std::cout << "Barrier:\n";
std::cout << "========\n";
auto barrier = parser.barrier();
std::cout << "Human-readable IP: " << barrier.ipReadable() << "\n";
std::cout << "Machine-readable IP: " << barrier.ip << "\n";
std::cout << "Human-readbale Port: " << barrier.portReadable() << "\n";
std::cout << "Machine-readbale Port: " << barrier.port << "\n";
std::cout << "\n";
std::cout << "Signal:\n";
std::cout << "========\n";
auto signal = parser.signal();
std::cout << "Human-readable IP: " << signal.ipReadable() << "\n";
std::cout << "Machine-readable IP: " << signal.ip << "\n";
std::cout << "Human-readbale Port: " << signal.portReadable() << "\n";
std::cout << "Machine-readbale Port: " << signal.port << "\n";
std::cout << "\n";
std::cout << "Path to output:\n";
std::cout << "===============\n";
std::cout << parser.outputPath() << "\n\n";
if (requireConfig) {
std::cout << "Path to config:\n";
std::cout << "===============\n";
std::cout << parser.configPath() << "\n\n";
}
std::cout << "Doing some initialization...\n\n";
Coordinator coordinator(parser.id(), barrier, signal);
std::cout << "Waiting for all processes to finish initialization\n\n";
coordinator.waitOnBarrier();
std::cout << "Broadcasting messages...\n\n";
std::cout << "Signaling end of broadcasting messages\n\n";
coordinator.finishedBroadcasting();
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(60));
}
return 0;
}

View File

@ -1,32 +0,0 @@
# Created by https://www.toptal.com/developers/gitignore/api/java
# Edit at https://www.toptal.com/developers/gitignore?templates=java
target/
bin/da_proc.jar
### Java ###
# Compiled class file
*.class
# Log file
*.log
# BlueJ files
*.ctxt
# Mobile Tools for Java (J2ME)
.mtj.tmp/
# Package Files #
*.jar
*.war
*.nar
*.ear
*.zip
*.tar.gz
*.rar
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
# End of https://www.toptal.com/developers/gitignore/api/java

View File

@ -1 +0,0 @@
This is a reserved directory name! Store the binary generated by `build.sh` in this directory

View File

@ -1 +0,0 @@
This is a reserved directory name, do not delete or use in your application!

View File

@ -1 +0,0 @@
This is a reserved directory name, do not delete or use in your application!

View File

@ -1,9 +0,0 @@
#!/bin/bash
set -e
# Change the current working directory to the location of the present file
cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
mvn clean compile assembly:single
mv target/da_project-1.0-SNAPSHOT-jar-with-dependencies.jar bin/da_proc.jar

View File

@ -1,7 +0,0 @@
#!/bin/bash
# Change the current working directory to the location of the present file
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
rm -f "$DIR"/bin/da_proc.jar
rm -rf "$DIR"/target

View File

@ -1,64 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cs451</groupId>
<artifactId>da_project</artifactId>
<version>1.0-SNAPSHOT</version>
<name>DA_Project</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.11</maven.compiler.source>
<maven.compiler.target>1.11</maven.compiler.target>
</properties>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<release>11</release>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>cs451.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@ -1,9 +0,0 @@
#!/bin/bash
# Change the current working directory to the location of the present file
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
ret=0
exec 3>&1; $(java -jar "$DIR"/bin/da_proc.jar "$@" >&3); ret=$?; exec 3>&-
exit $ret

View File

@ -1,58 +0,0 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
public class BarrierParser {
private static final String BARRIER_KEY = "--barrier";
private static final int BARRIER_ARGS_NUM = 2;
private static final String COLON_REGEX = ":";
private static final String IP_START_REGEX = "/";
private static String ip;
private static int port;
public boolean populate(String key, String value) {
if (!key.equals(BARRIER_KEY)) {
return false;
}
String[] barrier = value.split(COLON_REGEX);
if (barrier.length != BARRIER_ARGS_NUM) {
return false;
}
try {
String ipTest = InetAddress.getByName(barrier[0]).toString();
if (ipTest.startsWith(IP_START_REGEX)) {
ip = ipTest.substring(1);
} else {
ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
}
port = Integer.parseInt(barrier[1]);
if (port <= 0) {
System.err.println("Barrier port must be a positive number!");
return false;
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return true;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
}

View File

@ -1,19 +0,0 @@
package cs451;
import java.io.File;
public class ConfigParser {
private String path;
public boolean populate(String value) {
File file = new File(value);
path = file.getPath();
return true;
}
public String getPath() {
return path;
}
}

View File

@ -1,30 +0,0 @@
package cs451;
public class Constants {
public static final int ARG_LIMIT_NO_CONFIG = 10;
public static final int ARG_LIMIT_CONFIG = 11;
// indexes for id
public static final int ID_KEY = 0;
public static final int ID_VALUE = 1;
// indexes for hosts
public static final int HOSTS_KEY = 2;
public static final int HOSTS_VALUE = 3;
// indexes for barrier
public static final int BARRIER_KEY = 4;
public static final int BARRIER_VALUE = 5;
// indexes for signal
public static final int SIGNAL_KEY = 6;
public static final int SIGNAL_VALUE = 7;
// indexes for output
public static final int OUTPUT_KEY = 8;
public static final int OUTPUT_VALUE = 9;
// indexes for config
public static final int CONFIG_VALUE = 10;
}

View File

@ -1,75 +0,0 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.DataOutputStream;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.net.Socket;
import java.net.UnknownHostException;
public class Coordinator {
private int pid;
private String barrierIp;
private int barrierPort;
private String signalIp;
private int signalPort;
private Socket signalSocket = null;
public Coordinator(int pid, String barrierIp, int barrierPort, String signalIp, int signalPort) {
this.pid = pid;
this.barrierIp = barrierIp;
this.barrierPort = barrierPort;
this.signalIp = signalIp;
this.signalPort = signalPort;
signalSocket = connectToHost(this.signalIp, this.signalPort);
}
public void waitOnBarrier() {
try {
Socket socket = connectToHost(barrierIp, barrierPort);
InputStream input = socket.getInputStream();
InputStreamReader reader = new InputStreamReader(input);
System.out.println("Accessing barrier...");
int character;
while ((character = reader.read()) != -1) {}
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
}
public void finishedBroadcasting() {
try {
signalSocket.close();
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
}
private Socket connectToHost(String ip, int port) {
Socket socket = null;
try {
socket = new Socket(ip, port);
OutputStream output = socket.getOutputStream();
DataOutputStream writer = new DataOutputStream(output);
ByteBuffer bb = ByteBuffer.allocate(8);
bb.order(ByteOrder.BIG_ENDIAN);
bb.putLong((long) pid);
writer.write(bb.array(), 0, 8);
} catch (IOException ex) {
System.out.println("I/O error: " + ex.getMessage());
}
return socket;
}
}

View File

@ -1,56 +0,0 @@
package cs451;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class Host {
private static final String IP_START_REGEX = "/";
private int id;
private String ip;
private int port = -1;
public boolean populate(String idString, String ipString, String portString) {
try {
id = Integer.parseInt(idString);
String ipTest = InetAddress.getByName(ipString).toString();
if (ipTest.startsWith(IP_START_REGEX)) {
ip = ipTest.substring(1);
} else {
ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
}
port = Integer.parseInt(portString);
if (port <= 0) {
System.err.println("Port in the hosts file must be a positive number!");
return false;
}
} catch (NumberFormatException e) {
if (port == -1) {
System.err.println("Id in the hosts file must be a number!");
} else {
System.err.println("Port in the hosts file must be a number!");
}
return false;
} catch (UnknownHostException e) {
e.printStackTrace();
}
return true;
}
public int getId() {
return id;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
}

View File

@ -1,88 +0,0 @@
package cs451;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public class HostsParser {
private static final String HOSTS_KEY = "--hosts";
private static final String SPACES_REGEX = "\\s+";
private String filename;
private List<Host> hosts = new ArrayList<>();
public boolean populate(String key, String filename) {
if (!key.equals(HOSTS_KEY)) {
return false;
}
this.filename = filename;
try(BufferedReader br = new BufferedReader(new FileReader(filename))) {
int lineNum = 1;
for(String line; (line = br.readLine()) != null; lineNum++) {
if (line.isBlank()) {
continue;
}
String[] splits = line.split(SPACES_REGEX);
if (splits.length != 3) {
System.err.println("Problem with the line " + lineNum + " in the hosts file!");
return false;
}
Host newHost = new Host();
if (!newHost.populate(splits[0], splits[1], splits[2])) {
return false;
}
hosts.add(newHost);
}
} catch (IOException e) {
System.err.println("Problem with the hosts file!");
return false;
}
if (!checkIdRange()) {
System.err.println("Hosts ids are not within the range!");
return false;
}
// sort by id
Collections.sort(hosts, new HostsComparator());
return true;
}
private boolean checkIdRange() {
int num = hosts.size();
for (Host host : hosts) {
if (host.getId() < 1 || host.getId() > num) {
System.err.println("Id of a host is not in the right range!");
return false;
}
}
return true;
}
public boolean inRange(int id) {
return id <= hosts.size();
}
public List<Host> getHosts() {
return hosts;
}
class HostsComparator implements Comparator<Host> {
public int compare(Host a, Host b) {
return a.getId() - b.getId();
}
}
}

View File

@ -1,31 +0,0 @@
package cs451;
public class IdParser {
private static final String ID_KEY = "--id";
private int id;
public boolean populate(String key, String value) {
if (!key.equals(ID_KEY)) {
return false;
}
try {
id = Integer.parseInt(value);
if (id <= 0) {
System.err.println("Id must be a positive number!");
}
} catch (NumberFormatException e) {
System.err.println("Id must be a number!");
return false;
}
return true;
}
public int getId() {
return id;
}
}

View File

@ -1,68 +0,0 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
public class Main {
private static void handleSignal() {
//immediately stop network packet processing
System.out.println("Immediately stopping network packet processing.");
//write/flush output file if necessary
System.out.println("Writing output.");
}
private static void initSignalHandlers() {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
handleSignal();
}
});
}
public static void main(String[] args) throws InterruptedException {
Parser parser = new Parser(args);
parser.parse();
initSignalHandlers();
// example
long pid = ProcessHandle.current().pid();
System.out.println("My PID is " + pid + ".");
System.out.println("Use 'kill -SIGINT " + pid + " ' or 'kill -SIGTERM " + pid + " ' to stop processing packets.");
System.out.println("My id is " + parser.myId() + ".");
System.out.println("List of hosts is:");
for (Host host: parser.hosts()) {
System.out.println(host.getId() + ", " + host.getIp() + ", " + host.getPort());
}
System.out.println("Barrier: " + parser.barrierIp() + ":" + parser.barrierPort());
System.out.println("Signal: " + parser.signalIp() + ":" + parser.signalPort());
System.out.println("Output: " + parser.output());
// if config is defined; always check before parser.config()
if (parser.hasConfig()) {
System.out.println("Config: " + parser.config());
}
Coordinator coordinator = new Coordinator(parser.myId(), parser.barrierIp(), parser.barrierPort(), parser.signalIp(), parser.signalPort());
System.out.println("Waiting for all processes for finish initialization");
coordinator.waitOnBarrier();
System.out.println("Broadcasting messages...");
System.out.println("Signaling end of broadcasting messages");
coordinator.finishedBroadcasting();
while (true) {
// Sleep for 1 hour
Thread.sleep(60 * 60 * 1000);
}
}
}

View File

@ -1,25 +0,0 @@
package cs451;
import java.io.File;
public class OutputParser {
private static final String OUTPUT_KEY = "--output";
private String path;
public boolean populate(String key, String value) {
if (!key.equals(OUTPUT_KEY)) {
return false;
}
File file = new File(value);
path = file.getPath();
return true;
}
public String getPath() {
return path;
}
}

View File

@ -1,107 +0,0 @@
package cs451;
import java.util.List;
public class Parser {
private String[] args;
private long pid;
private IdParser idParser;
private HostsParser hostsParser;
private BarrierParser barrierParser;
private SignalParser signalParser;
private OutputParser outputParser;
private ConfigParser configParser;
public Parser(String[] args) {
this.args = args;
}
public void parse() {
pid = ProcessHandle.current().pid();
idParser = new IdParser();
hostsParser = new HostsParser();
barrierParser = new BarrierParser();
signalParser = new SignalParser();
outputParser = new OutputParser();
configParser = null;
int argsNum = args.length;
if (argsNum != Constants.ARG_LIMIT_NO_CONFIG && argsNum != Constants.ARG_LIMIT_CONFIG) {
help();
}
if (!idParser.populate(args[Constants.ID_KEY], args[Constants.ID_VALUE])) {
help();
}
if (!hostsParser.populate(args[Constants.HOSTS_KEY], args[Constants.HOSTS_VALUE])) {
help();
}
if (!hostsParser.inRange(idParser.getId())) {
help();
}
if (!barrierParser.populate(args[Constants.BARRIER_KEY], args[Constants.BARRIER_VALUE])) {
help();
}
if (!signalParser.populate(args[Constants.SIGNAL_KEY], args[Constants.SIGNAL_VALUE])) {
help();
}
if (!outputParser.populate(args[Constants.OUTPUT_KEY], args[Constants.OUTPUT_VALUE])) {
help();
}
if (argsNum == Constants.ARG_LIMIT_CONFIG) {
configParser = new ConfigParser();
if (!configParser.populate(args[Constants.CONFIG_VALUE])) {
}
}
}
private void help() {
System.err.println("Usage: ./run.sh --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT [config]");
System.exit(1);
}
public int myId() {
return idParser.getId();
}
public List<Host> hosts() {
return hostsParser.getHosts();
}
public String barrierIp() {
return barrierParser.getIp();
}
public int barrierPort() {
return barrierParser.getPort();
}
public String signalIp() {
return signalParser.getIp();
}
public int signalPort() {
return signalParser.getPort();
}
public String output() {
return outputParser.getPath();
}
public boolean hasConfig() {
return configParser != null;
}
public String config() {
return configParser.getPath();
}
}

View File

@ -1,58 +0,0 @@
package cs451;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
public class SignalParser {
private static final String SIGNAL_KEY = "--signal";
private static final int SIGNAL_ARGS_NUM = 2;
private static final String COLON_REGEX = ":";
private static final String IP_START_REGEX = "/";
private static String ip;
private static int port;
public boolean populate(String key, String value) {
if (!key.equals(SIGNAL_KEY)) {
return false;
}
String[] signal = value.split(COLON_REGEX);
if (signal.length != SIGNAL_ARGS_NUM) {
return false;
}
try {
String ipTest = InetAddress.getByName(signal[0]).toString();
if (ipTest.startsWith(IP_START_REGEX)) {
ip = ipTest.substring(1);
} else {
ip = InetAddress.getByName(ipTest.split(IP_START_REGEX)[0]).getHostAddress();
}
port = Integer.parseInt(signal[1]);
if (port <= 0) {
System.err.println("Signal port must be a positive number!");
return false;
}
} catch (UnknownHostException e) {
e.printStackTrace();
}
return true;
}
public String getIp() {
return ip;
}
public int getPort() {
return port;
}
}