BastliBridge

git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules | README

manager.d (6689B)


      1 module bastlibridge.manager;
      2 import bastlibridge.util;
      3 import bastlibridge.base;
      4 import std.format;
      5 import core.thread;
      6 import core.sync.rwmutex;
      7 import std.array;
      8 import std.exception;
      9 import std.range;
     10 import std.stdio;
     11 import std.concurrency;
     12 import std.algorithm;
     13 import std.experimental.logger;
     14 
     15 
     16 
     17 class Manager{
     18 	
     19 	enum Control{
     20 		SHUTDOWN
     21 	}
     22 	
     23 	struct Process{
     24 		Endpoint endpoint;
     25 		Thread thread;
     26 		this(Endpoint ep){
     27 			endpoint=ep;
     28 			thread=new Thread(&ep.thread);
     29 		}
     30 		void stop(){
     31 			endpoint.stop();
     32 		}
     33 		void start(){
     34 			tryNTimes!((){endpoint.open();},(Exception e){warning(e.toString); Thread.sleep(1.dur!"seconds");})(5);
     35 			thread.start();
     36 		}
     37 	}
     38 	
     39 	private Process[string] endpoints;
     40 	
     41 	private LookupTable LUT;
     42 	
     43 	private ReadWriteMutex LUT_lock;
     44 	private ReadWriteMutex endpoints_lock;
     45 	public string savefile;
     46 	
     47 	package __gshared Tid TID;
     48 	
     49 	this(){
     50 		LUT_lock=new ReadWriteMutex();
     51 		endpoints_lock=new ReadWriteMutex();
     52 		TID=thisTid();
     53 	}
     54 	
     55 	/**
     56 	 * Syntax: name=basetype:argstring
     57 	 */
     58 	Process addEndpoint(string cmdline){
     59 		auto ep=createEndpoint(cmdline);
     60 		auto proc=Process(ep);
     61 		addProcess(proc);
     62 		return proc;
     63 	}
     64 	
     65 	void addProcess(Process p){
     66 		synchronized(endpoints_lock.writer){
     67 			endpoints[p.endpoint.name]=p;
     68 		}
     69 	}
     70 	
     71 	Endpoint createEndpoint(string cmdline){
     72 		string cmdlinetmp=cmdline;
     73 		string name,basetype,args;
     74 		cmdline.formattedRead!"%s=%s:%s"(name,basetype,args);
     75 		if(name in endpoints){
     76 			throw new Exception(format("Endpoint %s already defined", name));
     77 		}
     78 		auto ep=endpointTypes.get(this, basetype, args);
     79 		ep.name=name;
     80 		ep.cmdline=cmdlinetmp;
     81 		return ep;
     82 	}
     83 	
     84 	void dumpConfig(scope void delegate(in char[] s) writer){
     85 		synchronized(endpoints_lock.reader){
     86 			foreach(ep; endpoints.byValue){
     87 				writer(ep.endpoint.cmdline);
     88 				writer("\n");
     89 			}
     90 		}
     91 		writer("\n");
     92 		LUT.dumpConfig(writer);
     93 	}
     94 	
     95 	void distribute(Port src, Message msg){
     96 		synchronized(LUT_lock.reader){
     97 			LUT[src]
     98 				.each!(a=>a.send(msg));
     99 		}
    100 	}
    101 	
    102 	auto getEndpoint(in char[] name){
    103 		synchronized(endpoints_lock.reader){
    104 			return name in endpoints;
    105 		}
    106 	}
    107 	
    108 	void run(){
    109 		trace("Firing up the threads");
    110 		endpoints.byValue.each!(a=>a.start());
    111 	}
    112 	
    113 	auto soft_kill(){
    114 		teardown();
    115 		Thread.sleep(1000.dur!"msecs");
    116 	}
    117 	
    118 	void handleExit(string name){
    119 		auto p=getEndpoint(name);
    120 		p.thread.join();
    121 		if(p.endpoint.restart){
    122 			auto process=addEndpoint(p.endpoint.cmdline);
    123 			replaceEndpoint(name, process);
    124 			process.start();
    125 		}
    126 		else{
    127 			removeEndpoint(name);
    128 		}
    129 	}
    130 	
    131 	void serve(){
    132 		trace("waiting for messages");
    133 		bool finished=false;
    134 		try{
    135 			while(!finished){
    136 				receive(
    137 					(string name){
    138 						trace("Manager is terminating thread ", name);
    139 						handleExit(name);
    140 						trace("Current endpoint count ", endpoints.length);
    141 					},
    142 					(Manager.Control mc){
    143 						if(mc==Control.SHUTDOWN){
    144 							info("Shutting down due to illegal endpoint exit");
    145 							soft_kill();
    146 							finished=true;
    147 						}
    148 					}
    149 				);
    150 				if(endpoints.length==0){
    151 					finished=true;
    152 				}
    153 			}
    154 		}
    155 		catch(Exception e){
    156 			error("Main thread dun goofed ", e.toString);
    157 			teardown();
    158 			foreach(p; endpoints){
    159 				p.thread.join();
    160 			}
    161 			
    162 			return;
    163 		}
    164 		info("All processes stopped. We will now exit");
    165 	}
    166 	
    167 	void terminate(in char[] name){
    168 		auto val=getEndpoint(name);
    169 		if(val){
    170 			val.stop();
    171 		}
    172 	}
    173 	
    174 	void disconnectEndpoint(in char[] name){
    175 		synchronized(endpoints_lock.reader){
    176 			synchronized(LUT_lock.writer){
    177 				foreach(port; endpoints[name].endpoint.ports){
    178 					LUT.disconnectAll(port);
    179 				}
    180 			}
    181 		}
    182 	}
    183 	
    184 	void reconnectEndpoint(in char[] name, Endpoint newep){
    185 		synchronized(endpoints_lock.reader){
    186 			synchronized(LUT_lock.writer){
    187 				foreach(port; endpoints[name].endpoint.ports){
    188 					LUT.reconnectAll(port, Port(newep, port.chan));
    189 				}
    190 			}
    191 		}
    192 	}
    193 	
    194 	void removeEndpoint(in char[] name){
    195 		disconnectEndpoint(name);
    196 		trace("Removing endpoint ", name);
    197 		synchronized(endpoints_lock.writer){
    198 			endpoints.remove(cast(string)name);
    199 		}
    200 	}
    201 	
    202 	void replaceEndpoint(string name, Process newproc){
    203 		reconnectEndpoint(name,newproc.endpoint);
    204 		trace("Reconnecting endpoint ", name);
    205 		synchronized(endpoints_lock.writer){
    206 			endpoints[name]=newproc;
    207 		}
    208 	}
    209 	
    210 	void teardown(){
    211 		synchronized(endpoints_lock.reader){
    212 			endpoints.byValue.each!(a=>a.endpoint.stop());
    213 		}
    214 	}
    215 	
    216 	Port getPort(bool create=false)(in char[] a)
    217 	out(p){
    218 		static if(create){
    219 			assert(p.valid());
    220 		}
    221 	}
    222 	do{
    223 		auto s=a.findSplit(":");
    224 		if(s[2].length==0){
    225 			throw new Exception(format(`Endpoint specifier "%s" malformed`, a));
    226 		}
    227 		
    228 		Process* ep=enforce(getEndpoint(s[0]), format("Endpoint %s not found", s[0]));
    229 		static if(create){
    230 			return ep.endpoint.getNewPort(s[2]);
    231 		}
    232 		else{
    233 			return ep.endpoint.getPort(s[2]);
    234 		}
    235 	}
    236 	
    237 	
    238 	void save(string file){
    239 		info("Saving configuration into ", file);
    240 		File f=File(file, "w+");
    241 		scope(exit)f.close();
    242 		dumpConfig((in char[] t)=>f.write(t));
    243 	}
    244 	
    245 	static string LUTMap(string func1, string func2, bool create=false)(){
    246 		return format!(`
    247 			void %s(TA, TB)(TA a, TB b){
    248 					Port gp(T)(T t){
    249 						static if(is(T == Port)){
    250 							return t;
    251 						}
    252 						else{
    253 							return getPort!(%s)(t);
    254 						}
    255 					}
    256 					Port pa=gp(a),pb=gp(b);
    257 					assert(pa.valid(), "pa");
    258 					assert(pb.valid(), "pb");
    259 					trace("Doing a %s on ", a, " and ", b);
    260 				synchronized(LUT_lock.writer){
    261 					LUT.%s(pa,pb);
    262 				}
    263 				if(savefile)
    264 					save(savefile);
    265 			}`)(func1, create ? "true" : "false", func2, func2);
    266 	}
    267 	mixin(LUTMap!("link", "connect", true)());
    268 	mixin(LUTMap!("linkDirected", "connectDirected", true)());
    269 	mixin(LUTMap!("unlink", "disconnect", false)());
    270 	mixin(LUTMap!("unlinkDirected", "disconnectDirected", false)());
    271 }
    272 
    273 struct LookupTable{
    274 	
    275 	private Port[][Port] lut;
    276 
    277 	void connect(Port a, Port b){
    278 		connectDirected(a,b);
    279 		connectDirected(b,a);
    280 	}
    281 	
    282 	void connectDirected(Port a, Port b){
    283 		lut[a]~=b;
    284 	}
    285 	
    286 	void disconnectAll(Port a){
    287 		lut.remove(a);
    288 		foreach(ref ports; lut.byValue){
    289 			ports=ports.remove!(p=>p==a)();
    290 		}
    291 	}
    292 	
    293 	void reconnectAll(Port a, Port b){
    294 		lut[b]=lut[a];
    295 		foreach(ref ports; lut.byValue){
    296 			if(ports.canFind(a)){
    297 				ports~=b;
    298 			}
    299 		}
    300 		disconnectAll(a);
    301 	}
    302 	
    303 	void disconnect(Port a, Port b){
    304 		disconnectDirected(a,b);
    305 		disconnectDirected(b,a);
    306 	}
    307 	
    308 	void disconnectDirected(Port a, Port b){
    309 		auto i=a in lut;
    310 		if(!i){
    311 			return;
    312 		}
    313 		*i=remove!(a=>a==b)(*i);
    314 	}
    315 	
    316 	void dumpConfig(scope void delegate(in char[] s) writer){
    317 		foreach(port; lut.byPair){
    318 			auto from=port[0];
    319 			foreach(to; port[1]){
    320 				formattedWrite(writer,"linkDirected %s %s\n", from.toString, to.toString);
    321 			}
    322 		}
    323 	}
    324 	
    325 	auto opIndex(Port a){
    326 		return lut[a];
    327 	}
    328 }