1
0

Add mechanism to measure when broadcast finished execution

This commit is contained in:
Athanasios Xygkis 2020-09-21 11:57:49 +02:00 committed by Thanasis Xigis
parent da6d2f3e6f
commit 21124b2613
4 changed files with 237 additions and 29 deletions

102
finishedSignal.py Executable file
View File

@ -0,0 +1,102 @@
#!/usr/bin/env python3
import argparse
import socket
import time
import struct
import selectors
from collections import OrderedDict
class FinishedSignal:
def __init__(self, host, port, waitFor, printer=None):
self.host = host
self.port = port
self.waitFor = waitFor
self.printer = printer
self.connections = dict()
self.endTimes = dict()
self.sel = selectors.DefaultSelector()
def listen(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.sock.bind((self.host, self.port))
self.sock.listen(128)
self.sock.setblocking(False)
self.sel.register(self.sock, selectors.EVENT_READ, self.accept)
def wait(self):
while self.waitFor > 0:
events = self.sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
for key, pid_timestamp in self.connections.items():
pid = pid_timestamp[0]
ts = pid_timestamp[1]
self.endTimes[pid] = ts
def endTimestamps(self):
return self.endTimes
def accept(self, sock, mask):
conn, addr = sock.accept()
conn.setblocking(False)
self.sel.register(conn, selectors.EVENT_READ, self.read)
def read(self, conn, mask):
data = conn.recv(8)
if data:
pid = struct.unpack('!Q', bytes(data))[0]
self.connections[conn] = [pid]
if self.printer:
host, port = conn.getpeername()
addr = '{}:{}'.format(host, port)
self.printer("Connection from {}, corresponds to PID {}".format(addr, pid))
else:
self.connections[conn].append(int(time.time() * 1000))
self.sel.unregister(conn)
conn.close()
self.waitFor -= 1
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--host",
default="0.0.0.0",
dest="host",
help="IP address where the finish signal listens to (default: any)",
)
parser.add_argument(
"--port",
default=11000,
type=int,
dest="port",
help="TCP port where the finish signal listens to (default: 11000)",
)
parser.add_argument(
"-p",
"--processes",
required=True,
type=int,
dest="processes",
help="Number of processes the finish signal waits for",
)
results = parser.parse_args()
signal = FinishedSignal(results.host, results.port, results.processes, print)
signal.listen()
print("Finish signal listens on {}:{} and waits for {} processes".format(results.host, results.port, results.processes))
signal.wait()
for pid, endTs in OrderedDict(sorted(signal.endTimestamps().items())).items():
print("Process {} finished broadcasting at time {} ms from Unix epoch ".format(pid, endTs))

View File

@ -1,32 +1,90 @@
#pragma once
#include "parser.hpp"
void waitOnBarrier(Parser::Host const &barrier);
#include <endian.h>
#include <algorithm>
void waitOnBarrier(Parser::Host const &barrier) {
struct sockaddr_in server;
std::memset(&server, 0, sizeof(server));
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
throw std::runtime_error("Could not create the barrier socket: " +
std::string(std::strerror(errno)));
class Coordinator {
public:
Coordinator(unsigned long id, Parser::Host barrier, Parser::Host signal)
: id_{id}, barrier_{barrier}, signal_{signal}
{
signalFd_ = connectToHost(signal_, "signal");
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = barrier.ip;
server.sin_port = barrier.port;
if (connect(fd, reinterpret_cast<struct sockaddr *>(&server),
sizeof(server)) < 0) {
throw std::runtime_error("Could not connect to the barrier: " +
std::string(std::strerror(errno)));
void waitOnBarrier() {
int fd = connectToHost(barrier_, "barrier");
char dummy;
if (recv(fd, &dummy, sizeof(dummy), 0) < 0) {
throw std::runtime_error("Could not read from the barrier socket: " +
std::string(std::strerror(errno)));
}
close(fd);
}
char dummy;
if (recv(fd, &dummy, sizeof(dummy), 0) < 0) {
throw std::runtime_error("Could not read from the barrier socket: " +
std::string(std::strerror(errno)));
void finishedBroadcasting() {
close(signalFd_);
}
close(fd);
}
private:
int connectToHost(Parser::Host const &host, std::string const &reason) {
struct sockaddr_in server;
std::memset(&server, 0, sizeof(server));
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
throw std::runtime_error("Could not create the " + reason + " socket: " +
std::string(std::strerror(errno)));
}
server.sin_family = AF_INET;
server.sin_addr.s_addr = host.ip;
server.sin_port = host.port;
if (connect(fd, reinterpret_cast<struct sockaddr *>(&server),
sizeof(server)) < 0) {
throw std::runtime_error("Could not connect to the " + reason + ": " +
std::string(std::strerror(errno)));
}
auto id = htonT(static_cast<uint64_t>(id_));
if (writeWithRetry(fd, reinterpret_cast<uint8_t *>(&id), sizeof(id))) {
throw std::runtime_error("Could not send my LogicalPID to the " + reason + ": " +
std::string(std::strerror(errno)));
}
return fd;
}
// From https://stackoverflow.com/questions/32683086/handling-incomplete-write-calls
ssize_t writeWithRetry (int fd, uint8_t const* buf, size_t size) {
ssize_t ret;
while (size > 0) {
do
{
ret = write(fd, buf, size);
} while ((ret < 0) && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
if (ret < 0)
return ret;
size -= ret;
buf += ret;
}
return 0;
}
template <typename T>
constexpr T htonT (T value) noexcept
{
#if __BYTE_ORDER == __LITTLE_ENDIAN
char* ptr = reinterpret_cast<char*>(&value);
std::reverse(ptr, ptr + sizeof(T));
#endif
return value;
}
private:
unsigned long id_;
Parser::Host barrier_;
Parser::Host signal_;
int signalFd_;
};

View File

@ -118,6 +118,11 @@ public:
return barrier_;
}
Host signal() const {
checkParsed();
return signal_;
}
const char *outputPath() const {
checkParsed();
return outputPath_.c_str();
@ -204,6 +209,10 @@ private:
return false;
}
if (!parseSignal()) {
return false;
}
if (!parseOutputPath()) {
return false;
}
@ -218,7 +227,7 @@ private:
void help(const int, char const *const *argv) {
auto configStr = "CONFIG";
std::cerr << "Usage: " << argv[0]
<< " --id ID --hosts HOSTS --barrier NAME:PORT --output OUTPUT";
<< " --id ID --hosts HOSTS --barrier NAME:PORT --signal NAME:PORT --output OUTPUT";
if (!withConfig) {
std::cerr << "\n";
@ -287,13 +296,36 @@ private:
return false;
}
bool parseOutputPath() {
bool parseSignal() {
if (argc < 9) {
return false;
}
if (std::strcmp(argv[7], "--output") == 0) {
outputPath_ = std::string(argv[8]);
if (std::strcmp(argv[7], "--signal") == 0) {
std::string signal_addr = argv[8];
std::replace(signal_addr.begin(), signal_addr.end(), ':', ' ');
std::stringstream ss(signal_addr);
std::string signal_name;
unsigned short signal_port;
ss >> signal_name;
ss >> signal_port;
signal_ = Host(0, signal_name, signal_port);
return true;
}
return false;
}
bool parseOutputPath() {
if (argc < 11) {
return false;
}
if (std::strcmp(argv[9], "--output") == 0) {
outputPath_ = std::string(argv[10]);
return true;
}
@ -305,11 +337,11 @@ private:
return true;
}
if (argc < 10) {
if (argc < 12) {
return false;
}
configPath_ = std::string(argv[9]);
configPath_ = std::string(argv[11]);
return true;
}
@ -352,6 +384,7 @@ private:
unsigned long id_;
std::string hostsPath_;
Host barrier_;
Host signal_;
std::string outputPath_;
std::string configPath_;
};

View File

@ -69,6 +69,15 @@ int main(int argc, char **argv) {
std::cout << "Machine-readbale Port: " << barrier.port << "\n";
std::cout << "\n";
std::cout << "Signal:\n";
std::cout << "========\n";
auto signal = parser.signal();
std::cout << "Human-readable IP: " << signal.ipReadable() << "\n";
std::cout << "Machine-readable IP: " << signal.ip << "\n";
std::cout << "Human-readbale Port: " << signal.portReadable() << "\n";
std::cout << "Machine-readbale Port: " << signal.port << "\n";
std::cout << "\n";
std::cout << "Path to output:\n";
std::cout << "===============\n";
std::cout << parser.outputPath() << "\n\n";
@ -81,11 +90,17 @@ int main(int argc, char **argv) {
std::cout << "Doing some initialization...\n\n";
Coordinator coordinator(parser.id(), barrier, signal);
std::cout << "Waiting for all processes to finish initialization\n\n";
waitOnBarrier(barrier);
coordinator.waitOnBarrier();
std::cout << "Broadcasting messages...\n\n";
std::cout << "Signaling end of broadcasting messages\n\n";
coordinator.finishedBroadcasting();
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(60));
}