manager.d (6689B)
module bastlibridge.manager;
import bastlibridge.util;
import bastlibridge.base;
import std.format;
import core.thread;
import core.sync.rwmutex;
import std.array;
import std.exception;
import std.range;
import std.stdio;
import std.concurrency;
import std.algorithm;
import std.experimental.logger;

/**
 * Owns the set of named endpoints and the routing table between their ports.
 *
 * Every endpoint is wrapped in a `Process` (endpoint + the thread running it).
 * `endpoints` is guarded by `endpoints_lock`, `LUT` by `LUT_lock`. Whenever
 * both locks are needed they are taken in the order endpoints -> LUT.
 */
class Manager{

    /// Control messages understood by `serve`.
    enum Control{
        SHUTDOWN
    }

    /// An endpoint together with the thread that executes `endpoint.thread`.
    struct Process{
        Endpoint endpoint;
        Thread thread;

        /// Wraps `ep`; the thread is created here but not started.
        this(Endpoint ep){
            endpoint=ep;
            thread=new Thread(&ep.thread);
        }

        /// Forwards to `endpoint.stop()`.
        void stop(){
            endpoint.stop();
        }

        /**
         * Opens the endpoint through `tryNTimes` (called with 5; on an exception
         * the handler logs it and sleeps one second), then starts the thread.
         */
        void start(){
            tryNTimes!((){endpoint.open();},(Exception e){warning(e.toString); Thread.sleep(1.dur!"seconds");})(5);
            thread.start();
        }
    }

    /// Registered processes, keyed by endpoint name.
    private Process[string] endpoints;

    /// Port-to-port routing table.
    private LookupTable LUT;

    private ReadWriteMutex LUT_lock;
    private ReadWriteMutex endpoints_lock;

    /// When set, every link/unlink operation saves the configuration here.
    public string savefile;

    /// Tid of the thread that constructed the manager (global, overwritten by
    /// every constructor call).
    package __gshared Tid TID;

    this(){
        LUT_lock=new ReadWriteMutex();
        endpoints_lock=new ReadWriteMutex();
        TID=thisTid();
    }

    /**
     * Creates an endpoint from `cmdline` and registers it.
     *
     * Syntax: name=basetype:argstring
     *
     * Returns: the new, not yet started, process.
     * Throws: Exception if an endpoint with that name is already registered.
     */
    Process addEndpoint(string cmdline){
        auto proc=Process(createEndpoint(cmdline));
        addProcess(proc);
        return proc;
    }

    /// Registers `p` under its endpoint's name, replacing any previous entry.
    void addProcess(Process p){
        synchronized(endpoints_lock.writer){
            endpoints[p.endpoint.name]=p;
        }
    }

    /**
     * Parses `cmdline` ("name=basetype:argstring") and instantiates the
     * endpoint through `endpointTypes`. The endpoint is not registered.
     *
     * Throws: Exception if the name is already registered.
     */
    Endpoint createEndpoint(string cmdline){
        return buildEndpoint(cmdline, false);
    }

    /**
     * Shared implementation of endpoint construction.
     *
     * Params:
     *   cmdline       = full "name=basetype:argstring" specification
     *   allowExisting = skip the duplicate-name check; used when the endpoint
     *                   being built is a replacement for a registered one
     */
    private Endpoint buildEndpoint(string cmdline, bool allowExisting){
        // formattedRead consumes its input, so parse a copy and keep cmdline intact.
        string remaining=cmdline;
        string name,basetype,args;
        remaining.formattedRead!"%s=%s:%s"(name,basetype,args);
        if(!allowExisting){
            bool taken;
            synchronized(endpoints_lock.reader){
                taken=(name in endpoints) !is null;
            }
            if(taken){
                throw new Exception(format("Endpoint %s already defined", name));
            }
        }
        auto ep=endpointTypes.get(this, basetype, args);
        ep.name=name;
        ep.cmdline=cmdline;
        return ep;
    }

    /**
     * Writes the current configuration to `writer`: one endpoint command line
     * per line, a blank line, then the routing table as emitted by
     * `LookupTable.dumpConfig`.
     */
    void dumpConfig(scope void delegate(in char[] s) writer){
        synchronized(endpoints_lock.reader){
            foreach(ep; endpoints.byValue){
                writer(ep.endpoint.cmdline);
                writer("\n");
            }
        }
        writer("\n");
        // The table is mutated under LUT_lock.writer by the link functions,
        // so iterate it under the reader lock.
        synchronized(LUT_lock.reader){
            LUT.dumpConfig(writer);
        }
    }

    /// Sends `msg` to every port that `src` is linked to.
    void distribute(Port src, Message msg){
        synchronized(LUT_lock.reader){
            LUT[src]
                .each!(a=>a.send(msg));
        }
    }

    /**
     * Looks up a process by endpoint name.
     *
     * Returns: pointer into the endpoint table, or null if the name is unknown.
     * NOTE(review): the pointer is used after the reader lock is released;
     * callers rely on the entry not being removed concurrently.
     */
    auto getEndpoint(in char[] name){
        synchronized(endpoints_lock.reader){
            return name in endpoints;
        }
    }

    /// Starts every registered process.
    void run(){
        trace("Firing up the threads");
        endpoints.byValue.each!(a=>a.start());
    }

    /// Asks all endpoints to stop, then sleeps for one second.
    auto soft_kill(){
        teardown();
        Thread.sleep(1000.dur!"msecs");
    }

    /**
     * Handles the exit notification for endpoint `name`: joins its thread and
     * then either restarts it (when `endpoint.restart` is set) or removes it.
     * An unknown name is logged and ignored.
     */
    void handleExit(string name){
        auto p=getEndpoint(name);
        if(p is null){
            warning("Exit reported for unknown endpoint ", name);
            return;
        }
        // Work on a copy: the table slot is overwritten or removed below.
        Process old=*p;
        old.thread.join();
        if(old.endpoint.restart){
            // The old entry is still registered (replaceEndpoint needs its ports
            // to rewire the links), so build the replacement without the
            // duplicate-name check and without registering it first.
            auto process=Process(buildEndpoint(old.endpoint.cmdline, true));
            replaceEndpoint(name, process);
            process.start();
        }
        else{
            removeEndpoint(name);
        }
    }

    /**
     * Message loop of the manager thread.
     *
     * A `string` message is treated as the name of an endpoint to pass to
     * `handleExit`; `Control.SHUTDOWN` stops all endpoints and ends the loop.
     * The loop also ends once no endpoints are left. If an Exception escapes,
     * all endpoints are stopped and their threads joined before returning.
     */
    void serve(){
        trace("waiting for messages");
        bool finished=false;
        try{
            while(!finished){
                receive(
                    (string name){
                        trace("Manager is terminating thread ", name);
                        handleExit(name);
                        trace("Current endpoint count ", endpoints.length);
                    },
                    (Manager.Control mc){
                        if(mc==Control.SHUTDOWN){
                            info("Shutting down due to illegal endpoint exit");
                            soft_kill();
                            finished=true;
                        }
                    }
                );
                if(endpoints.length==0){
                    finished=true;
                }
            }
        }
        catch(Exception e){
            error("Main thread dun goofed ", e.toString);
            teardown();
            foreach(p; endpoints){
                p.thread.join();
            }

            return;
        }
        info("All processes stopped. We will now exit");
    }

    /// Stops the endpoint called `name`; unknown names are ignored.
    void terminate(in char[] name){
        auto val=getEndpoint(name);
        if(val){
            val.stop();
        }
    }

    /// Removes every link from and to the ports of endpoint `name`.
    /// Does nothing if the endpoint is not registered.
    void disconnectEndpoint(in char[] name){
        synchronized(endpoints_lock.reader){
            auto proc=name in endpoints;
            if(proc is null){
                return;
            }
            synchronized(LUT_lock.writer){
                foreach(port; proc.endpoint.ports){
                    LUT.disconnectAll(port);
                }
            }
        }
    }

    /**
     * Moves every link of the ports of the registered endpoint `name` to the
     * port with the same `chan` on `newep`. Does nothing if `name` is not
     * registered.
     */
    void reconnectEndpoint(in char[] name, Endpoint newep){
        synchronized(endpoints_lock.reader){
            auto proc=name in endpoints;
            if(proc is null){
                return;
            }
            synchronized(LUT_lock.writer){
                foreach(port; proc.endpoint.ports){
                    LUT.reconnectAll(port, Port(newep, port.chan));
                }
            }
        }
    }

    /// Drops all links of endpoint `name` and removes it from the table.
    void removeEndpoint(in char[] name){
        disconnectEndpoint(name);
        trace("Removing endpoint ", name);
        synchronized(endpoints_lock.writer){
            endpoints.remove(cast(string)name);
        }
    }

    /// Rewires the links of endpoint `name` to `newproc.endpoint`, then stores
    /// `newproc` under that name.
    void replaceEndpoint(string name, Process newproc){
        reconnectEndpoint(name,newproc.endpoint);
        trace("Reconnecting endpoint ", name);
        synchronized(endpoints_lock.writer){
            endpoints[name]=newproc;
        }
    }

    /// Calls `stop()` on every registered endpoint.
    void teardown(){
        synchronized(endpoints_lock.reader){
            endpoints.byValue.each!(a=>a.endpoint.stop());
        }
    }

    /**
     * Resolves an "endpoint:port" specifier.
     *
     * With `create` set the port is obtained through `getNewPort` and the
     * out-contract asserts it is valid; otherwise `getPort` is used.
     *
     * Throws: Exception if nothing follows the ':' (or there is no ':'),
     *         or if the endpoint is not registered.
     */
    Port getPort(bool create=false)(in char[] a)
    out(p){
        static if(create){
            assert(p.valid());
        }
    }
    do{
        auto s=a.findSplit(":");
        if(s[2].length==0){
            throw new Exception(format(`Endpoint specifier "%s" malformed`, a));
        }

        Process* ep=enforce(getEndpoint(s[0]), format("Endpoint %s not found", s[0]));
        static if(create){
            return ep.endpoint.getNewPort(s[2]);
        }
        else{
            return ep.endpoint.getPort(s[2]);
        }
    }

    /// Writes the configuration produced by `dumpConfig` to `file`.
    void save(string file){
        info("Saving configuration into ", file);
        File f=File(file, "w+");
        scope(exit)f.close();
        dumpConfig((in char[] t)=>f.write(t));
    }

    /**
     * Generates the source of a method `func1(a, b)` that resolves both
     * arguments to ports (a `Port` is used as is, anything else goes through
     * `getPort!create`), applies `LUT.func2` to them under the writer lock and
     * saves the configuration when `savefile` is set.
     */
    static string LUTMap(string func1, string func2, bool create=false)(){
        return format!(`
        void %s(TA, TB)(TA a, TB b){
            Port gp(T)(T t){
                static if(is(T == Port)){
                    return t;
                }
                else{
                    return getPort!(%s)(t);
                }
            }
            Port pa=gp(a),pb=gp(b);
            assert(pa.valid(), "pa");
            assert(pb.valid(), "pb");
            trace("Doing a %s on ", a, " and ", b);
            synchronized(LUT_lock.writer){
                LUT.%s(pa,pb);
            }
            if(savefile)
                save(savefile);
        }`)(func1, create ? "true" : "false", func2, func2);
    }
    mixin(LUTMap!("link", "connect", true)());
    mixin(LUTMap!("linkDirected", "connectDirected", true)());
    mixin(LUTMap!("unlink", "disconnect", false)());
    mixin(LUTMap!("unlinkDirected", "disconnectDirected", false)());
}

/**
 * Directed routing table: maps a source port to the list of ports that
 * receive its messages. Performs no locking of its own.
 */
struct LookupTable{

    private Port[][Port] lut;

    /// Links `a` and `b` in both directions.
    void connect(Port a, Port b){
        connectDirected(a,b);
        connectDirected(b,a);
    }

    /// Adds `b` to the targets of `a`.
    void connectDirected(Port a, Port b){
        lut[a]~=b;
    }

    /// Removes `a` as a source and as a target of every other port.
    void disconnectAll(Port a){
        lut.remove(a);
        foreach(ref targets; lut.byValue){
            targets=targets.remove!(p=>p==a)();
        }
    }

    /**
     * Transfers all links of `a` to `b`: `b` takes over the targets of `a`
     * (if `a` has any), every port that targets `a` also gets `b` as a target,
     * and finally `a` is removed from the table.
     */
    void reconnectAll(Port a, Port b){
        // `a` may only ever have been a target; indexing would raise RangeError.
        if(auto outgoing=a in lut){
            // Read the slice before inserting: the insertion may relocate entries.
            Port[] moved=*outgoing;
            lut[b]=moved;
        }
        foreach(ref targets; lut.byValue){
            if(targets.canFind(a)){
                targets~=b;
            }
        }
        disconnectAll(a);
    }

    /// Removes the links between `a` and `b` in both directions.
    void disconnect(Port a, Port b){
        disconnectDirected(a,b);
        disconnectDirected(b,a);
    }

    /// Removes `b` from the targets of `a`; no-op if `a` has no entry.
    void disconnectDirected(Port a, Port b){
        auto targets=a in lut;
        if(!targets){
            return;
        }
        *targets=remove!(p=>p==b)(*targets);
    }

    /// Emits one "linkDirected <from> <to>" line per stored link.
    void dumpConfig(scope void delegate(in char[] s) writer){
        foreach(port; lut.byPair){
            auto from=port[0];
            foreach(to; port[1]){
                formattedWrite(writer,"linkDirected %s %s\n", from.toString, to.toString);
            }
        }
    }

    /// Returns the targets of `a`, or an empty slice when `a` has no entry.
    auto opIndex(Port a){
        return lut.get(a, null);
    }
}