#!/usr/bin/env python3
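"""Stress-test and validate FIFO / LCausal broadcast implementations.

This script starts N broadcast processes behind an init barrier, degrades the
loopback interface with tc/netem, randomly stops, resumes, and terminates a
strict minority of the processes, and finally validates each process's output
file. Do not run multiple validations concurrently.
"""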

import argparse
import atexit
import os
import random
import signal
import subprocess
import tempfile
import textwrap
import threading
import time

from collections import defaultdict, OrderedDict
from enum import Enum

import barrier
import finishedSignal

BARRIER_IP = 'localhost'
BARRIER_PORT = 10000

SIGNAL_IP = 'localhost'
SIGNAL_PORT = 11000

PROCESSES_BASE_IP = 11000

# Do not run multiple validations concurrently!


class TC:
    def __init__(self, losses, interface="lo", needSudo=True, sudoPassword="dcl"):
        self.losses = losses
        self.interface = interface
        self.needSudo = needSudo
        self.sudoPassword = sudoPassword

        cmd1 = 'tc qdisc add dev {} root netem 2>/dev/null'.format(self.interface)
        cmd2 = 'tc qdisc change dev {} root netem delay {} {} loss {} {} reorder {} {}'.format(
            self.interface,
            *self.losses['delay'],
            *self.losses['loss'],
            *self.losses['reordering'])

        if self.needSudo:
            os.system("echo '{}' | sudo -S {}".format(self.sudoPassword, cmd1))
            os.system("echo '{}' | sudo -S {}".format(self.sudoPassword, cmd2))
        else:
            os.system(cmd1)
            os.system(cmd2)

        atexit.register(self.cleanup)

    def __str__(self):
        ret = """\
        Interface: {}
        Delay: {} {}
        Loss: {} {}
        Reordering: {} {}""".format(
            self.interface,
            *self.losses['delay'],
            *self.losses['loss'],
            *self.losses['reordering'])

        return textwrap.dedent(ret)

    def cleanup(self):
        cmd = 'tc qdisc del dev {} root 2>/dev/null'.format(self.interface)
        if self.needSudo:
            os.system("echo '{}' | sudo -S {}".format(self.sudoPassword, cmd))
        else:
            os.system(cmd)
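
# For reference: with the default testConfig at the bottom of this file, the
# netem command issued by TC is equivalent to
#   tc qdisc change dev lo root netem delay 200ms 50ms loss 10% 25% reorder 25% 50%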


class ProcessState(Enum):
    RUNNING = 1
    STOPPED = 2
    TERMINATED = 3


class ProcessInfo:
    def __init__(self, handle):
        self.lock = threading.Lock()
        self.handle = handle
        self.state = ProcessState.RUNNING

    @staticmethod
    def stateToSignal(state):
        if state == ProcessState.RUNNING:
            return signal.SIGCONT

        if state == ProcessState.STOPPED:
            return signal.SIGSTOP

        if state == ProcessState.TERMINATED:
            return signal.SIGTERM

    @staticmethod
    def stateToSignalStr(state):
        if state == ProcessState.RUNNING:
            return "SIGCONT"

        if state == ProcessState.STOPPED:
            return "SIGSTOP"

        if state == ProcessState.TERMINATED:
            return "SIGTERM"

    @staticmethod
    def validStateTransition(current, desired):
        if current == ProcessState.TERMINATED:
            return False

        if current == ProcessState.RUNNING:
            return desired == ProcessState.STOPPED or desired == ProcessState.TERMINATED

        if current == ProcessState.STOPPED:
            return desired == ProcessState.RUNNING

        return False
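
# Valid state transitions enforced above:
#   RUNNING -> STOPPED or TERMINATED
#   STOPPED -> RUNNING
#   TERMINATED is final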


class AtomicSaturatedCounter:
    def __init__(self, saturation, initial=0):
        self._saturation = saturation
        self._value = initial
        self._lock = threading.Lock()

    def reserve(self):
        with self._lock:
            if self._value < self._saturation:
                self._value += 1
                return True
            else:
                return False
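
# reserve() atomically claims one of a bounded number of slots; the stress test
# below uses it to cap terminations at a strict minority of the processes.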


class Validation:
    def __init__(self, processes, messages, outputDir):
        self.processes = processes
        self.messages = messages
        self.outputDirPath = os.path.abspath(outputDir)
        if not os.path.isdir(self.outputDirPath):
            raise Exception("`{}` is not a directory".format(self.outputDirPath))

    def generateConfig(self):
        # Implement in the derived classes
        pass

    def checkProcess(self, pid):
        # Implement in the derived classes
        pass

    def checkAll(self, continueOnError=True):
        ok = True
        for pid in range(1, self.processes + 1):
            ret = self.checkProcess(pid)
            if not ret:
                ok = False

            if not ret and not continueOnError:
                return False

        return ok


class FifoBroadcastValidation(Validation):
    def generateConfig(self):
        hosts = tempfile.NamedTemporaryFile(mode='w')
        config = tempfile.NamedTemporaryFile(mode='w')

        for i in range(1, self.processes + 1):
            hosts.write("{} localhost {}\n".format(i, PROCESSES_BASE_IP + i))

        hosts.flush()

        config.write("{}\n".format(self.messages))
        config.flush()

        return (hosts, config)

    def checkProcess(self, pid):
        filePath = os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))

        i = 1
        nextMessage = defaultdict(lambda: 1)
        filename = os.path.basename(filePath)

        with open(filePath) as f:
            for lineNumber, line in enumerate(f, start=1):
                tokens = line.split()

                # Check broadcast
                if tokens[0] == 'b':
                    msg = int(tokens[1])
                    if msg != i:
                        print("File {}, Line {}: Messages broadcast out of order. Expected message {} but broadcast message {}".format(filename, lineNumber, i, msg))
                        return False
                    i += 1

                # Check delivery
                if tokens[0] == 'd':
                    sender = int(tokens[1])
                    msg = int(tokens[2])
                    if msg != nextMessage[sender]:
                        print("File {}, Line {}: Message delivered out of order. Expected message {}, but delivered message {}".format(filename, lineNumber, nextMessage[sender], msg))
                        return False
                    else:
                        nextMessage[sender] = msg + 1

        return True
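
# For reference, an output file contains one event per line:
#   b <seq>            the process broadcast its message <seq>
#   d <sender> <seq>   the process delivered message <seq> from <sender>
# For example, a process that broadcast two messages and delivered the first
# message of process 3 would log:
#   b 1
#   b 2
#   d 3 1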


class LCausalBroadcastValidation(Validation):
    def generateConfig(self):
        hosts = tempfile.NamedTemporaryFile(mode='w')
        config = tempfile.NamedTemporaryFile(mode='w')

        # Hosts file
        for i in range(1, self.processes + 1):
            hosts.write("{} localhost {}\n".format(i, PROCESSES_BASE_IP + i))
        hosts.flush()

        self.lcausalDeps = defaultdict(list)

        # Number of messages
        config.write("{}\n".format(self.messages))

        # Config file: create some random locality. Each process may have
        # up to N/2 locally dependent processes.
        for thisproc in range(1, self.processes + 1):
            config.write(str(thisproc) + " ")

            # Add a random number of dependencies
            deps = [i for i in range(1, self.processes + 1) if i != thisproc]
            for j in range(0, random.randint(0, int(self.processes / 2))):
                depidx = random.randint(0, len(deps) - 1)
                otherproc = deps[depidx]
                config.write(str(otherproc) + " ")
                self.lcausalDeps[thisproc].append(otherproc)
                del deps[depidx]
            config.write("\n")

        config.flush()

        return (hosts, config)
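
    # For reference, a generated config file might look like this (first line
    # is the number of messages; each following line lists a process followed
    # by the processes it causally depends on):
    #   100
    #   1 3
    #   2
    #   3 1 2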

    def filePathForPID(self, pid):
        return os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))

    def verifyCausality(self, fromPid, broadcastSeq, toPid, fromHistory):
        ok = True
        filename = self.filePathForPID(toPid)
        toHistory = defaultdict(lambda: 0)

        with open(filename) as toPidFile:
            for lineNumber, line in enumerate(toPidFile, start=1):
                tokens = line.split()
                if tokens[0] == 'd':
                    # Record the most recently delivered value from each
                    # dependency. This is sufficient because LCausal also
                    # adheres to FIFO.
                    if tokens[1] in fromHistory:
                        toHistory[tokens[1]] = int(tokens[2])

                    if tokens[1] == fromPid and tokens[2] == broadcastSeq:
                        # Found the dependent broadcast; ensure that the
                        # history is consistent up to this point
                        for pid, seq in fromHistory.items():
                            if toHistory[pid] < seq:
                                print("File {}, Line {}:\n"
                                      "\tDelivered dependent message {}:{} with unresolved dependency\n"
                                      "\tMessage was dependent on {}:{} but only delivered up to {}:{}".format(
                                          filename, lineNumber, fromPid, broadcastSeq, pid, seq, pid, toHistory[pid]))
                                ok = False

        return ok

    def checkLCausal(self, pid):
        if pid not in self.lcausalDeps:
            return True

        depHistory = defaultdict(lambda: 0)

        with open(self.filePathForPID(pid)) as pidFile:
            for line in pidFile:
                ltokens = line.split()

                # If the event is a broadcast, verify for each other process in
                # the system that the message was delivered in adherence to the
                # causality recorded in depHistory up to this point.
                if ltokens[0] == 'b':
                    for i in range(1, self.processes + 1):
                        if i == pid:
                            continue
                        if not self.verifyCausality(
                            fromPid=str(pid),
                            broadcastSeq=ltokens[1],
                            toPid=i,
                            fromHistory=depHistory
                        ):
                            return False

                # If the event is a delivery from a causally dependent process,
                # record it as part of the dependent history up to this point.
                if ltokens[0] == 'd' and int(ltokens[1]) in self.lcausalDeps[pid]:
                    depHistory[ltokens[1]] = int(ltokens[2])

        return True

    def checkFIFO(self, pid):
        filePath = os.path.join(self.outputDirPath, 'proc{:02d}.output'.format(pid))

        i = 1
        nextMessage = defaultdict(lambda: 1)
        filename = os.path.basename(filePath)

        with open(filePath) as f:
            for lineNumber, line in enumerate(f, start=1):
                tokens = line.split()

                # Check broadcast
                if tokens[0] == 'b':
                    msg = int(tokens[1])
                    if msg != i:
                        print("File {}, Line {}: Messages broadcast out of order. Expected message {} but broadcast message {}".format(filename, lineNumber, i, msg))
                        return False
                    i += 1

                # Check delivery
                if tokens[0] == 'd':
                    sender = int(tokens[1])
                    msg = int(tokens[2])
                    if msg != nextMessage[sender]:
                        print("File {}, Line {}: Message delivered out of order. Expected message {}, but delivered message {}".format(filename, lineNumber, nextMessage[sender], msg))
                        return False
                    else:
                        nextMessage[sender] = msg + 1

        return True

    def checkAll(self, continueOnError=True):
        print("LCausal verification:")
        ok = True
        for pid in range(1, self.processes + 1):
            ret = self.checkFIFO(pid)
            ret &= self.checkLCausal(pid)
            if not ret:
                ok = False

            if not ret and not continueOnError:
                return False
        return ok


class StressTest:
    def __init__(self, procs, concurrency, attempts, attemptsRatio):
        self.processes = len(procs)
        self.processesInfo = dict()
        for (logicalPID, handle) in procs:
            self.processesInfo[logicalPID] = ProcessInfo(handle)
        self.concurrency = concurrency
        self.attempts = attempts
        self.attemptsRatio = attemptsRatio

        # At most a strict minority of the processes may be terminated
        maxTerminatedProcesses = self.processes // 2 if self.processes % 2 == 1 else (self.processes - 1) // 2
        self.terminatedProcs = AtomicSaturatedCounter(maxTerminatedProcesses)

    def stress(self):
        selectProc = list(range(1, self.processes + 1))
        random.shuffle(selectProc)

        selectOp = [ProcessState.STOPPED] * int(1000 * self.attemptsRatio['STOP']) + \
                   [ProcessState.RUNNING] * int(1000 * self.attemptsRatio['CONT']) + \
                   [ProcessState.TERMINATED] * int(1000 * self.attemptsRatio['TERM'])
        random.shuffle(selectOp)

        successfulAttempts = 0
        while successfulAttempts < self.attempts:
            proc = random.choice(selectProc)
            op = random.choice(selectOp)
            info = self.processesInfo[proc]

            with info.lock:
                if ProcessInfo.validStateTransition(info.state, op):

                    if op == ProcessState.TERMINATED:
                        reserved = self.terminatedProcs.reserve()
                        if reserved:
                            selectProc.remove(proc)
                        else:
                            continue

                    time.sleep(float(random.randint(50, 500)) / 1000.0)
                    info.handle.send_signal(ProcessInfo.stateToSignal(op))
                    info.state = op
                    successfulAttempts += 1
                    print("Sending {} to process {}".format(ProcessInfo.stateToSignalStr(op), proc))

    def remainingUnterminatedProcesses(self):
        remaining = []
        for pid, info in self.processesInfo.items():
            with info.lock:
                if info.state != ProcessState.TERMINATED:
                    remaining.append(pid)

        return None if len(remaining) == 0 else remaining

    def terminateAllProcesses(self):
        for _, info in self.processesInfo.items():
            with info.lock:
                if info.state != ProcessState.TERMINATED:
                    # A stopped process must be resumed before it can handle
                    # SIGTERM
                    if info.state == ProcessState.STOPPED:
                        info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.RUNNING))

                    info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.TERMINATED))

    def continueStoppedProcesses(self):
        for _, info in self.processesInfo.items():
            with info.lock:
                if info.state == ProcessState.STOPPED:
                    info.handle.send_signal(ProcessInfo.stateToSignal(ProcessState.RUNNING))

    def run(self):
        if self.concurrency > 1:
            threads = [threading.Thread(target=self.stress) for _ in range(self.concurrency)]
            [p.start() for p in threads]
            [p.join() for p in threads]
        else:
            self.stress()


def startProcesses(processes, runscript, hostsFilePath, configFilePath, outputDir):
    runscriptPath = os.path.abspath(runscript)
    if not os.path.isfile(runscriptPath):
        raise Exception("`{}` is not a file".format(runscriptPath))

    if os.path.basename(runscriptPath) != 'run.sh':
        raise Exception("`{}` is not a runscript".format(runscriptPath))

    outputDirPath = os.path.abspath(outputDir)
    if not os.path.isdir(outputDirPath):
        raise Exception("`{}` is not a directory".format(outputDirPath))

    baseDir, _ = os.path.split(runscriptPath)
    bin_cpp = os.path.join(baseDir, "bin", "da_proc")
    bin_java = os.path.join(baseDir, "bin", "da_proc.jar")

    if os.path.exists(bin_cpp):
        cmd = [bin_cpp]
    elif os.path.exists(bin_java):
        cmd = ['java', '-jar', bin_java]
    else:
        raise Exception("`{}` could not find a binary to execute. Make sure you build before validating".format(runscriptPath))

    procs = []
    for pid in range(1, processes + 1):
        cmd_ext = ['--id', str(pid),
                   '--hosts', hostsFilePath,
                   '--barrier', '{}:{}'.format(BARRIER_IP, BARRIER_PORT),
                   '--signal', '{}:{}'.format(SIGNAL_IP, SIGNAL_PORT),
                   '--output', os.path.join(outputDirPath, 'proc{:02d}.output'.format(pid)),
                   configFilePath]

        stdoutFd = open(os.path.join(outputDirPath, 'proc{:02d}.stdout'.format(pid)), "w")
        stderrFd = open(os.path.join(outputDirPath, 'proc{:02d}.stderr'.format(pid)), "w")

        procs.append((pid, subprocess.Popen(cmd + cmd_ext, stdout=stdoutFd, stderr=stderrFd)))

    return procs
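
# For reference, process 1 is launched with a command line equivalent to:
#   ./bin/da_proc --id 1 --hosts <hostsFile> --barrier localhost:10000 \
#       --signal localhost:11000 --output <logsDir>/proc01.output <configFile>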


def main(processes, messages, runscript, broadcastType, logsDir, testConfig):
    # Set tc for loopback
    tc = TC(testConfig['TC'])
    print(tc)

    # Start the barrier
    initBarrier = barrier.Barrier(BARRIER_IP, BARRIER_PORT, processes)
    initBarrier.listen()
    startTimesFuture = initBarrier.startTimesFuture()

    initBarrierThread = threading.Thread(target=initBarrier.wait)
    initBarrierThread.start()

    # Start the finish signal
    finishSignal = finishedSignal.FinishedSignal(SIGNAL_IP, SIGNAL_PORT, processes)
    finishSignal.listen()
    finishSignalThread = threading.Thread(target=finishSignal.wait)
    finishSignalThread.start()

    if broadcastType == "fifo":
        validation = FifoBroadcastValidation(processes, messages, logsDir)
    else:
        validation = LCausalBroadcastValidation(processes, messages, logsDir)

    hostsFile, configFile = validation.generateConfig()

    procs = None
    try:
        # Start the processes and get their PIDs
        procs = startProcesses(processes, runscript, hostsFile.name, configFile.name, logsDir)

        # Create the stress test
        st = StressTest(procs,
                        testConfig['ST']['concurrency'],
                        testConfig['ST']['attempts'],
                        testConfig['ST']['attemptsDistribution'])

        for (logicalPID, procHandle) in procs:
            print("Process with logicalPID {} has PID {}".format(logicalPID, procHandle.pid))

        initBarrierThread.join()
        print("All processes have been initialized.")

        st.run()
        print("StressTest is complete.")

        print("Resuming stopped processes.")
        st.continueStoppedProcesses()

        print("Waiting until all running processes have finished broadcasting.")
        finishSignalThread.join()

        for pid, startTs in OrderedDict(sorted(startTimesFuture.items())).items():
            print("Process {} finished broadcasting {} messages in {} ms".format(pid, messages, finishSignal.endTimestamps()[pid] - startTs))

        unterminated = st.remainingUnterminatedProcesses()
        if unterminated is not None:
            input('Hit `Enter` to terminate the remaining processes with logicalPIDs {}.'.format(unterminated))
            st.terminateAllProcesses()

        mutex = threading.Lock()

        def waitForProcess(logicalPID, procHandle, mutex):
            procHandle.wait()

            with mutex:
                print("Process {} exited with {}".format(logicalPID, procHandle.returncode))

        # Monitor which processes have exited
        monitors = [threading.Thread(target=waitForProcess, args=(logicalPID, procHandle, mutex))
                    for (logicalPID, procHandle) in procs]
        [p.start() for p in monitors]
        [p.join() for p in monitors]

        input('Hit `Enter` to validate the output')
        print("Result of validation: {}".format(validation.checkAll()))

    finally:
        if procs is not None:
            for _, p in procs:
                p.kill()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-r",
        "--runscript",
        required=True,
        dest="runscript",
        help="Path to run.sh",
    )

    parser.add_argument(
        "-b",
        "--broadcast",
        choices=["fifo", "lcausal"],
        required=True,
        dest="broadcastType",
        help="Which broadcast implementation to test",
    )

    parser.add_argument(
        "-l",
        "--logs",
        required=True,
        dest="logsDir",
        help="Directory to store stdout, stderr and outputs generated by the processes",
    )

    parser.add_argument(
        "-p",
        "--processes",
        required=True,
        type=int,
        dest="processes",
        help="Number of processes that broadcast",
    )

    parser.add_argument(
        "-m",
        "--messages",
        required=True,
        type=int,
        dest="messages",
        help="Maximum number of messages that each process broadcasts (maximum because a process can crash)",
    )

    results = parser.parse_args()

    testConfig = {
        # Network configuration using the tc command
        'TC': {
            'delay': ('200ms', '50ms'),
            'loss': ('10%', '25%'),
            'reordering': ('25%', '50%')
        },

        # StressTest configuration
        'ST': {
            'concurrency': 8,  # How many threads interfere with the running processes
            'attempts': 8,     # How many interference attempts each thread makes
            'attemptsDistribution': {  # Probability with which an interfering thread
                'STOP': 0.48,          # selects an action (make sure they add up to 1)
                'CONT': 0.48,
                'TERM': 0.04
            }
        }
    }

    main(results.processes, results.messages, results.runscript, results.broadcastType, results.logsDir, testConfig)
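
# Example invocation (the script and run.sh paths are placeholders):
#   python3 test.py --runscript ./run.sh --broadcast fifo --logs ./logs --processes 5 --messages 100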