1
0
This commit is contained in:
choelzl 2020-11-10 16:58:52 +01:00
parent 9195a7d0c8
commit b14fdc00c9
Signed by: sora
GPG Key ID: A362EA0491E2EEA0
20 changed files with 337 additions and 40 deletions

View File

@ -107,9 +107,7 @@ public class Main {
}else if(t== NetEventType.SEND){
mh_tpe.add(new Pair<>(new Pair<>(ne.message.id,ne.message.src),ne.message.tpe));
}
}, (t,ne) -> {
Logger.error("ERR"+t+" - "+ne.getMessage());
});
}, (t,ne) -> Logger.error("ERR"+t+" - "+ne.getMessage()));
System.out.println("Waiting for all processes for finish initialization");
coordinator.waitOnBarrier();

View File

@ -13,7 +13,20 @@ import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* NetManager
*
* Main Manager all the NetEventHandlers.
* Uses a ThreadPool with a TaskQueue to handle Async Events
* Uses Synchronous processing to handler Sync Events
*
* Has {@link BiConsumer} to handle Completion NetEvents and ErrorEvents
* Has 2 {@link ScheduledExecutorService} , one for Conditional Sending/Delivery (500ms) and one for Heartbeats(1s)
*
* Parameters are adjusted from {@link cs451.tools.ParamDetector}
*
* @author C. Hölzl
*/
public abstract class NetManager {
public static int INTERNAL_WAIT;
@ -41,10 +54,21 @@ public abstract class NetManager {
private static BiConsumer<NetEventType,Exception> onErrorHandler;
/**
* Resigters a NetEventHandler to the NetManager
* @param handler NetEventHandler to register
*/
private static void registerNetHandler(NetEventHandlerAbstract handler){
nm_listeners.put(handler.getClass(),handler);
}
/**
* Starts the NetManager
* @param h host (self)
* @param p parser
* @param och completion consumer
* @param oeh error consumer
*/
public static void start(Host h, Parser p, BiConsumer<NetEventType,NetEvent> och, BiConsumer<NetEventType,Exception> oeh) {
onCompleteHandler = och;
onErrorHandler = oeh;
@ -72,6 +96,10 @@ public abstract class NetManager {
), 0, FD_WAIT, TimeUnit.MILLISECONDS);
}
/**
* Stops the NetManager
*/
public static void stop() {
isStopped = true;
nm_listeners.values().forEach(NetEventHandlerAbstract::stop);
@ -81,6 +109,10 @@ public abstract class NetManager {
System.out.println("NetManager handled "+ex.getCompletedTaskCount()+" tasks during this run.");
}
/**
* Checks if the NetManager and its layers are done with their work
* @return true if NM and NEH are done
*/
public static boolean isDone() {
return isStopped || nm_listeners.values().stream().map(NetEventHandlerAbstract::isDone).reduce(ex.getActiveCount()==0 && ex.getQueue().size()==0,(b1, b2) -> b1 && b2);
}
@ -136,6 +168,9 @@ public abstract class NetManager {
//=================================================================================================================
//=================================================================================================================
/**
* Runner for NetEvent instances
*/
private static class NetEventRunner implements Runnable {
private final NetEvent ne;

View File

@ -4,8 +4,17 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
/**
* Message abstraction class
*
* @author C. Hölzl
*/
public class Message implements Comparable<Message> {
/**
* Type of Message
*/
public enum TYPE {
NONE("NONE", 'N'),
ERR("ERR", 'E'),
@ -27,6 +36,20 @@ public class Message implements Comparable<Message> {
public final Integer id;
public TYPE tpe;
/**
* Creates a new message from a given message
* @param m Message to copy
* @return Message
*/
public static Message MSG(Message m) {
return new Message(m);
}
/**
* Build a Data Message
* @param id message id
* @return Message
*/
public static Message DMSG(Integer id){
return new Message(id,TYPE.DATA);
}
@ -34,21 +57,46 @@ public class Message implements Comparable<Message> {
return new Message(id,TYPE.DATA,src);
}
/**
* Build a Heartbeat Message
* @return Message
*/
public static Message HRTB(){
return new Message(0,TYPE.HRTB);
}
/**
* Build a Message of a given type
* @param mess_id Message ID
* @param tpe Message Type
* @return Message
*/
public static Message TMSG(Integer mess_id, TYPE tpe) {
return new Message(mess_id, tpe);
}
/**
* @return Empty Message with no type nor ID
*/
public static Message EMPTY(){
return new Message(-1, TYPE.NONE);
}
/**
* Converts a character to a given type
* @param c char to convert
* @return Type
*/
private static TYPE CharacterToTpe(Character c){
return Arrays.stream(TYPE.values()).filter(type -> type.c==c).findFirst().orElse(TYPE.NONE);
}
/**
* Create a message from a ByteBuffer
* @param b buffer
* @return Message
*/
public static Message FromBuffer(ByteBuffer b) {
Character tpe = b.getChar();
Integer id = b.getInt();
@ -56,13 +104,17 @@ public class Message implements Comparable<Message> {
return new Message(id,CharacterToTpe(tpe),src);
}
/**
* Puts a message into a ByteBuffer
* @param b buffer
*/
public void ToBuffer(ByteBuffer b){
b.putChar(tpe.c);
b.putInt(id);
b.putInt(src);
}
public Message(Message m) {
private Message(Message m) {
this.id = m.id;
this.tpe = m.tpe;
this.src = m.src;
@ -79,12 +131,10 @@ public class Message implements Comparable<Message> {
@Override
public String toString() {
switch(tpe) {
case DATA:
return src+"-"+ id;
default:
return src+"-"+tpe.c + id;
if (tpe == TYPE.DATA) {
return src + "-" + id;
}
return src + "-" + tpe.c + id;
}
@Override

View File

@ -2,6 +2,13 @@ package cs451.net.event;
import cs451.parser.Host;
/**
* NetEvent abstraction class
*
* Holds a {@link cs451.net.event.Message} and a {@link cs451.parser.Host}
*
* @author C. Hölzl
*/
public class NetEvent {
public final Host peer;
@ -22,11 +29,11 @@ public class NetEvent {
}
public static NetEvent Message(Host peer, Message m){
return new NetEvent(peer,new Message(m));
return new NetEvent(peer, Message.MSG(m));
}
public static NetEvent Message(Message m){
return new NetEvent(NO_PEER,new Message(m));
return new NetEvent(NO_PEER,Message.MSG(m));
}
public static NetEvent Message(Integer mess_id){

View File

@ -1,5 +1,11 @@
package cs451.net.event;
/**
* NetEventType
* Represents a type of {@link NetEvent}
*
* @author C. Hölzl
*/
public enum NetEventType {
DLVR,
SEND,

View File

@ -8,47 +8,92 @@ import cs451.parser.Parser;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* NetEventHandler abstraction class
*
* @author C. Hölzl
*/
public abstract class NetEventHandlerAbstract implements NetEventHandlerInterface {
/**
* Deliver & Broadcast Layers to handle resulting operation
*/
private final Class<? extends NetEventHandlerAbstract> deliverLayer;
private final Class<? extends NetEventHandlerAbstract> broadcastLayer;
public final AtomicBoolean active = new AtomicBoolean(true);
/**
* Initialized the main NetEventHandler fields
* @param deliverLayer layer in which NetEvents should be delivered
* @param broadcastLayer layer in which NetEvents should be broadcast
*/
NetEventHandlerAbstract(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {
this.deliverLayer = deliverLayer;
this.broadcastLayer = broadcastLayer;
}
/**
* Delivers a NetEvent Synchronously
* @param ne NetEvent
*/
public void deliverNextSync(NetEvent ne){
NetManager.deliverSync(deliverLayer,ne);
}
/**
* Sends a NetEvent Synchronously
* @param ne NetEvent
*/
public void sendNextSync(NetEvent ne){
NetManager.sendSync(broadcastLayer,ne);
}
/**
* Crashes a NetEvent Synchronously
* @param ne NetEvent
*/
public void crashNextSync(NetEvent ne){
NetManager.crashSync(deliverLayer,ne);
}
/**
* Delivers a NetEvent Asynchronously
* @param ne NetEvent
*/
public void deliverNextAsync(NetEvent ne){
NetManager.deliver(deliverLayer,ne);
}
/**
* Sends a NetEvent Asynchronously
* @param ne NetEvent
*/
public void sendNextAsync(NetEvent ne){
NetManager.send(broadcastLayer,ne);
}
/**
* Crashes a NetEvent Asynchronously
* @param ne NetEvent
*/
public void crashNextAsync(NetEvent ne) {
NetManager.crash(deliverLayer,ne);
}
/**
* Starts the NetEventHandler
* @param h host (self)
* @param p parser
*/
public void start(Host h, Parser p) {
Objects.requireNonNull(h);
Objects.requireNonNull(p);
active.set(true);
}
/**
* Stops the NetEventHandler
*/
public void stop() {
active.set(false);
}

View File

@ -6,21 +6,66 @@ import cs451.parser.Host;
import cs451.parser.Parser;
import cs451.tools.Logger;
/**
* NetEventHandler Interface
*
* @author C. Hölzl
*/
public interface NetEventHandlerInterface {
/**
* Sends Messages on condition (500ms event timer)
*/
default void sendIf(){}
/**
* Delivers Messages on condition (500ms event timer)
*/
default void deliverIf(){}
/**
* Hearth-beat (1s event timer)
*/
default void beat(){}
/**
* Delivers a NetEvent to the next layer synchronously
* @param ne NetEvent
*/
void deliverNextSync(NetEvent ne);
/**
* Sends a NetEvent to the next layer synchronously
* @param ne NetEvent
*/
void sendNextSync(NetEvent ne);
/**
* Crashes a NetEvent to the next layer synchronously
* @param ne NetEvent
*/
void crashNextSync(NetEvent ne);
/**
* Delivers a NetEvent to the next layer asynchronously
* @param ne NetEvent
*/
void deliverNextAsync(NetEvent ne);
/**
* Sends a NetEvent to the next layer asynchronously
* @param ne NetEvent
*/
void sendNextAsync(NetEvent ne);
/**
* Crashes a NetEvent to the next layer asynchronously
* @param ne NetEvent
*/
void crashNextAsync(NetEvent ne);
/**
* Handles a NetEvent
* @param et EventType
* @param ne NetEvent
*/
default void onEvent(NetEventType et, NetEvent ne){
switch (et){
case DLVR: deliver(ne);
@ -35,18 +80,44 @@ public interface NetEventHandlerInterface {
}
}
/**
* Send Event Handler
* @param ne NetEvent
*/
default void send(NetEvent ne){
sendNextSync(ne);
}
/**
* Deliver Event Handler
* @param ne NetEvent
*/
default void deliver(NetEvent ne){
deliverNextSync(ne);
}
/**
* Crash Event Handler
* @param ne NetEvent
*/
default void crash(NetEvent ne){
crashNextSync(ne);
}
/**
* Starts the NetEventHandler
* @param h host (self)
* @param p parser
*/
void start(Host h, Parser p);
/**
* Stops the NetEventHandler
*/
void stop();
/**
* Checks if NetEventHandler is Finished
* @return if it is done with its work
*/
default boolean isDone(){ return true;}
}

View File

@ -6,6 +6,13 @@ import cs451.parser.Parser;
import java.util.List;
/**
* NetEventHandler for Best Effort Broadcast
*
* Send: Send to all hosts (deliver to self)
*
* @author C. Hölzl
*/
public class NetHandlerBEB extends NetEventHandlerAbstract {
private List<Host> hosts;

View File

@ -4,6 +4,13 @@ import cs451.net.NetManager;
import cs451.net.event.NetEvent;
import cs451.net.event.NetEventType;
/**
* NetEventHandler for DEFAULT
*
* Generates errors (No event should land here...)
*
* @author C. Hölzl
*/
public class NetHandlerDFLT extends NetEventHandlerAbstract {
public NetHandlerDFLT(Class<? extends NetEventHandlerAbstract> deliverLayer, Class<? extends NetEventHandlerAbstract> broadcastLayer) {

View File

@ -8,9 +8,17 @@ import cs451.parser.Parser;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* NetEventHandler for Failure Detection
*
* Beat: Sends ping to hosts
* Deliver: Marks peer as alive (Drop HRBT packets)
*
* @author C. Hölzl
*/
public class NetHandlerFD extends NetEventHandlerAbstract {
private Host me;
private List<Host> hosts;
private static final ConcurrentHashMap<Integer,Integer> alive = new ConcurrentHashMap<>();
@ -20,25 +28,19 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
@Override
public void beat() {
hosts.forEach(h->{
if(h.getId() != me.getId()){
alive.computeIfPresent(h.getId(),(k, v) -> {
if(v == 0) return 1;
hosts.forEach(h-> alive.computeIfPresent(h.getId(),(k, v) -> {
if(v == -1) return 1;
if(v > NetManager.FD_MAX_TRIES)
crashNextAsync(NetEvent.Message(h, Message.EMPTY()));
else
sendNextSync(NetEvent.MessageHRTB(h));
return v+1;
});
}
});
return v;
}));
}
@Override
public void deliver(NetEvent ne) {
alive.computeIfPresent(ne.peer.getId(),(k,v)->0);
alive.computeIfPresent(ne.peer.getId(),(k,v)->-1);
switch(ne.message.tpe){
case HRTB:
break;
@ -50,7 +52,6 @@ public class NetHandlerFD extends NetEventHandlerAbstract {
public void start(Host h, Parser p) {
super.start(h, p);
hosts = p.hosts();
me = h;
hosts.forEach(ch-> alive.put(ch.getId(),0));
hosts.stream().filter(ch->ch.getId()!=h.getId()).forEach(ch-> alive.put(ch.getId(),0));
}
}

View File

@ -12,6 +12,16 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NetEventHandler for First In First Out
*
* DeliverIf: packet from queue satisfies FIFO ordering
*
* Send: sends next packet of current host
* Deliver: Add packet to received packet queue for DeliverIf
*
* @author C. Hölzl
*/
public class NetHandlerFIFO extends NetEventHandlerAbstract {
private final AtomicInteger sn = new AtomicInteger(1);

View File

@ -6,7 +6,13 @@ import cs451.tools.Pair;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* NetEventHandler for Perfect Link
*
* Deliver: Drop Duplicate Packets
*
* @author C. Hölzl
*/
public class NetHandlerPL extends NetEventHandlerAbstract {
private final Set<Pair<Integer,Message>> delivered = ConcurrentHashMap.newKeySet();

View File

@ -14,6 +14,14 @@ import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* NetEventHandler for Socket
*
* Send: Sends Message through socket
* Deliver: Read Message from socket (OWN LOOP)
*
* @author C. Hölzl
*/
public class NetHandlerSCKT extends NetEventHandlerAbstract {
private static final Integer BUFF_SIZE = 128;

View File

@ -11,7 +11,17 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NetEventHandler for Stuborn Link
*
* SendIf: Sends a message again if not Acknowledged
*
* Send: Add Message to "Awaiting Acknowledge" queue
* Deliver: delivers to next layer (Drops ACK messages and removes peer from "Awaiting Acknowledge"
* Crash: Mark peer as timed out to stop further sends/acknowledgements
*
* @author C. Hölzl
*/
public class NetHandlerSL extends NetEventHandlerAbstract {
private List<Host> hosts;

View File

@ -7,6 +7,16 @@ import cs451.parser.Parser;
import java.util.concurrent.atomic.AtomicInteger;
/**
* NetEventHandler for Top Level
*
* SendIf: based on window, add messages to be sent
*
* Send: SendIf
* Deliver: SendIf
*
* @author C. Hölzl
*/
public class NetHandlerTOPL extends NetEventHandlerAbstract {
private final AtomicInteger toSend = new AtomicInteger(0);

View File

@ -9,7 +9,17 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
/**
* NetEventHandler for Uniform Reliable Broadcast
*
* DeliverIf: pending has been acked by all correct processes (and has not been delivered)
*
* Send: Add to pending and send
* Deliver: Ack and Add to pending
* Crash: Remove host from correct processes
*
* @author C. Hölzl
*/
public class NetHandlerURB extends NetEventHandlerAbstract {

View File

@ -1,5 +1,8 @@
package cs451.tools;
/**
* Logger Class to log events, errors,....
*/
public abstract class Logger {
public enum LOG_LEVEL{

View File

@ -3,6 +3,11 @@ package cs451.tools;
import java.util.ArrayList;
import java.util.List;
/**
* Pair Class used in many scenarios of NEHs
* @param <S>
* @param <T>
*/
public class Pair<S, T> implements Comparable<T> {
private final List<Object> vl = new ArrayList<>();

View File

@ -4,6 +4,9 @@ import cs451.net.NetManager;
import cs451.parser.Host;
import cs451.parser.Parser;
/**
* Automatically Detects best parameters for the system
*/
public abstract class ParamDetector {
private static Integer bound(Integer value, Integer lower, Integer upper){

17
bnr.sh
View File

@ -1,7 +1,12 @@
#!/bin/sh
#!/bin/bash
export _JAVA_OPTIONS="-Xmx16G"
./257844/build.sh
sudo echo 1
echo "Running with $1 processes and $2 messages"
echo "\n\n\n" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2
if [ $# -eq 2 ]; then
export _JAVA_OPTIONS="-Xmx16G"
./257844/build.sh
sudo echo 1
echo "Running with $1 processes and $2 messages"
yes "" | ./validate.py -r 257844/run.sh -b fifo -l 257844/bin/logs -p $1 -m $2
else
echo "Missing Arguments ..."
echo "Usage: $0 process_count message_count"
fi