diff --git a/finishedSignal.py b/finishedSignal.py new file mode 100755 index 0000000..99bbd5b --- /dev/null +++ b/finishedSignal.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 + +import argparse +import socket +import time +import struct +import selectors +from collections import OrderedDict + +class FinishedSignal: + def __init__(self, host, port, waitFor, printer=None): + self.host = host + self.port = port + self.waitFor = waitFor + self.printer = printer + self.connections = dict() + self.endTimes = dict() + self.sel = selectors.DefaultSelector() + + def listen(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.sock.bind((self.host, self.port)) + self.sock.listen(128) + self.sock.setblocking(False) + self.sel.register(self.sock, selectors.EVENT_READ, self.accept) + + def wait(self): + while self.waitFor > 0: + events = self.sel.select() + for key, mask in events: + callback = key.data + callback(key.fileobj, mask) + + for key, pid_timestamp in self.connections.items(): + pid = pid_timestamp[0] + ts = pid_timestamp[1] + self.endTimes[pid] = ts + + def endTimestamps(self): + return self.endTimes + + def accept(self, sock, mask): + conn, addr = sock.accept() + conn.setblocking(False) + self.sel.register(conn, selectors.EVENT_READ, self.read) + + def read(self, conn, mask): + data = conn.recv(8) + if data: + pid = struct.unpack('!Q', bytes(data))[0] + self.connections[conn] = [pid] + + if self.printer: + host, port = conn.getpeername() + addr = '{}:{}'.format(host, port) + self.printer("Connection from {}, corresponds to PID {}".format(addr, pid)) + else: + self.connections[conn].append(int(time.time() * 1000)) + self.sel.unregister(conn) + conn.close() + self.waitFor -= 1 + + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument( + "--host", + default="0.0.0.0", + dest="host", + help="IP address where the finish signal listens to (default: any)", + ) + + parser.add_argument( + "--port", + default=11000, + type=int, + dest="port", + help="TCP port where the finish signal listens to (default: 11000)", + ) + + parser.add_argument( + "-p", + "--processes", + required=True, + type=int, + dest="processes", + help="Number of processes the finish signal waits for", + ) + + results = parser.parse_args() + + signal = FinishedSignal(results.host, results.port, results.processes, print) + signal.listen() + print("Finish signal listens on {}:{} and waits for {} processes".format(results.host, results.port, results.processes)) + + signal.wait() + + for pid, endTs in OrderedDict(sorted(signal.endTimestamps().items())).items(): + print("Process {} finished broadcasting at time {} ms from Unix epoch ".format(pid, endTs)) \ No newline at end of file diff --git a/template_cpp/src/include/barrier.hpp b/template_cpp/src/include/barrier.hpp index 78a1909..b20ff48 100644 --- a/template_cpp/src/include/barrier.hpp +++ b/template_cpp/src/include/barrier.hpp @@ -1,32 +1,90 @@ #pragma once #include "parser.hpp" -void waitOnBarrier(Parser::Host const &barrier); +#include +#include -void waitOnBarrier(Parser::Host const &barrier) { - struct sockaddr_in server; - std::memset(&server, 0, sizeof(server)); - - int fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - throw std::runtime_error("Could not create the barrier socket: " + - std::string(std::strerror(errno))); +class Coordinator { +public: + Coordinator(unsigned long id, Parser::Host barrier, Parser::Host signal) + : id_{id}, barrier_{barrier}, signal_{signal} + { + signalFd_ = connectToHost(signal_, "signal"); } - server.sin_family = AF_INET; - server.sin_addr.s_addr = barrier.ip; - server.sin_port = barrier.port; - if (connect(fd, reinterpret_cast(&server), - sizeof(server)) < 0) { - throw std::runtime_error("Could not connect to the barrier: " + - std::string(std::strerror(errno))); + void waitOnBarrier() { + int fd = connectToHost(barrier_, "barrier"); + + char dummy; + if (recv(fd, &dummy, sizeof(dummy), 0) < 0) { + throw std::runtime_error("Could not read from the barrier socket: " + + std::string(std::strerror(errno))); + } + + close(fd); } - char dummy; - if (recv(fd, &dummy, sizeof(dummy), 0) < 0) { - throw std::runtime_error("Could not read from the barrier socket: " + - std::string(std::strerror(errno))); + void finishedBroadcasting() { + close(signalFd_); } - close(fd); -} +private: + int connectToHost(Parser::Host const &host, std::string const &reason) { + struct sockaddr_in server; + std::memset(&server, 0, sizeof(server)); + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + throw std::runtime_error("Could not create the " + reason + " socket: " + + std::string(std::strerror(errno))); + } + + server.sin_family = AF_INET; + server.sin_addr.s_addr = host.ip; + server.sin_port = host.port; + if (connect(fd, reinterpret_cast(&server), + sizeof(server)) < 0) { + throw std::runtime_error("Could not connect to the " + reason + ": " + + std::string(std::strerror(errno))); + } + + auto id = htonT(static_cast(id_)); + if (writeWithRetry(fd, reinterpret_cast(&id), sizeof(id))) { + throw std::runtime_error("Could not send my LogicalPID to the " + reason + ": " + + std::string(std::strerror(errno))); + } + + return fd; + } + + // From https://stackoverflow.com/questions/32683086/handling-incomplete-write-calls + ssize_t writeWithRetry (int fd, uint8_t const* buf, size_t size) { + ssize_t ret; + while (size > 0) { + do + { + ret = write(fd, buf, size); + } while ((ret < 0) && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)); + if (ret < 0) + return ret; + size -= ret; + buf += ret; + } + return 0; + } + + template + constexpr T htonT (T value) noexcept + { + #if __BYTE_ORDER == __LITTLE_ENDIAN + char* ptr = reinterpret_cast(&value); + std::reverse(ptr, ptr + sizeof(T)); + #endif + return value; + } + +private: + unsigned long id_; + Parser::Host barrier_; + Parser::Host signal_; + int signalFd_; +}; diff --git a/template_cpp/src/include/parser.hpp b/template_cpp/src/include/parser.hpp index dbcb601..4c2ffcf 100644 --- a/template_cpp/src/include/parser.hpp +++ b/template_cpp/src/include/parser.hpp @@ -118,6 +118,11 @@ public: return barrier_; } + Host signal() const { + checkParsed(); + return signal_; + } + const char *outputPath() const { checkParsed(); return outputPath_.c_str(); @@ -204,6 +209,10 @@ private: return false; } + if (!parseSignal()) { + return false; + } + if (!parseOutputPath()) { return false; } @@ -218,7 +227,7 @@ private: void help(const int, char const *const *argv) { auto configStr = "CONFIG"; std::cerr << "Usage: " << argv[0] - << " --id ID --hosts HOSTS --barrier NAME:PORT --output OUTPUT"; + << " --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT"; if (!withConfig) { std::cerr << "\n"; @@ -287,13 +296,36 @@ private: return false; } - bool parseOutputPath() { + bool parseSignal() { if (argc < 9) { return false; } - if (std::strcmp(argv[7], "--output") == 0) { - outputPath_ = std::string(argv[8]); + if (std::strcmp(argv[7], "--signal") == 0) { + std::string signal_addr = argv[8]; + std::replace(signal_addr.begin(), signal_addr.end(), ':', ' '); + std::stringstream ss(signal_addr); + + std::string signal_name; + unsigned short signal_port; + + ss >> signal_name; + ss >> signal_port; + + signal_ = Host(0, signal_name, signal_port); + return true; + } + + return false; + } + + bool parseOutputPath() { + if (argc < 11) { + return false; + } + + if (std::strcmp(argv[9], "--output") == 0) { + outputPath_ = std::string(argv[10]); return true; } @@ -305,11 +337,11 @@ private: return true; } - if (argc < 10) { + if (argc < 12) { return false; } - configPath_ = std::string(argv[9]); + configPath_ = std::string(argv[11]); return true; } @@ -352,6 +384,7 @@ private: unsigned long id_; std::string hostsPath_; Host barrier_; + Host signal_; std::string outputPath_; std::string configPath_; }; diff --git a/template_cpp/src/src/main.cpp b/template_cpp/src/src/main.cpp index c736a8c..547766a 100644 --- a/template_cpp/src/src/main.cpp +++ b/template_cpp/src/src/main.cpp @@ -69,6 +69,15 @@ int main(int argc, char **argv) { std::cout << "Machine-readbale Port: " << barrier.port << "\n"; std::cout << "\n"; + std::cout << "Signal:\n"; + std::cout << "========\n"; + auto signal = parser.signal(); + std::cout << "Human-readable IP: " << signal.ipReadable() << "\n"; + std::cout << "Machine-readable IP: " << signal.ip << "\n"; + std::cout << "Human-readbale Port: " << signal.portReadable() << "\n"; + std::cout << "Machine-readbale Port: " << signal.port << "\n"; + std::cout << "\n"; + std::cout << "Path to output:\n"; std::cout << "===============\n"; std::cout << parser.outputPath() << "\n\n"; @@ -81,11 +90,17 @@ int main(int argc, char **argv) { std::cout << "Doing some initialization...\n\n"; + Coordinator coordinator(parser.id(), barrier, signal); + std::cout << "Waiting for all processes to finish initialization\n\n"; - waitOnBarrier(barrier); + coordinator.waitOnBarrier(); std::cout << "Broadcasting messages...\n\n"; + std::cout << "Signaling end of broadcasting messages\n\n"; + coordinator.finishedBroadcasting(); + + while (true) { std::this_thread::sleep_for(std::chrono::seconds(60)); }