base.d (4131B)
1 module bastlibridge.base; 2 public import bastlibridge.manager; 3 import bastlibridge.base; 4 import std.datetime : SysTime,Clock; 5 import std.exception; 6 import std.concurrency; 7 import std.format; 8 import std.algorithm; 9 import std.typecons; 10 import std.array; 11 import core.sync.mutex; 12 import std.experimental.logger; 13 14 15 abstract class Endpoint{ 16 alias channelID=const(char)[]; 17 18 package Manager manager; 19 package string name; 20 package string cmdline; 21 protected Mutex mtx; 22 protected SysTime lastUpdate; 23 bool restart=true; 24 25 protected void heartbeat(){ 26 lastUpdate=Clock.currTime; 27 } 28 29 package string lastSeen(){ 30 return format("Last update on %s: %s (%s ago)", name, 31 lastUpdate.toUTC.toString(), (Clock.currTime-lastUpdate).toString); 32 } 33 34 this(Manager m, string args){ 35 manager=m; 36 heartbeat(); 37 mtx=new Mutex(); 38 } 39 40 abstract void open(); 41 abstract Channel join(string c); 42 abstract void part(Channel c); 43 abstract void sendMessage(Message m, Channel c); 44 abstract void run(); 45 abstract void stop(); 46 47 Channel[string] channelMap; 48 49 Channel getChannel(in char[] c){ 50 auto chan=c in channelMap; 51 if(chan){ 52 return *chan; 53 } 54 return null; 55 } 56 57 Channel createChannel(in char[] c){ 58 auto gc=getChannel(c); 59 if(gc) 60 return gc; 61 62 auto str=c.idup; 63 auto newchan=join(str); 64 channelMap[str]=newchan; 65 newchan._name=str; 66 return newchan; 67 } 68 69 Port getPort(in char[] c){ 70 auto chan=getChannel(c); 71 if(!chan){ 72 throw new Exception(format("Channel %s not found in endpoint", c)); 73 } 74 return getPort(chan); 75 } 76 77 Port getNewPort(in char[] c){ 78 return getPort(createChannel(c)); 79 } 80 81 Port getPort(Channel cid){ 82 return Port(this,cid); 83 } 84 85 void thread(){ 86 info("Running up endpoint"); 87 try{ 88 run(); 89 } 90 catch(Exception e){ 91 warning("Exception caught ", e.toString()); 92 } 93 catch(Error e){ 94 error("Error caught ", e.toString()); 95 trace("Sending shutdown to manager"); 96 send(manager.TID, Manager.Control.SHUTDOWN); 97 trace("Shutdown signal sent"); 98 } 99 info("Endpoint finished"); 100 info("Sending message to ", manager.TID); 101 send(manager.TID, name); 102 info("Sent message to parent thread"); 103 } 104 105 auto ports(){ 106 return channelMap.byValue.map!(a=>Port(this,a)); 107 } 108 109 static string logHook(string fct)(){ 110 return format(`auto %s` 111 ~`(int line = __LINE__, string file = __FILE__, string funcName = __FUNCTION__,` 112 ~`string prettyFuncName = __PRETTY_FUNCTION__, string moduleName = __MODULE__, ` 113 ~`A...)(A args){` 114 ~`return .%s!(line,file,funcName,prettyFuncName,moduleName)` 115 ~`("Endpoint ", name, ": ", args);}`, fct, fct); 116 } 117 118 mixin(logHook!"trace"); 119 mixin(logHook!"info"); 120 mixin(logHook!"warning"); 121 mixin(logHook!"error"); 122 mixin(logHook!"fatal"); 123 } 124 125 abstract class QueuedEndpoint : Endpoint{ 126 Tuple!(Message,Channel)[] msgqueue; 127 this(Manager m, string args){ 128 super(m,args); 129 } 130 final override void sendMessage(Message m, Channel c){ 131 if(mtx.tryLock()){ 132 sendMessageQueueless(m,c); 133 mtx.unlock(); 134 } 135 else{ 136 msgqueue~=tuple(m,c); 137 } 138 } 139 void sendQueue(){ 140 msgqueue.each!(a=>sendMessageQueueless(a[0],a[1])); 141 msgqueue.length=0; 142 } 143 abstract void sendMessageQueueless(Message m, Channel c); 144 } 145 146 abstract class Channel{ 147 package string _name; 148 } 149 150 struct Port{ 151 Endpoint ep; 152 Channel chan; 153 void send(Message m){ 154 ep.sendMessage(m,chan); 155 } 156 string toString() const{ 157 return format("%s:%s", ep.name, chan._name); 158 } 159 bool valid() const{ 160 return ep !is null && chan !is null; 161 } 162 } 163 164 abstract class Message{ 165 Endpoint source; 166 abstract const(char)[] userName(); 167 abstract const(char)[] getMessage(); 168 abstract const(char)[] getChannelName(); 169 abstract bool auth(); 170 abstract SysTime getTime(); 171 abstract void respond(in char[] response); 172 Port getPort(){ 173 return Port(source, source.getChannel(getChannelName())); 174 } 175 } 176 177 struct EndpointTypes{ 178 private Endpoint function(Manager,string)[string] table; 179 180 void add(T)(string str){ 181 table[str]=(Manager m, string args){return new T(m,args);}; 182 } 183 184 Endpoint get(Manager m, string type, string args){ 185 return (*enforce(type in table, format("Endpoint type %s unknown", type)))(m,args); 186 } 187 } 188 189 EndpointTypes endpointTypes;