BastliBridge

git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules | README

base.d (4131B)


      1 module bastlibridge.base;
      2 public import bastlibridge.manager;
      3 import bastlibridge.base;
      4 import std.datetime : SysTime,Clock;
      5 import std.exception;
      6 import std.concurrency;
      7 import std.format;
      8 import std.algorithm;
      9 import std.typecons;
     10 import std.array;
     11 import core.sync.mutex;
     12 import std.experimental.logger;
     13 
     14 
     15 abstract class Endpoint{
     16 	alias channelID=const(char)[];
     17 	
     18 	package Manager manager;
     19 	package string name;
     20 	package string cmdline;
     21 	protected Mutex mtx;
     22 	protected SysTime lastUpdate;
     23 	bool restart=true;
     24 	
     25 	protected void heartbeat(){
     26 		lastUpdate=Clock.currTime;
     27 	}
     28 	
     29 	package string lastSeen(){
     30 		return format("Last update on %s: %s (%s ago)", name, 
     31 			lastUpdate.toUTC.toString(), (Clock.currTime-lastUpdate).toString);
     32 	}
     33 	
     34 	this(Manager m, string args){
     35 		manager=m;
     36 		heartbeat();
     37 		mtx=new Mutex();
     38 	}
     39 	
     40 	abstract void open();
     41 	abstract Channel join(string c);
     42 	abstract void part(Channel c);
     43 	abstract void sendMessage(Message m, Channel c);
     44 	abstract void run();
     45 	abstract void stop();
     46 	
     47 	Channel[string] channelMap;
     48 	
     49 	Channel getChannel(in char[] c){
     50 		auto chan=c in channelMap;
     51 		if(chan){
     52 			return *chan;
     53 		}
     54 		return null;
     55 	}
     56 	
     57 	Channel createChannel(in char[] c){
     58 		auto gc=getChannel(c);
     59 		if(gc)
     60 			return gc;
     61 		
     62 		auto str=c.idup;
     63 		auto newchan=join(str);
     64 		channelMap[str]=newchan;
     65 		newchan._name=str;
     66 		return newchan;
     67 	}
     68 	
     69 	Port getPort(in char[] c){
     70 		auto chan=getChannel(c);
     71 		if(!chan){
     72 			throw new Exception(format("Channel %s not found in endpoint", c));
     73 		}
     74 		return getPort(chan);
     75 	}
     76 	
     77 	Port getNewPort(in char[] c){
     78 		return getPort(createChannel(c));
     79 	}
     80 	
     81 	Port getPort(Channel cid){
     82 		return Port(this,cid);
     83 	}
     84 
     85 	void thread(){
     86 		info("Running up endpoint");
     87 		try{
     88 			run();
     89 		}
     90 		catch(Exception e){
     91 			warning("Exception caught ", e.toString());
     92 		}
     93 		catch(Error e){
     94 			error("Error caught ", e.toString());
     95 			trace("Sending shutdown to manager");
     96 			send(manager.TID, Manager.Control.SHUTDOWN);
     97 			trace("Shutdown signal sent");
     98 		}
     99 		info("Endpoint finished");
    100 		info("Sending message to ", manager.TID);
    101 		send(manager.TID, name);
    102 		info("Sent message to parent thread");
    103 	}
    104 	
    105 	auto ports(){
    106 		return channelMap.byValue.map!(a=>Port(this,a));
    107 	}
    108 	
    109 	static string logHook(string fct)(){
    110 		return format(`auto %s`
    111 		~`(int line = __LINE__, string file = __FILE__, string funcName = __FUNCTION__,`
    112 		~`string prettyFuncName = __PRETTY_FUNCTION__, string moduleName = __MODULE__, `
    113 		~`A...)(A args){`
    114 		~`return .%s!(line,file,funcName,prettyFuncName,moduleName)`
    115 		~`("Endpoint ", name, ": ", args);}`, fct, fct);
    116 	}
    117 	
    118 	mixin(logHook!"trace");
    119 	mixin(logHook!"info");
    120 	mixin(logHook!"warning");
    121 	mixin(logHook!"error");
    122 	mixin(logHook!"fatal");
    123 }
    124 
    125 abstract class QueuedEndpoint : Endpoint{
    126 	Tuple!(Message,Channel)[] msgqueue;
    127 	this(Manager m, string args){
    128 		super(m,args);
    129 	}
    130 	final override void sendMessage(Message m, Channel c){
    131 		if(mtx.tryLock()){
    132 			sendMessageQueueless(m,c);
    133 			mtx.unlock();
    134 		}
    135 		else{
    136 			msgqueue~=tuple(m,c);
    137 		}
    138 	}
    139 	void sendQueue(){
    140 		msgqueue.each!(a=>sendMessageQueueless(a[0],a[1]));
    141 		msgqueue.length=0;
    142 	}
    143 	abstract void sendMessageQueueless(Message m, Channel c);
    144 }
    145 
    146 abstract class Channel{
    147 	package string _name;
    148 }
    149 
    150 struct Port{
    151 	Endpoint ep;
    152 	Channel chan;
    153 	void send(Message m){
    154 		ep.sendMessage(m,chan);
    155 	}
    156 	string toString() const{
    157 		return format("%s:%s", ep.name, chan._name);
    158 	}
    159 	bool valid() const{
    160 		return ep !is null && chan !is null;
    161 	}
    162 }
    163 
    164 abstract class Message{
    165 	Endpoint source;
    166 	abstract const(char)[] userName();
    167 	abstract const(char)[] getMessage();
    168 	abstract const(char)[] getChannelName();
    169 	abstract bool auth();
    170 	abstract SysTime getTime();
    171 	abstract void respond(in char[] response);
    172 	Port getPort(){
    173 		return Port(source, source.getChannel(getChannelName()));
    174 	}
    175 }
    176 
    177 struct EndpointTypes{
    178 	private Endpoint function(Manager,string)[string] table;
    179 	
    180 	void add(T)(string str){
    181 		table[str]=(Manager m, string args){return new T(m,args);};
    182 	}
    183 	
    184 	Endpoint get(Manager m, string type, string args){
    185 		return (*enforce(type in table, format("Endpoint type %s unknown", type)))(m,args);
    186 	}
    187 }
    188 
    189 EndpointTypes endpointTypes;