diff --git a/validate.py b/validate.py
new file mode 100755
index 0000000..3e238c2
--- /dev/null
+++ b/validate.py
@@ -0,0 +1,489 @@
+#!/usr/bin/env python3
+
+import argparse
+import os, atexit
+import textwrap
+import time
+import tempfile
+import threading, subprocess
+import barrier, finishedSignal
+
+import signal
+import random
+from enum import Enum
+
+from collections import defaultdict, OrderedDict
+
+
+BARRIER_IP = 'localhost'
+BARRIER_PORT = 10000
+
+SIGNAL_IP = 'localhost'
+SIGNAL_PORT = 11000
+
+PROCESSES_BASE_IP = 11000
+
+# Do not run multiple validations concurrently!
+
+class TC:
+    def __init__(self, losses, interface="lo", needSudo=True, sudoPassword=""):
+        self.losses = losses
+        self.interface = interface
+        self.needSudo = needSudo
+        self.sudoPassword = sudoPassword
+
+        cmd1 = 'tc qdisc add dev {} root netem 2>/dev/null'.format(self.interface)
+        cmd2 = 'tc qdisc change dev {} root netem delay {} {} loss {} {} reorder {} {}'.format(self.interface, *self.losses['delay'], *self.losses['loss'], *self.losses['reordering'])
+
+        if self.needSudo:
+            os.system('echo {} | sudo -S {}'.format(self.sudoPassword, cmd1))
+            os.system('echo {} | sudo -S {}'.format(self.sudoPassword, cmd2))
+        else:
+            os.system(cmd1)
+            os.system(cmd2)
+
+        atexit.register(self.cleanup)
+
+    def __str__(self):
+        ret = """\
+        Interface: {}
+        Delay: {} {}
+        Loss: {} {}
+        Reordering: {} {}""".format(
+            self.interface,
+            *self.losses['delay'],
+            *self.losses['loss'],
+            *self.losses['reordering'])
+
+        return textwrap.dedent(ret)
+
+    def cleanup(self):
+        cmd = 'tc qdisc del dev {} root 2>/dev/null'.format(self.interface)
+        if self.needSudo:
+            os.system('echo {} | sudo -S {}'.format(self.sudoPassword, cmd))
+        else:
+            os.system(cmd)
+
+class ProcessState(Enum):
+    RUNNING = 1
+    STOPPED = 2
+    TERMINATED = 3
+
+class ProcessInfo:
+    def __init__(self, handle):
+        self.lock = threading.Lock()
+        self.handle = handle
+        self.state = ProcessState.RUNNING
+
+    @staticmethod
+    def stateToSignal(state):
+        if state == ProcessState.RUNNING:
+            return signal.SIGCONT
+
+        if state == ProcessState.STOPPED:
+            return signal.SIGSTOP
+
+        if state == ProcessState.TERMINATED:
+            return signal.SIGTERM
+
+    @staticmethod
+    def stateToSignalStr(state):
+        if state == ProcessState.RUNNING:
+            return "SIGCONT"
+
+        if state == ProcessState.STOPPED:
+            return "SIGSTOP"
+
+        if state == ProcessState.TERMINATED:
+            return "SIGTERM"
+
+    @staticmethod
+    def validStateTransition(current, desired):
+        if current == ProcessState.TERMINATED:
+            return False
+
+        if current == ProcessState.RUNNING:
+            return desired == ProcessState.STOPPED or desired == ProcessState.TERMINATED
+
+        if current == ProcessState.STOPPED:
+            return desired == ProcessState.RUNNING
+
+        return False
+
+class AtomicSaturatedCounter:
+    def __init__(self, saturation, initial=0):
+        self._saturation = saturation
+        self._value = initial
+        self._lock = threading.Lock()
+
+    def reserve(self):
+        with self._lock:
+            if self._value < self._saturation:
+                self._value += 1
+                return True
+            else:
+                return False
+
+class Validation:
+    def __init__(self, processes, messages, outputDir):
+        self.processes = processes
+        self.messages = messages
+        self.outputDirPath = os.path.abspath(outputDir)
+        if not os.path.isdir(self.outputDirPath):
+            raise Exception("`{}` is not a directory".format(self.outputDirPath))
+
+    def generateConfig(self):
+        # Implement on the derived classes
+        pass
+
+    def checkProcess(self, pid):
+        # Implement on the derived classes
+        pass
+
+    def checkAll(self, continueOnError=True):
+        ok = True
+        for pid in range(1, self.processes+1):
+            ret = self.checkProcess(pid)
+            if not ret:
+                ok = False
+
+            if not ret and not continueOnError:
+                return False
+
+        return ok
+
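+# Each line of a process's output file records one event:
+#   "b <seq>"          -> the process broadcast message <seq>
+#   "d <sender> <seq>" -> the process delivered message <seq> from process <sender>
+# FifoBroadcastValidation checks these logs for FIFO order; the LCausal variant
+# below is still a stub.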
+class FifoBroadcastValidation(Validation):
+    def generateConfig(self):
+        hosts = tempfile.NamedTemporaryFile(mode='w')
+        config = tempfile.NamedTemporaryFile(mode='w')
+
+        for i in range(1, self.processes + 1):
+            hosts.write("{} localhost {}\n".format(i, PROCESSES_BASE_IP+i))
+
+        hosts.flush()
+
+        config.write("{}\n".format(self.messages))
+        config.flush()
+
+        return (hosts, config)
+
+    def checkProcess(self, pid):
+        filePath = os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))
+
+        i = 1
+        nextMessage = defaultdict(lambda: 1)
+        filename = os.path.basename(filePath)
+
+        with open(filePath) as f:
+            for lineNumber, line in enumerate(f, start=1):
+                tokens = line.split()
+
+                # Check broadcast
+                if tokens[0] == 'b':
+                    msg = int(tokens[1])
+                    if msg != i:
+                        print("File {}, Line {}: Messages broadcast out of order. Expected message {} but broadcast message {}".format(filename, lineNumber, i, msg))
+                        return False
+                    i += 1
+
+                # Check delivery
+                if tokens[0] == 'd':
+                    sender = int(tokens[1])
+                    msg = int(tokens[2])
+                    if msg != nextMessage[sender]:
+                        print("File {}, Line {}: Message delivered out of order. Expected message {}, but delivered message {}".format(filename, lineNumber, nextMessage[sender], msg))
+                        return False
+                    else:
+                        nextMessage[sender] = msg + 1
+
+        return True
+
+class LCausalBroadcastValidation(Validation):
+    def __init__(self, processes, messages, outputDir, causalRelationships):
+        super().__init__(processes, messages, outputDir)
+
+    def generateConfig(self):
+        raise NotImplementedError()
+
+    def checkProcess(self, pid):
+        raise NotImplementedError()
+
+class StressTest:
+    def __init__(self, procs, concurrency, attempts, attemptsRatio):
+        self.processes = len(procs)
+        self.processesInfo = dict()
+        for (logicalPID, handle) in procs:
+            self.processesInfo[logicalPID] = ProcessInfo(handle)
+        self.concurrency = concurrency
+        self.attempts = attempts
+        self.attemptsRatio = attemptsRatio
+
+        maxTerminatedProcesses = self.processes // 2 if self.processes % 2 == 1 else (self.processes - 1) // 2
+        self.terminatedProcs = AtomicSaturatedCounter(maxTerminatedProcesses)
+
+    def stress(self):
+        selectProc = list(range(1, self.processes+1))
+        random.shuffle(selectProc)
+
+        selectOp = [ProcessState.STOPPED] * int(1000 * self.attemptsRatio['STOP']) + \
+                   [ProcessState.RUNNING] * int(1000 * self.attemptsRatio['CONT']) + \
+                   [ProcessState.TERMINATED] * int(1000 * self.attemptsRatio['TERM'])
+        random.shuffle(selectOp)
+
+        successfulAttempts = 0
+        while successfulAttempts < self.attempts:
+            proc = random.choice(selectProc)
+            op = random.choice(selectOp)
+            info = self.processesInfo[proc]
+
+            with info.lock:
+                if ProcessInfo.validStateTransition(info.state, op):
+
+                    if op == ProcessState.TERMINATED:
+                        reserved = self.terminatedProcs.reserve()
+                        if reserved:
+                            selectProc.remove(proc)
+                        else:
+                            continue
+
+                    time.sleep(float(random.randint(50, 500)) / 1000.0)
+                    info.handle.send_signal(ProcessInfo.stateToSignal(op))
+                    info.state = op
+                    successfulAttempts += 1
+                    print("Sending {} to process {}".format(ProcessInfo.stateToSignalStr(op), proc))
+
+            # if op == ProcessState.TERMINATED and proc not in terminatedProcs:
+            #     if len(terminatedProcs) < maxTerminatedProcesses:
+            #         terminatedProcs.add(proc)
+
+            #     if len(terminatedProcs) == maxTerminatedProcesses:
+            #         break
+
+    def remainingUnterminatedProcesses(self):
+        remaining = []
+        for pid, info in self.processesInfo.items():
+            with info.lock:
+                if info.state != ProcessState.TERMINATED:
+                    remaining.append(pid)
+
+        return None if len(remaining) == 0 else remaining
+
+    def terminateAllProcesses(self):
+        for _, info in self.processesInfo.items():
+            with info.lock:
+                if info.state != ProcessState.TERMINATED:
+                    if info.state == ProcessState.STOPPED:
+                        info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.RUNNING))
+
+                    info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.TERMINATED))
+
+        return False
+
+    def continueStoppedProcesses(self):
+        for _, info in self.processesInfo.items():
+            with info.lock:
+                if info.state != ProcessState.TERMINATED:
+                    if info.state == ProcessState.STOPPED:
+                        info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.RUNNING))
+
+    def run(self):
+        if self.concurrency > 1:
+            threads = [threading.Thread(target=self.stress) for _ in range(self.concurrency)]
+            [p.start() for p in threads]
+            [p.join() for p in threads]
+        else:
+            self.stress()
+
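+# Starts one instance of the built binary per logical process id, preferring the
+# native bin/da_proc over bin/da_proc.jar, and redirects each process's stdout
+# and stderr to per-process files in the output directory.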
+def startProcesses(processes, runscript, hostsFilePath, configFilePath, outputDir):
+    runscriptPath = os.path.abspath(runscript)
+    if not os.path.isfile(runscriptPath):
+        raise Exception("`{}` is not a file".format(runscriptPath))
+
+    if os.path.basename(runscriptPath) != 'run.sh':
+        raise Exception("`{}` is not a runscript".format(runscriptPath))
+
+    outputDirPath = os.path.abspath(outputDir)
+    if not os.path.isdir(outputDirPath):
+        raise Exception("`{}` is not a directory".format(outputDirPath))
+
+    baseDir, _ = os.path.split(runscriptPath)
+    bin_cpp = os.path.join(baseDir, "bin", "da_proc")
+    bin_java = os.path.join(baseDir, "bin", "da_proc.jar")
+
+    if os.path.exists(bin_cpp):
+        cmd = [bin_cpp]
+    elif os.path.exists(bin_java):
+        cmd = ['java', '-jar', bin_java]
+    else:
+        raise Exception("`{}` could not find a binary to execute. Make sure you build before validating".format(runscriptPath))
+
+    procs = []
+    for pid in range(1, processes+1):
+        cmd_ext = ['--id', str(pid),
+                   '--hosts', hostsFilePath,
+                   '--barrier', '{}:{}'.format(BARRIER_IP, BARRIER_PORT),
+                   '--signal', '{}:{}'.format(SIGNAL_IP, SIGNAL_PORT),
+                   '--output', os.path.join(outputDirPath, 'proc{:02d}.output'.format(pid)),
+                   configFilePath]
+
+        stdoutFd = open(os.path.join(outputDirPath, 'proc{:02d}.stdout'.format(pid)), "w")
+        stderrFd = open(os.path.join(outputDirPath, 'proc{:02d}.stderr'.format(pid)), "w")
+
+        procs.append((pid, subprocess.Popen(cmd + cmd_ext, stdout=stdoutFd, stderr=stderrFd)))
+
+    return procs
+
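+# main() wires the run together: shape loopback traffic with tc/netem, start the
+# barrier and finished-signal servers, launch the processes, run the stress test,
+# wait for broadcasting to finish, then validate every output file.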
+def main(processes, messages, runscript, broadcastType, logsDir, testConfig):
+    # Set tc for loopback
+    tc = TC(testConfig['TC'])
+    print(tc)
+
+    # Start the barrier
+    initBarrier = barrier.Barrier(BARRIER_IP, BARRIER_PORT, processes)
+    initBarrier.listen()
+    startTimesFuture = initBarrier.startTimesFuture()
+
+    initBarrierThread = threading.Thread(target=initBarrier.wait)
+    initBarrierThread.start()
+
+    # Start the finish signal
+    finishSignal = finishedSignal.FinishedSignal(SIGNAL_IP, SIGNAL_PORT, processes)
+    finishSignal.listen()
+    finishSignalThread = threading.Thread(target=finishSignal.wait)
+    finishSignalThread.start()
+
+    if broadcastType == "fifo":
+        validation = FifoBroadcastValidation(processes, messages, logsDir)
+    else:
+        validation = LCausalBroadcastValidation(processes, messages, logsDir, None)
+
+    hostsFile, configFile = validation.generateConfig()
+
+    procs = None
+    try:
+        # Start the processes and get their PIDs
+        procs = startProcesses(processes, runscript, hostsFile.name, configFile.name, logsDir)
+
+        # Create the stress test
+        st = StressTest(procs,
+                        testConfig['ST']['concurrency'],
+                        testConfig['ST']['attempts'],
+                        testConfig['ST']['attemptsDistribution'])
+
+        for (logicalPID, procHandle) in procs:
+            print("Process with logicalPID {} has PID {}".format(logicalPID, procHandle.pid))
+
+        initBarrierThread.join()
+        print("All processes have been initialized.")
+
+        st.run()
+        print("StressTest is complete.")
+
+        print("Resuming stopped processes.")
+        st.continueStoppedProcesses()
+
+        print("Waiting until all running processes have finished broadcasting.")
+        finishSignalThread.join()
+
+        for pid, startTs in OrderedDict(sorted(startTimesFuture.items())).items():
+            print("Process {} finished broadcasting {} messages in {} ms".format(pid, messages, finishSignal.endTimestamps()[pid] - startTs))
+
+        unterminated = st.remainingUnterminatedProcesses()
+        if unterminated is not None:
+            input('Hit `Enter` to terminate the remaining processes with logicalPIDs {}.'.format(unterminated))
+            st.terminateAllProcesses()
+
+        mutex = threading.Lock()
+
+        def waitForProcess(logicalPID, procHandle, mutex):
+            procHandle.wait()
+
+            with mutex:
+                print("Process {} exited with {}".format(logicalPID, procHandle.returncode))
+
+        # Monitor which processes have exited
+        monitors = [threading.Thread(target=waitForProcess, args=(logicalPID, procHandle, mutex)) for (logicalPID, procHandle) in procs]
+        [p.start() for p in monitors]
+        [p.join() for p in monitors]
+
+        print("Result of validation: {}".format(validation.checkAll()))
+
+    finally:
+        if procs is not None:
+            for _, p in procs:
+                p.kill()
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+
+    parser.add_argument(
+        "-r",
+        "--runscript",
+        required=True,
+        dest="runscript",
+        help="Path to run.sh",
+    )
+
+    parser.add_argument(
+        "-b",
+        "--broadcast",
+        choices=["fifo", "lcausal"],
+        required=True,
+        dest="broadcastType",
+        help="Which broadcast implementation to test",
+    )
+
+    parser.add_argument(
+        "-l",
+        "--logs",
+        required=True,
+        dest="logsDir",
+        help="Directory to store stdout, stderr and outputs generated by the processes",
+    )
+
+    parser.add_argument(
+        "-p",
+        "--processes",
+        required=True,
+        type=int,
+        dest="processes",
+        help="Number of processes that broadcast",
+    )
+
+    parser.add_argument(
+        "-m",
+        "--messages",
+        required=True,
+        type=int,
+        dest="messages",
+        help="Maximum number of messages that each process can broadcast (maximum because a process may crash)",
+    )
+
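+    # Example invocation (paths and values are illustrative):
+    #   ./validate.py -r ./run.sh -b fifo -l ./logs -p 5 -m 100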
+    results = parser.parse_args()
+
+    testConfig = {
+        # Network configuration using the tc command
+        'TC': {
+            'delay': ('200ms', '50ms'),
+            'loss': ('10%', '25%'),
+            'reordering': ('25%', '50%')
+        },
+
+        # StressTest configuration
+        'ST': {
+            'concurrency': 8,           # How many threads interfere with the running processes
+            'attempts': 8,              # How many interference attempts each thread makes
+            'attemptsDistribution': {   # Probability with which an interfering thread will
+                'STOP': 0.48,           # select an interfering action (make sure they add up to 1)
+                'CONT': 0.48,
+                'TERM': 0.04
+            }
+        }
+    }
+
+    main(results.processes, results.messages, results.runscript, results.broadcastType, results.logsDir, testConfig)
\ No newline at end of file