BastliBridge

A bot framework bridgin multiple IM protocols, and mail
git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules

commit d9629eadead38454b7eea50bf4182b8e5e7554ae
parent 53716243fbcf322b2e0d5551436d6ddc0960e115
Author: Dominik Schmidt <dominik@schm1dt.ch>
Date:   Wed,  5 Sep 2018 13:26:52 +0200

More improvements

Diffstat:
Makefile | 8++++----
src/bastlibridge/base.d | 15+++++++++++++--
src/bastlibridge/bot.d | 542+------------------------------------------------------------------------------
src/bastlibridge/interfaces/irc.d | 5++++-
src/bastlibridge/manager.d | 49++++++++++++++++++++++++++++++++++++++-----------
src/bastlibridge/util.d | 21+++++++++++++++++++++
6 files changed, 82 insertions(+), 558 deletions(-)

diff --git a/Makefile b/Makefile @@ -1,7 +1,7 @@ DMD ?= ldmd2 -DEBUG ?= -g +DEBUG ?= -g -Xcc=-rdynamic RELEASE ?= -release -O -d-version=StdLoggerDisableTrace -DFLAGS ?= +DFLAGS ?= INCLUDES = Dirk/source/ src Dirk/libev Dirk/ssl SRC := $(wildcard src/bastlibridge/*.d) $(wildcard src/telegram/*.d) $(wildcard src/bastlibridge/*/*.d) $(wildcard Dirk/source/*/*.d) Dirk/libev/deimos/ev.d TARGET = bastlibridge @@ -20,10 +20,10 @@ $(TARGET): $(patsubst %.d, %.o, $(SRC)) $(DMD) $^ $(_DFLAGS) $(RELEASE) -of$@ strip -s $@ $(TARGET)-debug: $(patsubst %.d, %-debug.o, $(SRC)) - $(DMD) $^ $(_DFLAGS) $(DEBUG) -Xcc=-rdynamic -of$@ + $(DMD) $^ $(_DFLAGS) $(DEBUG) -of$@ $(TARGET2)-debug: $(patsubst %.d, %-debug.o, $(SRC2)) - $(DMD) $^ -I src $(DEBUG) -Xcc=-rdynamic -of$@ + $(DMD) $^ -I src $(DEBUG) -of$@ $(TARGET2): $(patsubst %.d, %.o, $(SRC2)) $(DMD) $^ -I src $(RELEASE) -of$@ diff --git a/src/bastlibridge/base.d b/src/bastlibridge/base.d @@ -83,7 +83,18 @@ abstract class Endpoint{ void thread(){ info("Running up endpoint"); - run(); + try{ + run(); + } + catch(Exception e){ + warning("Exception caught ", e.toString()); + } + catch(Error e){ + error("Error caught ", e.toString()); + trace("Sending shutdown to manager"); + send(manager.TID, Manager.Control.SHUTDOWN); + trace("Shutdown signal sent"); + } info("Endpoint finished"); info("Sending message to ", manager.TID); send(manager.TID, name); @@ -143,7 +154,7 @@ struct Port{ string toString() const{ return format("%s:%s", ep.name, chan._name); } - bool valid(){ + bool valid() const{ return ep !is null && chan !is null; } } diff --git a/src/bastlibridge/bot.d b/src/bastlibridge/bot.d @@ -93,7 +93,7 @@ struct IDCounter(T){ static this(){ globalCommands.add!((Message m, in char[] a, in char[] b){m.source.manager.link(a,b);})("link"); globalCommands.add!((Message m, in char[] a, in char[] b){m.source.manager.linkDirected(a,b);})("linkDirected"); - globalCommands.add!((Message m, in char[] b){m.source.manager.link(m.getPort(),b);})("linkWith"); + globalCommands.add!((Message m, in char[] b){m.source.manager.link(m.source.name~":"~m.getChannelName(),b);})("linkWith"); globalCommands.add!((Message m){m.source.stop();})("quit"); globalCommands.add!((Message m){m.source.manager.teardown();})("teardown"); globalCommands.add!((Message m, in char[] other){m.respond(enforce(m.source.manager.getEndpoint(other),"Endpoint unknown").endpoint.lastSeen());})("lastUpdate"); @@ -105,9 +105,7 @@ static this(){ * based on a name, e.g. "telegram" → new Telegram() * */ - - - + static this(){ endpointTypes.add!Telegram("telegram"); endpointTypes.add!IRC("irc"); @@ -150,539 +148,3 @@ void main(string[] args){ } m.serve(); } - -/* - - -struct LookupTable{ - private long[][string] _irc; - private string[][long] _telegram; - - void connect(string irc, long telegram){ - _irc[irc]~=telegram; - _telegram[telegram]~=irc; - } - void disconnect(string irc, long telegram){ - auto i=irc in _irc; - *i=remove!(a=>a==telegram)(*i); - auto t=telegram in _telegram; - *t=remove!(a=>a==irc)(*t); - } - - long[] irc(in char[] irc){ - return _irc.get(cast(string)irc,[]); - } - - string[] telegram(in long telegram){ - return _telegram.get(telegram,[]); - } - auto ircChannels(){ - return _irc.byKey; - } - auto telegramChannels(){ - return _telegram.byKey; - } - - auto links(){ - return _irc.byPair.map!(a=>a[1].map!(b=>tuple(a[0],b))).joiner; - } -} - - -struct Watcher{ - ev_io io; - void delegate(int revents) cb; - extern(C) static void callback(ev_loop_t* loop, ev_io *io, int revents){ - auto watcher=cast(Watcher*) io; - watcher.cb(revents); - } - this(Socket sock, typeof(this.cb) cb){ - this(sock.handle,cb); - } - this(int fd, typeof(this.cb) cb){ - this.cb=cb; - ev_io_init(&io, &callback, fd, EV_READ); - } -} - -struct Bot{ - Socket ircSocket; - Socket controlSocket; - string controlPath; - Address controlAddress; - IrcClient ircClient; - Telegram telegram, telegram_listener; - LookupTable lut; - Watcher w_irc,w_tele,w_ctrl; - ev_loop_t *eventloop; - string proxy_url; - uint max_retries = 5; - - string[long] telegram_channels; - - private Address delegate() _ircAddressgen; - - @property void ircAddresses(Range)(Range r) if(isInputRange!(Range)&&is(ElementType!Range: Address)){ - static if(isInfinite!Range){ - _ircAddressgen=(){ - auto res=r.front; - r.popFront(); - return res; - }; - } - else{ - ircAddresses(r.cycle); - } - } - @property auto ircAddresses(){ - import std.range:generate; - return generate!(()=>_ircAddressgen()); - } - - Address ircAddress(){ - return _ircAddressgen(); - } - - struct LastUpdate{ - SysTime Telegram,IRC; - void updateTelegram(){ - Telegram=Clock.currTime(); - } - void updateIRC(){ - IRC=Clock.currTime(); - } - string toString(string separator=", "){ - auto now=Clock.currTime; - string str=format!( - "Last Update received from IRC: %s UTC, that is %s ago%s"~ - "Last update received from Telegram: %s UTC, that is %s ago" - )(IRC.toUTC.toString(), (now-IRC).toString, - separator, - Telegram.toUTC.toString, (now-Telegram).toString); - return str; - } - } - - LastUpdate lastUpdate; - - this(string tgramApiKey){ - telegram=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); - telegram_listener=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); - } - - static immutable string unknown_username="Unknown"; - static string TelegramUser(JSONValue User){ - string username=unknown_username; - foreach(key; ["username", "first_name", "last_name"]){ - if(key in User){ - username=User[key].str; - break; - } - } - return username; - } - - string formatProxyURL(string file_id){ - return proxy_url~file_id; - } - - string locationToOSMUrl(JSONValue loc){ - return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); - } - string locationToOSMUrl(float lat, float lng){ - return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); - } - - string TelegramUserToIRC(JSONValue user){ - dchar mode=' '; - if(user["is_bot"].type==JSON_TYPE.TRUE){ - mode='*'; - } - return "<"~mode.to!string~TelegramUser(user)~">"; - } - - void TelegramToIRC(JSONValue Message, ref Appender!string app){ - string username="< "~unknown_username~">"; - if(auto from="from" in Message){ - app~=TelegramUserToIRC(*from); - app~=" "; - } - if(auto fwd="forward_from" in Message){ - app~=TelegramUserToIRC(*fwd); - app~=" "; - } - foreach(t; ["text", "caption"]){ - if(auto tv=t in Message){ - app~=(*tv).str; - app~=" "; - } - } - - if(auto photo="photo" in Message){ - string fid; - long size=long.min; - foreach(p; (*photo).array){ - if(p["file_size"].integer>size){ - size=p["file_size"].integer; - fid=p["file_id"].str; - } - } - app~=formatProxyURL(fid); - app~=" "; - } - - foreach(t; ["audio", "document", "sticker", "video", "voice"]){ - if(auto tv=t in Message){ - app~=formatProxyURL((*tv)["file_id"].str); - app~=" "; - } - } - if(auto location="location" in Message){ - app~=locationToOSMUrl(*location); - app~=" "; - } - if("contact" in Message){ - auto c=Message["contact"]; - app~=c["first_name"].str; - app~=" "; - if("last_name" in c){ - app~=c["last_name"].str; - app~=" "; - } - app~="("; - app~=c["phone_number"].str; - app~=") "; - } - if(auto venue="venue" in Message){ - app~=(*venue).str; - app~=locationToOSMUrl((*venue)["location"]); - app~=" "; - } - if(auto jv="reply_to_message" in Message){ - TelegramToIRC(*jv, app); - } - } - - string TelegramToIRC(JSONValue Message){ - Appender!string app; - - TelegramToIRC(Message, app); - - return app.data[0..$-1]; - } - - void initialize(){ - auto af=ircAddress.addressFamily; - ircSocket=new SslSocket(af); - ircClient=new IrcClient(ircSocket); - - controlAddress = new UnixAddress(controlPath); - controlSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM); - - //lut.connect("#bastli_wasserstoffreakteur", -8554836); - - - ircClient.realName="Bastli"; - ircClient.userName="bastli"; - ircClient.nickName="bastli"; - ircClient.onConnect~=((){ - info("Connected to IRC on ",ircClient.serverAddress); - ev_io_start(eventloop, &w_tele.io); - telegram_listener.triggerUpdates(); - foreach(k;lut.ircChannels){ - ircClient.join(k); - } - }); - ircClient.onMessage~=(u,tt,m){ - trace("IRC received message from "~u.nickName~" on "~tt~": "~m); - string msg=cast(string)m; - if(msg.skipOver("!bastli ")){ - trace("Message is a command"); - try{ - auto split=msg.findSplit(" "); - switch(split[0]){ - case "links": - ircClient.send(tt,lut.irc(tt).map!(a=>telegram.getChatName(a)~"("~a.to!string~")").join(", ")); - break; - case "link": - //link(tt.idup, split[2].to!long); - break; - case "unlink": - unlink(tt.idup, split[2].to!long); - break; - case "list_telegram": - ircClient.send(tt, telegram_channels.byPair.map!(a=>a[1]~"("~a[0].to!string~")").join(", ")); - break; - case "lastUpdate": - ircClient.send(tt, lastUpdate.toString(", ")); - break; - default: - ircClient.send(tt, "Unknown command"); - break; - } - } - catch(Exception e){ - ircClient.send(tt, "Error while executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - foreach(chatid; lut.irc(tt)){ - telegram.send(chatid, "< "~u.nickName~"> "~m); - } - } - }; - - - telegram_listener.onMessage~=(j){ - trace("Received Message "~j.toPrettyString()); - auto jv="text" in j; - string msg=""; - if(jv){ - msg=j["text"].str; - } - - auto chatid=j["chat"]["id"].integer; - if(chatid !in telegram_channels){ - if("title" in j["chat"]){ - telegram_channels[chatid]=j["chat"]["title"].str; - } - else{ - telegram_channels[chatid]=TelegramUser(j["chat"]); - } - } - void forward_msg(){ - auto irc_chans=lut.telegram(chatid); - if(irc_chans.length==0){ - warning("Input on not-connected telegram-channel "~j["chat"].toPrettyString); - } - foreach(chan; irc_chans){ - ircClient.send(chan, TelegramToIRC(j)); - } - } - if(msg.front=='/'){ - trace("Bot command received"); - try{ - auto split=msg.findSplit(" "); - auto cmdbot=split[0].findSplit("@"); - if(cmdbot[1].length!=0 && cmdbot[2]!=telegram.botName){ - trace("Not one of our commands"); - forward_msg(); - } - else{ - switch(cmdbot[0][1..$]){ - case "link": - link(split[2], chatid); - ircClient.notice(split[2], "Linked with "~chatid.to!string); - break; - case "links": - telegram.send(chatid, lut.telegram(chatid).join(", ")); - break; - case "unlink": - unlink(split[2], chatid); - ircClient.notice(split[2], "Unlinked with "~chatid.to!string); - break; - case "lastUpdate": - telegram.send(chatid, lastUpdate.toString("\n")); - break; - default: - //telegram.send(chatid, split[0]~": Unknown command"); - break; - } - } - } - catch(Exception e){ - telegram.send(chatid, "Error executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - forward_msg(); - } - }; - } - - void unlink(string irc, long telegram){ - info("Unlinking ", irc, " from ", telegram); - lut.disconnect(irc, telegram); - if(ircClient.connected()){ - if(lut.irc(irc).length==0){ - ircClient.part(irc); - } - } - } - - void link(string irc, long telegram){ - info("Linking ", irc, " with ", telegram); - lut.connect(irc, telegram); - if(ircClient.connected()){ - ircClient.join(irc); - } - } - - void connect(){ - info("Connecting to Telegram"); - tryNTimes!(()=>telegram.connect(), (e)=>warning(e))(max_retries); - tryNTimes!(()=>telegram_listener.connect(), (e)=>warning(e))(max_retries); - - //ircClient.connect(new InternetAddress("127.0.0.1",6667)); - info("Connecting to IRC on ", ircAddress.toString); - tryNTimes!(()=>ircClient.connect(ircAddress), (e)=>warning(e))(max_retries); - - if(controlPath.exists()){ - remove(controlPath); - } - controlSocket.bind(controlAddress); - controlSocket.listen(10); - } - - void setupEventloop(){ - eventloop=ev_default_loop(0); - - w_irc=Watcher(ircSocket,(revents){ - trace("IRC Update"); - lastUpdate.updateIRC(); - - if(ircClient.read()){ - ev_io_stop(eventloop, &w_irc.io); - } - }); - - w_tele=Watcher(telegram_listener.sock,(revents){ - trace("Telegram Update"); - lastUpdate.updateTelegram(); - - Exception e; - bool res; - try res=telegram_listener.read(); - catch(TelegramException te) error(te); - if(res){ - ev_io_stop(eventloop, &w_tele.io); - ev_io_set(&w_tele.io, telegram_listener.sock.handle, EV_READ); - ev_io_start(eventloop, &w_tele.io); - } - else{ - telegram_listener.triggerUpdates(); - } - }); - - w_ctrl=Watcher(controlSocket.handle, (revents){ - auto ns=controlSocket.accept(); - trace("New control connection "); - Watcher *nw; - nw=new Watcher(ns.handle, (rrevents){ - char[512] buf; - auto ret=ns.receive(buf); - if(ret==0){ - ev_io_stop(eventloop, &nw.io); - delete nw; - } - auto line=buf[0..ret]; - trace("New control command ",buf); - foreach(l; line.splitter("\n")){ - auto split=l.findSplit(" "); - switch(split[0]){ - case "quit": - quit(); - break; - case "link": - auto split2=split[2].findSplit(" "); - link(split2[0].idup, split2[2].to!long); - break; - case "links": - foreach(i,tt; lut._irc.byPair){ - foreach(t; tt){ - ns.send("%s <-> %s\n".format(i,t)); - } - } - break; - case "unlink": - break; - case "": - break; - default: - ns.send(l~": unknown command\n"); - } - } - }); - ev_io_start(eventloop, &nw.io); - }); - } - - void quit(){ - info("Quitting the bot"); - ircClient.quit("Exiting gracefully"); - disconnect(); - stop(); - } - - void start(){ - info("Starting the event-listeners"); - ev_io_start(eventloop, &w_irc.io); - ev_io_start(eventloop, &w_ctrl.io); - } - - void stop(){ - trace("Stopping the Eventloop"); - ev_io_stop(eventloop, &w_tele.io); - ev_io_stop(eventloop, &w_irc.io); - ev_io_stop(eventloop, &w_ctrl.io); - ev_break(eventloop, EVBREAK_ALL); - } - - void run(){ - info("Running the event-loop"); - ev_run(eventloop,0); - } - - void disconnect(){ - telegram.disconnect(); - telegram_listener.disconnect(); - controlSocket.close(); - std.file.remove(controlPath); - } - - void save(File f){ - trace("Saving to file ",f.name); - foreach(link; lut.links){ - f.writeln(link[0], " ", link[1].to!string); - } - } - void load(File f){ - trace("loading from file ",f.name); - foreach(l; f.byLine.filter!(a=>a.length>0)){ - auto split=l.findSplit(" "); - link(split[0].idup, split[2].to!long); - } - } -} - - -int main(string[] args){ - Bot b=Bot(args[1]); - b.proxy_url=args[2]; - b.controlPath=args[3]; - b.ircAddresses=getAddress(args[4],args[5].to!ushort); - b.initialize(); - if(exists("savefile")){ - auto f=File("savefile", "r"); - b.load(f); - f.close(); - } - b.connect(); - scope(failure){ - b.quit(); - } - b.setupEventloop(); - b.start(); - b.run(); - scope(exit){ - auto f=File("savefile", "w+"); - b.save(f); - f.close(); - } - - return 0; - -} -*/ diff --git a/src/bastlibridge/interfaces/irc.d b/src/bastlibridge/interfaces/irc.d @@ -265,7 +265,7 @@ final class IRC : QueuedEndpoint{ ircClient.connect(addr); } - override Channel join(string c){ + private void _join(string c){ if(c[0]=='#'){ trace("Joining irc channel ", c); ircClient.join(c); @@ -273,6 +273,9 @@ final class IRC : QueuedEndpoint{ else{ trace("Not joining query ", c); } + } + override Channel join(string c){ + _join(c); return new IRCChannel(); } diff --git a/src/bastlibridge/manager.d b/src/bastlibridge/manager.d @@ -14,6 +14,11 @@ import std.experimental.logger; class Manager{ + + enum Control{ + SHUTDOWN + } + struct Process{ Endpoint endpoint; Thread thread; @@ -93,16 +98,32 @@ class Manager{ endpoints.byValue.each!(a=>a.start()); } + auto soft_kill(){ + teardown(); + Thread.sleep(1000.dur!"msecs"); + } + auto serve(){ trace("waiting for messages"); - while(true){ - string name=receiveOnly!string(); - trace("Manager is terminating thread ", name); - getEndpoint(name).thread.join; - removeEndpoint(name); - trace("Current endpoint count ", endpoints.length); + bool finished=false; + while(!finished){ + receive( + (string name){ + trace("Manager is terminating thread ", name); + getEndpoint(name).thread.join; + removeEndpoint(name); + trace("Current endpoint count ", endpoints.length); + }, + (Manager.Control mc){ + if(mc==Control.SHUTDOWN){ + info("Shutting down due to illegal endpoint exit"); + soft_kill(); + finished=true; + } + } + ); if(endpoints.length==0){ - break; + finished=true; } } info("All processes stopped. We will now exit"); @@ -135,7 +156,13 @@ class Manager{ } } - Port getPort(bool create=false)(in char[] a){ + Port getPort(bool create=false)(in char[] a) + out(p){ + static if(create){ + assert(p.valid()); + } + } + do{ auto s=a.findSplit(":"); if(s[2].length==0){ throw new Exception(format(`Endpoint specifier "%s" malformed`, a)); @@ -170,15 +197,15 @@ class Manager{ } } Port pa=gp(a),pb=gp(b); - assert(pa.valid()); - assert(pb.valid()); + assert(pa.valid(), "pa"); + assert(pb.valid(), "pb"); trace("Doing a %s on ", a, " and ", b); synchronized(LUT_lock.writer){ LUT.%s(pa,pb); } if(savefile) save(savefile); - }`)(func1, create, func2, func2); + }`)(func1, create ? "true" : "false", func2, func2); } mixin(LUTMap!("link", "connect", true)()); mixin(LUTMap!("linkDirected", "connectDirected", true)()); diff --git a/src/bastlibridge/util.d b/src/bastlibridge/util.d @@ -1,6 +1,8 @@ module bastlibridge.util; import std.exception; import std.socket; +import std.traits; +import std.typecons; import std.range; void tryNTimes(alias func, alias errhandle)(uint N){ @@ -37,3 +39,22 @@ void wait(Socket sock) @trusted{ ubyte[4] buf; recv(sock.handle, buf.ptr, buf.length, MSG_PEEK); } + +struct DeferredExecution(alias func, alias pred){ + alias Args=Parameters!func; + Tuple!Args[] queue; + void exec(Args a){ + if(!pred()){ + queue~=tuple(a); + } + else{ + func(a); + } + } + void dequeue(){ + foreach(arg; queue){ + func(arg.expand); + } + queue.length=0; + } +}