BastliBridge

A bot framework bridgin multiple IM protocols, and mail
git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules

commit 993b7faa8d93751bdf1d31714e655e61b5920c7e
parent cd84bd01421aebb3f1455d3573c77ea23f88c2e3
Author: Dominik Schmidt <dominik@schm1dt.ch>
Date:   Sat, 22 Sep 2018 09:31:25 +0200

Try to keep endpoints always connected

Diffstat:
src/bastlibridge/manager.d | 87+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
1 file changed, 67 insertions(+), 20 deletions(-)

diff --git a/src/bastlibridge/manager.d b/src/bastlibridge/manager.d @@ -1,4 +1,5 @@ module bastlibridge.manager; +import bastlibridge.util; import bastlibridge.base; import std.format; import core.thread; @@ -30,7 +31,7 @@ class Manager{ endpoint.stop(); } void start(){ - endpoint.open(); + tryNTimes!((){endpoint.open();},(Exception e){warning(e.toString); Thread.sleep(1.dur!"seconds");})(5); thread.start(); } } @@ -117,35 +118,49 @@ class Manager{ void handleExit(string name){ auto p=getEndpoint(name); p.thread.join(); - removeEndpoint(name); if(p.endpoint.restart){ auto process=addEndpoint(p.endpoint.cmdline); + replaceEndpoint(name, process); process.start(); } + else{ + removeEndpoint(name); + } } - auto serve(){ + void serve(){ trace("waiting for messages"); bool finished=false; - while(!finished){ - receive( - (string name){ - trace("Manager is terminating thread ", name); - handleExit(name); - trace("Current endpoint count ", endpoints.length); - }, - (Manager.Control mc){ - if(mc==Control.SHUTDOWN){ - info("Shutting down due to illegal endpoint exit"); - soft_kill(); - finished=true; + try{ + while(!finished){ + receive( + (string name){ + trace("Manager is terminating thread ", name); + handleExit(name); + trace("Current endpoint count ", endpoints.length); + }, + (Manager.Control mc){ + if(mc==Control.SHUTDOWN){ + info("Shutting down due to illegal endpoint exit"); + soft_kill(); + finished=true; + } } + ); + if(endpoints.length==0){ + finished=true; } - ); - if(endpoints.length==0){ - finished=true; } } + catch(Exception e){ + error("Main thread dun goofed ", e.toString); + teardown(); + foreach(p; endpoints){ + p.thread.join(); + } + + return; + } info("All processes stopped. We will now exit"); } @@ -156,8 +171,7 @@ class Manager{ } } - void removeEndpoint(in char[] name){ - trace("Removing endpoint ", name); + void disconnectEndpoint(in char[] name){ synchronized(endpoints_lock.reader){ synchronized(LUT_lock.writer){ foreach(port; endpoints[name].endpoint.ports){ @@ -165,11 +179,34 @@ class Manager{ } } } + } + + void reconnectEndpoint(in char[] name, Endpoint newep){ + synchronized(endpoints_lock.reader){ + synchronized(LUT_lock.writer){ + foreach(port; endpoints[name].endpoint.ports){ + LUT.reconnectAll(port, Port(newep, port.chan)); + } + } + } + } + + void removeEndpoint(in char[] name){ + disconnectEndpoint(name); + trace("Removing endpoint ", name); synchronized(endpoints_lock.writer){ endpoints.remove(cast(string)name); } } + void replaceEndpoint(string name, Process newproc){ + reconnectEndpoint(name,newproc.endpoint); + trace("Reconnecting endpoint ", name); + synchronized(endpoints_lock.writer){ + endpoints[name]=newproc; + } + } + void teardown(){ synchronized(endpoints_lock.reader){ endpoints.byValue.each!(a=>a.endpoint.stop()); @@ -253,6 +290,16 @@ struct LookupTable{ } } + void reconnectAll(Port a, Port b){ + lut[b]=lut[a]; + foreach(ref ports; lut.byValue){ + if(ports.canFind(a)){ + ports~=b; + } + } + disconnectAll(a); + } + void disconnect(Port a, Port b){ disconnectDirected(a,b); disconnectDirected(b,a);