diff --git a/barrier.py b/barrier.py index 4d621b6..3ff9bae 100755 --- a/barrier.py +++ b/barrier.py @@ -2,49 +2,56 @@ import argparse import socket +import time +import struct +from collections import OrderedDict class Barrier: - def __init__(self, host, port, wait_for): + def __init__(self, host, port, waitFor, printer=None): self.host = host self.port = port - self.wait_for = wait_for + self.waitFor = waitFor + self.printer = printer + self.startTimes = dict() def listen(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.bind((self.host, self.port)) - self.sock.listen() + self.sock.listen(128) - def waitSingle(self): + def wait(self): connections = [] addresses = [] while True: conn, addr = self.sock.accept() - connections.append(conn) - addresses.append(addr) - yield addr - if len(connections) == self.wait_for: + idInBytes = [] + while len(idInBytes) < 8: + data = conn.recv(8 - len(idInBytes)) + if not data: + raise Exception("Could not recv the LogicalPID") + idInBytes += data + + pid = struct.unpack('!Q', bytes(idInBytes))[0] + connections.append((pid, conn)) + addresses.append(addr) + + if self.printer: + self.printer("Connection from {}, corresponds to PID {}".format(addr, pid)) + + if len(connections) == self.waitFor: break - for conn in connections: + for pid, conn in connections: + self.startTimes[pid] = int(time.time() * 1000) conn.close() return None - def wait(self): - g = self.waitSingle() - - conn = [] - while True: - try: - conn.append(next(g)) - except StopIteration: - break - - return conn - + def startTimesFuture(self): + return self.startTimes if __name__ == "__main__": parser = argparse.ArgumentParser() @@ -58,13 +65,14 @@ if __name__ == "__main__": parser.add_argument( "--port", - default=11000, + default=10000, type=int, dest="port", - help="TCP port where the barrier listens to (default: 11000)", + help="TCP port where the barrier listens to (default: 10000)", ) parser.add_argument( + "-p", "--processes", required=True, type=int, @@ -74,16 +82,11 @@ if __name__ == "__main__": results = parser.parse_args() - barrier = Barrier(results.host, results.port, results.processes) + barrier = Barrier(results.host, results.port, results.processes, print) barrier.listen() print("Barrier listens on {}:{} and waits for {} processes".format(results.host, results.port, results.processes)) - # connectedAddr = barrier.wait() - connectedAddrGen = barrier.waitSingle() + barrier.wait() - while True: - try: - connectedAddr = next(connectedAddrGen) - print("Connection from {}".format(connectedAddr)) - except StopIteration: - break \ No newline at end of file + for pid, startTs in OrderedDict(sorted(barrier.startTimesFuture().items())).items(): + print("Process {} started broadcasting at time {} ms from Unix epoch ".format(pid, startTs)) \ No newline at end of file