BastliBridge

A bot framework bridgin multiple IM protocols, and mail
git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules

commit 5eff2f29fd37a47656894fea1b03db2277f54262
parent 7cf161939deaa52d31aec46d37480d6fcb201f74
Author: Dominik Schmidt <das1993@hotmail.com>
Date:   Sun, 24 Sep 2017 17:16:43 +0200

Modularize bastlibridge into telegram, http and bot.

Diffstat:
Makefile | 4++--
src/bastlibridge/http.d | 250+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/telegram.d | 327+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bot.d | 487+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/telegram.d | 818-------------------------------------------------------------------------------
5 files changed, 1066 insertions(+), 820 deletions(-)

diff --git a/Makefile b/Makefile @@ -2,8 +2,8 @@ DMD ?= ldmd2 DEBUG ?= -g RELEASE ?= -release -O -d-version=StdLoggerDisableTrace DFLAGS ?= -INCLUDES = Dirk/source/ -SRC := src/telegram.d $(wildcard Dirk/source/*/*.d) Dirk/libev/deimos/ev.d +INCLUDES = Dirk/source/ src +SRC := src/bot.d $(wildcard src/bastlibridge/*.d) $(wildcard Dirk/source/*/*.d) Dirk/libev/deimos/ev.d TARGET = bastlibridge LIBS = ev diff --git a/src/bastlibridge/http.d b/src/bastlibridge/http.d @@ -0,0 +1,250 @@ +module bastlibridge.http; + +import std.stdio; +import std.typecons; +import std.socket; +import std.format; +import std.range; +import std.algorithm; +import std.meta; +import std.conv; +import std.traits; +import std.exception; +import std.experimental.logger; + +@safe: + +class OutBuffer{ + ubyte[] buffer; + size_t offset; + + invariant(){ + assert(offset <= buffer.length); + } + + void reserve(size_t len) + in{ + assert(offset + len >= offset); + } + out{ + assert(offset + len <= buffer.length); + } + body{ + if(offset+len>buffer.length){ + buffer.length=(offset + len)*2; + } + } + alias put = write; + + void write(const(ubyte)[] bytes){ + reserve(bytes.length); + buffer[offset..offset+bytes.length]=bytes[]; + offset+=bytes.length; + } + + unittest{ + ubyte[] test=iota(0,255).map!(a=>cast(ubyte)a).array; + OutBuffer ob; + ob.write(test); + assert(ob.toBytes == test); + } + + void write(const(char)[] bytes){ + write(cast(const(ubyte)[])bytes); + } + void write(in ubyte b){ + reserve(ubyte.sizeof); + buffer[offset]=b; + offset++; + } + + ubyte[] toBytes(){ + return buffer[0..offset]; + } + + override string toString(){ + return (cast(const(char)[])toBytes).idup; + } + +} + +private static immutable string http_version="HTTP/1.1"; +private static immutable string nl="\r\n"; + + +ubyte[] receiveData(Socket sock){ + static immutable size_t chunk_size=1024; + static ubyte[] buffer=new ubyte[chunk_size]; + return receiveData(sock,buffer,chunk_size); +} + +ubyte[] receiveData(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ + ubyte[] buf=buffer; + size_t pos=0,res=0; + while(true){ + res=sock.receive(buf); + if(res==0){ + return []; + } + else if(res==Socket.ERROR){ + throw new ErrnoException("Error reading from socket"); + } + pos+=res; + if(pos<buffer.length) + break; + if(pos>=buffer.length){ + buffer.length+=chunk_size; + } + buf=buffer[pos..$]; + } + return buffer[0..pos]; +} + +Nullable!(HttpResponse!T) receiveHttp(T=void)(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ + auto data=receiveData(sock,buffer,chunk_size); + if(data.length==0){ + return Nullable!(HttpResponse!T).init; + } + return nullable(HttpResponse!T.parse(data)); +} + +Nullable!(HttpResponse!T) receiveHttp(T=void)(Socket sock){ + auto data=receiveData(sock); + if(data.length==0){ + return Nullable!(HttpResponse!T).init; + } + return nullable(HttpResponse!T.parse(data)); +} + +struct HttpRequest{ + private OutBuffer ob; + private uint state=0; + + this(OutBuffer ob){ + setBuffer(ob); + } + + void setBuffer(OutBuffer ob){ + this.ob=ob; + } + + private void clear(){ + ob.offset=0; + state=0; + } + + void request(TM, TU)(TM method, TU url){ + clear(); + ob.formattedWrite!("%s /%s "~http_version~nl)(method,url); + state=1; + } + + void header(TN,TV)(TN name, TV value) + in{ + assert(state==1); + } + body{ + ob.formattedWrite!("%s: %s"~nl)(name, value); + } + + void headers(A...)(A args) + in{ + assert(args.length%2==0); + } + body{ + + string generateMixin(){ + string str; + for(size_t i=0; i<args.length; i+=2){ + str~=format!"header(args[%d],args[%d]);"(i,i+1); + } + return str; + } + mixin(generateMixin()); + } + + void data(T)(T data) if(hasLength!T) + in{ + assert(state==1); + } + body{ + header("Content-Length", data.length); + ob.write(nl); + state=2; + ob.write(data); + } + + void data (T)(T data) if(isSomeString!T){ + import std.string:representation; + this.data(data.representation); + } + + void finalize(){ + if(state<2){ + ob.write(nl); + state=2; + } + } + + void perform(Socket sock){ + finalize(); + writeln(cast(char[])ob.toBytes); + sock.send(ob.toBytes); + } + + string toString(){ + return ob.toString(); + } + +} + +struct CopyAssoc{ + const(char)[][string] assoc; + mixin Proxy!assoc; + + auto opIndexAssign(in char[] value, in char[] key){ + assoc[key.idup]=value; + return value; + } +} + +struct HttpResponse(T=void){ + string ver; + ushort code; + string codename; + + static if(!is(T == void)){ + static assert(__traits(compiles, (){const(char)[] c=""; (T.init)[c]=c;})); + T headers; + } + const(ubyte)[] data; + + static HttpResponse parse(const(ubyte)[] data){ + const(char)[] buf=cast(const char[]) data; + HttpResponse res; + auto lines=buf.splitter("\r\n"); + auto status=lines.front; + auto split=status.splitter(" "); + + res.ver=split.front.idup; + split.popFront(); + res.code=split.front.to!ushort; + split.popFront(); + res.codename=split.joiner.to!string; + static if(!is(T==void)){ + lines.popFront(); + foreach(line;lines.until!(a=>a.length==0)){ + auto headsplit=line.findSplit(":"); + res.headers[headsplit[0]]=headsplit[2]; + } + } + + res.data=cast(const(ubyte)[])buf.findSplit("\r\n\r\n")[2]; + return res; + } + + const(char)[] str() const{ + return cast(const(char)[]) data; + } +} + diff --git a/src/bastlibridge/telegram.d b/src/bastlibridge/telegram.d @@ -0,0 +1,327 @@ +module bastlibridge.telegram; + +import bastlibridge.http; +import std.stdio; +import std.socket; +import std.range; +import std.algorithm; +import std.format; +import std.conv; +import std.experimental.logger; +import std.traits; +import ssl.socket; +import irc.client; +import std.file; +import std.typecons; +import std.string; +import std.json; +import std.exception; +import std.datetime :SysTime,Clock; +import deimos.ev; + +class TelegramErrnoException : ErrnoException{ + this(string msg, string file=__FILE__, size_t line=__LINE__){ + super(msg,file,line); + } +} +class TelegramException : Exception{ + this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ + super(msg,file,line,next); + } +} + +class TelegramError : Error{ + this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ + super(msg,file,line,next); + } +} + +ubyte[] receiveDynamic(Socket sock, ubyte[] buffer, size_t chunk_size=1024){ + ubyte[] buf=buffer; + size_t pos=0,res=0; + while(true){ + trace("Reading on ", sock.handle, "(", sock , ") ", " with buffer ", buf.ptr, " of length ", buf.length); + res=sock.receive(buf); + if(res==0){ + info("Socket ", sock.handle, " died"); + return []; + } + else if(res==Socket.ERROR){ + throw new ErrnoException("Error reading from socket"); + } + pos+=res; + if(pos<buffer.length) + break; + if(pos>=buffer.length){ + buffer.length+=chunk_size; + } + buf=buffer[pos..$]; + } + return buffer[0..pos]; +} + +struct Telegram{ + string token=""; + enum ApiAddr="api.telegram.org"; + enum ApiPort=443; + + long lastUpdate=-1; + + SslSocket sock; + + private Address ApiAddrObj; + + private HttpRequest httpRequest; + + @disable this(); + this(string token){ + this.token=token; + httpRequest.setBuffer(new OutBuffer); + } + + void connect(bool test=true){ + if(!ApiAddrObj){ + getApiAddress(); + } + trace("Opening socket to Telegram on "~ApiAddrObj.to!string); + auto af=ApiAddrObj.addressFamily; + sock=new SslSocket(af); + sock.connect(ApiAddrObj); + scope(failure){ + sock.close(); + } + if(test){ + //Try whether the socket works + trace("Trying to get the Botname via newly opened socket "~sock.handle.to!string); + botName=getBotName(sock); + } + } + + void getApiAddress(){ + ApiAddrObj=getAddress(ApiAddr,ApiPort).front; + } + + + void disconnect(){ + sock.shutdown(SocketShutdown.BOTH); + sock.close(); + delete sock; + } + + auto getURI(in char[] method){ + return chain("bot",token,"/",method); + } + + void httpRequestHeaders(ref HttpRequest httpRequest){ + httpRequest.header("Host", ApiAddr); + httpRequest.header("Connection", "keep-alive"); + } + + void http(in char[] method){ + httpRequest.request("GET", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.perform(sock); + } + + void post(T)(in char[] method, T data){ + httpRequest.request("POST", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.header("Content-Type", "application/x-www-form-urlencoded"); + httpRequest.data(data); + httpRequest.perform(sock); + } + + void json(in char[] method, in JSONValue jv){ + httpRequest.request("POST", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.header("Content-Type", "application/json"); + httpRequest.data(jv.toString); + httpRequest.perform(sock); + } + + /** + * Receives all available data from the socket. + * The returned buffer gets reused, if you want it immutable, + * use .idup on it. + */ + const(char)[] receive() + in + { + assert(sock !is null); + assert(sock.isAlive); + } + body{ + static immutable size_t chunk_size=1024; + + static ubyte[] buffer=new ubyte[chunk_size]; //This initialization is static, i.e. only done once + return cast(char[])sock.receiveDynamic(buffer, chunk_size); + } + + void checkCode(T)(in HttpResponse!T res){ + if(res.code!=200){ + throw new TelegramException(format!"Server returned %d\n%s"(res.code, res.str)); + } + } + + auto getJSON(T)(in HttpResponse!T res){ + auto j=parseJSON(res.str); + if(j["ok"].type != JSON_TYPE.TRUE){ + throw new TelegramException("API request failed: "~j["description"].str); + } + return j["result"]; + } + + Nullable!(JSONValue) response(){ + auto h=receiveHttp(sock); + if(h.isNull){ + //throw new TelegramError("Socket closed"); + warning("Telegram closed _another_ socket :/ ",sock.handle); + reconnectSocket(); + return Nullable!JSONValue.init; + } + checkCode(h); + return Nullable!JSONValue(getJSON(h)); + } + + void reconnectSocket(){ + trace("Reconnecting socket ", sock.handle); + disconnect(); + connect(); + trace("Reconnected socket, is now ", sock.handle); + } + + struct HTTP{ + string ver; + ushort code; + string code_name; + const(char)[] content; + + static HTTP parse(const(char)[] buf) + in{ + assert(buf.length>0); + } + body{ + HTTP res; + trace("Parseing "~buf); + auto lines=buf.splitter("\r\n"); + auto status=lines.front; + auto split=status.splitter(" "); + + res.ver=split.front.idup; + split.popFront(); + res.code=split.front.to!ushort; + split.popFront(); + res.code_name=split.joiner.to!string; + + res.content=buf.findSplit("\r\n\r\n")[2]; + trace("Parsed to "~res.to!string); + return res; + } + + void checkCode(){ + if(code!=200){ + throw new TelegramException(format!"Server returned %d\n%s"(code, content)); + } + } + + auto getJSON(){ + import std.json; + + auto j=parseJSON(content); + if(j["ok"].type != JSON_TYPE.TRUE){ + throw new TelegramException("API request failed: "~j["description"].str); + } + return j["result"]; + } + } + + void triggerUpdates(uint timeout=600){ + trace("Triggering Update with timeout ", timeout); + string params=format!"offset=%d&timeout=%d"(lastUpdate+1,timeout); + post("getUpdates",params); + } + + void delegate(JSONValue jv)[] onMessage; + + bool read(){ + auto updates=response(); + if(updates.isNull){ + triggerUpdates(); + return true; + } + foreach(size_t i, update; updates){ + lastUpdate=max(lastUpdate, update["update_id"].integer); + if("message" in update){ + onMessage.each!(a=>a(update["message"])); + } + } + + return false; + } + + void send(in long chatid, in char[] buf){ + trace("Sending "~buf~" to "~chatid.to!string); + JSONValue j; + j.object=null; + j["chat_id"] = JSONValue(chatid); + j["text"] = JSONValue(buf); + trace("Sending telegram message "~j.toPrettyString); + json("sendMessage", j); + auto res=response(); + if(res.isNull){ + send(chatid,buf); + } + } + + JSONValue getChat(T)(in T chatid) if(isNarrowString!T || is(T==long)){ + JSONValue j; + j.object=null; + j["chat_id"]=JSONValue(chatid); + json("getChat", j); + auto res=response(); + if(res.isNull){ + return getChat(chatid); + } + return res.get; + } + long getChatId(in char[] chatid){ + return getChat(chatid)["id"].integer; + } + string getChatName(long chatid){ + auto j=getChat(chatid); + if("title" in j){ + //We are in a group + return j["title"].str; + } + //We are in a query + return j["username"].str; + } + + string botName; + + string getBotName(){ + return getBotName(sock); + } + string getBotName(Socket sock){ + http("getMe"); + string _botName=response()["username"].str; + return _botName; + } + string getFile(in char[] file_id){ + JSONValue j; + j.object=null; + j["file_id"]=file_id; + json("getFile", j); + auto r=response(); + if(r.isNull){ + return getFile(file_id); + } + j=r.get; + auto v="file_path" in j; + if(v) + return v.str; + else + throw new TelegramException(cast(string)("Path of file "~file_id~" was not found")); + } +} + diff --git a/src/bot.d b/src/bot.d @@ -0,0 +1,487 @@ +import bastlibridge.telegram; +import bastlibridge.http; +import deimos.ev; +import std.socket; +import std.json; +import std.array; +import std.conv; +import std.experimental.logger; +import std.algorithm; +import std.datetime; +import std.stdio; +import std.string; +import std.file; +import std.typecons; +import irc.client; +import ssl.socket; + + +struct LookupTable{ + private long[][string] _irc; + private string[][long] _telegram; + void connect(string irc, long telegram){ + _irc[irc]~=telegram; + _telegram[telegram]~=irc; + } + void disconnect(string irc, long telegram){ + auto i=irc in _irc; + *i=remove!(a=>a==telegram)(*i); + auto t=telegram in _telegram; + *t=remove!(a=>a==irc)(*t); + } + + long[] irc(in char[] irc){ + return _irc.get(cast(string)irc,[]); + } + + string[] telegram(in long telegram){ + return _telegram.get(telegram,[]); + } + auto ircChannels(){ + return _irc.byKey; + } + auto telegramChannels(){ + return _telegram.byKey; + } + + auto links(){ + return _irc.byPair.map!(a=>a[1].map!(b=>tuple(a[0],b))).joiner; + } +} + + +struct Watcher{ + ev_io io; + void delegate(int revents) cb; + extern(C) static void callback(ev_loop_t* loop, ev_io *io, int revents){ + auto watcher=cast(Watcher*) io; + watcher.cb(revents); + } + this(Socket sock, typeof(this.cb) cb){ + this(sock.handle,cb); + } + this(int fd, typeof(this.cb) cb){ + this.cb=cb; + ev_io_init(&io, &callback, fd, EV_READ); + } +} + +struct Bot{ + Address ircAddress; + Socket ircSocket; + IrcClient ircClient; + Telegram telegram, telegram_listener; + LookupTable lut; + Watcher w_irc,w_tele,w_stdin; + ev_loop_t *eventloop; + + string[long] telegram_channels; + + struct LastUpdate{ + SysTime Telegram,IRC; + void updateTelegram(){ + Telegram=Clock.currTime(); + } + void updateIRC(){ + IRC=Clock.currTime(); + } + string toString(string separator=", "){ + auto now=Clock.currTime; + string str=format!( + "Last Update received from IRC: %s UTC, that is %s ago%s"~ + "Last update received from Telegram: %s UTC, that is %s ago" + )(IRC.toUTC.toString(), (now-IRC).toString, + separator, + Telegram.toUTC.toString, (now-Telegram).toString); + return str; + } + } + + LastUpdate lastUpdate; + + this(string tgramApiKey){ + telegram=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); + telegram_listener=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); + } + + static immutable string unknown_username="Unknown"; + static string TelegramUser(JSONValue User){ + string username=unknown_username; + foreach(key; ["username", "first_name", "last_name"]){ + if(key in User){ + username=User[key].str; + break; + } + } + return username; + } + + string formatProxyURL(string file_id){ + return "FILE("~file_id~")"; //Stub. Implement this + } + + string locationToOSMUrl(JSONValue loc){ + return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); + } + string locationToOSMUrl(float lat, float lng){ + return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); + } + + string TelegramUserToIRC(JSONValue user){ + dchar mode=' '; + if(user["is_bot"].type==JSON_TYPE.TRUE){ + mode='*'; + } + return "<"~mode.to!string~TelegramUser(user)~">"; + } + + void TelegramToIRC(JSONValue Message, ref Appender!string app){ + string username="< "~unknown_username~">"; + if("from" in Message){ + app~=TelegramUserToIRC(Message["from"]); + app~=" "; + } + if("forward_from" in Message){ + app~=TelegramUserToIRC(Message["forward_from"]); + app~=" "; + } + foreach(t; ["text", "caption"]){ + if(t in Message){ + app~=Message[t].str; + app~=" "; + } + } + + if("photo" in Message){ + string fid; + long size=long.min; + foreach(p; Message["photo"].array){ + if(p["file_size"].integer>size){ + size=p["file_size"].integer; + fid=p["file_id"].str; + } + } + app~=formatProxyURL(fid); + app~=" "; + } + + foreach(t; ["audio", "document", "sticker", "video", "voice"]){ + if(t in Message){ + app~=formatProxyURL(Message[t]["file_id"].str); + app~=" "; + } + } + if("location" in Message){ + app~=locationToOSMUrl(Message["location"]); + app~=" "; + } + if("contact" in Message){ + auto c=Message["contact"]; + app~=c["first_name"].str; + app~=" "; + if("last_name" in c){ + app~=c["last_name"].str; + app~=" "; + } + app~="("; + app~=c["phone_number"].str; + app~=") "; + } + if("venue" in Message){ + app~=Message["venue"]["title"].str; + app~=locationToOSMUrl(Message["venue"]["location"]); + app~=" "; + } + if("reply_to_message" in Message){ + TelegramToIRC(Message["Reply_to_message"], app); + } + } + + string TelegramToIRC(JSONValue Message){ + Appender!string app; + + TelegramToIRC(Message, app); + + return app.data[0..$-1]; + } + + void initialize(){ + ircAddress=getAddress("irc.freenode.net",6697)[0]; + auto af=ircAddress.addressFamily; + ircSocket=new SslSocket(af); + ircClient=new IrcClient(ircSocket); + + //lut.connect("#bastli_wasserstoffreakteur", -8554836); + + + ircClient.realName="Bastli"; + ircClient.userName="bastli"; + ircClient.nickName="bastli"; + ircClient.onConnect~=((){ + info("Connected to IRC on ",ircClient.serverAddress); + ev_io_start(eventloop, &w_tele.io); + telegram_listener.triggerUpdates(); + foreach(k;lut.ircChannels){ + ircClient.join(k); + } + }); + ircClient.onMessage~=(u,tt,m){ + trace("IRC received message from "~u.nickName~" on "~tt~": "~m); + string msg=cast(string)m; + if(msg.skipOver("!bastli ")){ + trace("Message is a command"); + try{ + auto split=msg.findSplit(" "); + switch(split[0]){ + case "links": + ircClient.send(tt,lut.irc(tt).map!(a=>telegram.getChatName(a)~"("~a.to!string~")").join(", ")); + break; + case "link": + //link(tt.idup, split[2].to!long); + break; + case "unlink": + unlink(tt.idup, split[2].to!long); + break; + case "list_telegram": + ircClient.send(tt, telegram_channels.byPair.map!(a=>a[1]~"("~a[0].to!string~")").join(", ")); + break; + case "lastUpdate": + ircClient.send(tt, lastUpdate.toString(", ")); + break; + default: + ircClient.send(tt, "Unknown command"); + break; + } + } + catch(Exception e){ + ircClient.send(tt, "Error while executing command: "~e.msg); + warning(e.to!string); + } + } + else{ + foreach(chatid; lut.irc(tt)){ + telegram.send(chatid, "< "~u.nickName~"> "~m); + } + } + }; + + + telegram_listener.onMessage~=(j){ + trace("Received Message "~j.toPrettyString()); + auto jv="text" in j; + string msg=""; + if(jv){ + msg=j["text"].str; + } + + auto chatid=j["chat"]["id"].integer; + if(chatid !in telegram_channels){ + if("title" in j["chat"]){ + telegram_channels[chatid]=j["chat"]["title"].str; + } + else{ + telegram_channels[chatid]=j["chat"]["username"].str; + } + } + if(msg.skipOver("/")){ + trace("Bot command received"); + try{ + auto split=msg.findSplit(" "); + auto cmdbot=split[0].findSplit("@"); + if(cmdbot[1].length==0 && cmdbot[2]!=telegram.botName){ + trace("Not one of our commands"); + } + else{ + switch(cmdbot[0]){ + case "link": + link(split[2], chatid); + ircClient.notice(split[2], "Linked with "~chatid.to!string); + break; + case "links": + telegram.send(chatid, lut.telegram(chatid).join(", ")); + break; + case "unlink": + unlink(split[2], chatid); + ircClient.notice(split[2], "Unlinked with "~chatid.to!string); + break; + case "lastUpdate": + telegram.send(chatid, lastUpdate.toString("\n")); + break; + default: + //telegram.send(chatid, split[0]~": Unknown command"); + break; + } + } + } + catch(Exception e){ + telegram.send(chatid, "Error executing command: "~e.msg); + warning(e.to!string); + } + } + else{ + auto irc_chans=lut.telegram(chatid); + if(irc_chans.length==0){ + warning("Input on not-connected telegram-channel "~j["chat"].toPrettyString); + } + foreach(chan; irc_chans){ + ircClient.send(chan, TelegramToIRC(j)); + } + } + }; + } + + void unlink(string irc, long telegram){ + info("Unlinking ", irc, " from ", telegram); + lut.disconnect(irc, telegram); + if(ircClient.connected()){ + if(lut.irc(irc).length==0){ + ircClient.part(irc); + } + } + } + + void link(string irc, long telegram){ + info("Linking ", irc, " with ", telegram); + lut.connect(irc, telegram); + if(ircClient.connected()){ + ircClient.join(irc); + } + } + + void connect(){ + info("Connecting to Telegram"); + telegram.connect(); + telegram_listener.connect(); + //ircClient.connect(new InternetAddress("127.0.0.1",6667)); + info("Connecting to IRC"); + ircClient.connect(ircAddress); + } + + void setupEventloop(){ + eventloop=ev_default_loop(0); + + w_irc=Watcher(ircSocket,(revents){ + trace("IRC Update"); + lastUpdate.updateIRC(); + + if(ircClient.read()){ + ev_io_stop(eventloop, &w_irc.io); + } + }); + + w_tele=Watcher(telegram_listener.sock,(revents){ + trace("Telegram Update"); + lastUpdate.updateTelegram(); + + Exception e; + bool res; + try res=telegram_listener.read(); + catch(TelegramException te) error(te); + if(res){ + ev_io_stop(eventloop, &w_tele.io); + ev_io_set(&w_tele.io, telegram_listener.sock.handle, EV_READ); + ev_io_start(eventloop, &w_tele.io); + } + else{ + telegram_listener.triggerUpdates(); + } + }); + + w_stdin=Watcher(stdin.fileno, (revents){ + auto l=stdin.readln.chomp; + auto split=l.findSplit(" "); + switch(split[0]){ + case "quit": + quit(); + break; + case "link": + auto split2=split[2].findSplit(" "); + link(split2[0], split2[2].to!long); + break; + case "links": + foreach(i,tt; lut._irc.byPair){ + foreach(t; tt){ + writeln(i, " <-> ", t); + } + } + break; + case "unlink": + break; + default: + writeln(l, ": unknown command"); + } + + }); + } + + void quit(){ + info("Quitting the bot"); + ircClient.quit("Exiting gracefully"); + disconnect(); + stop(); + } + + void start(){ + info("Starting the event-listeners"); + ev_io_start(eventloop, &w_irc.io); + ev_io_start(eventloop, &w_stdin.io); + } + + void stop(){ + trace("Stopping the Eventloop"); + ev_io_stop(eventloop, &w_tele.io); + ev_io_stop(eventloop, &w_irc.io); + ev_io_stop(eventloop, &w_stdin.io); + ev_break(eventloop, EVBREAK_ALL); + } + + void run(){ + info("Running the event-loop"); + ev_run(eventloop,0); + } + + void disconnect(){ + telegram.disconnect(); + telegram_listener.disconnect(); + } + + void save(File f){ + trace("Saving to file ",f.name); + foreach(link; lut.links){ + f.writeln(link[0], " ", link[1].to!string); + } + } + void load(File f){ + trace("loading from file ",f.name); + foreach(l; f.byLine.filter!(a=>a.length>0)){ + auto split=l.findSplit(" "); + link(split[0].idup, split[2].to!long); + } + } +} + + +int main(string[] args){ + Bot b=Bot(args[1]); + b.initialize(); + if(exists("savefile")){ + auto f=File("savefile", "r"); + b.load(f); + f.close(); + } + b.connect(); + scope(failure){ + b.quit(); + } + b.setupEventloop(); + b.start(); + b.run(); + scope(exit){ + auto f=File("savefile", "w+"); + b.save(f); + f.close(); + } + + return 0; + +} diff --git a/src/telegram.d b/src/telegram.d @@ -1,818 +0,0 @@ -import std.stdio; -import std.socket; -import std.range; -import std.algorithm; -import std.format; -import std.conv; -import std.experimental.logger; -import std.traits; -import ssl.socket; -import irc.client; -import std.file; -import std.typecons; -import std.string; -import std.json; -import std.exception; -import std.datetime :SysTime,Clock; -import deimos.ev; - -class TelegramErrnoException : ErrnoException{ - this(string msg, string file=__FILE__, size_t line=__LINE__){ - super(msg,file,line); - } -} -class TelegramException : Exception{ - this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ - super(msg,file,line,next); - } -} - -class TelegramError : Error{ - this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ - super(msg,file,line,next); - } -} - -ubyte[] receiveDynamic(Socket sock, ubyte[] buffer, size_t chunk_size=1024){ - ubyte[] buf=buffer; - size_t pos=0,res=0; - while(true){ - trace("Reading on ", sock.handle, "(", sock , ") ", " with buffer ", buf.ptr, " of length ", buf.length); - res=sock.receive(buf); - if(res==0){ - info("Socket ", sock.handle, " died"); - return []; - } - else if(res==Socket.ERROR){ - throw new ErrnoException("Error reading from socket"); - } - pos+=res; - if(pos<buffer.length) - break; - if(pos>=buffer.length){ - buffer.length+=chunk_size; - } - buf=buffer[pos..$]; - } - return buffer[0..pos]; -} - -struct Telegram{ - string token=""; - enum ApiAddr="api.telegram.org"; - enum ApiPort=443; - - long lastUpdate=-1; - - package SslSocket sock; - package SslSocket sock2; - - private Address ApiAddrObj; - - @disable this(); - this(string token){ - this.token=token; - } - - void connect(){ - openSocket(sock); - openSocket(sock2); - } - - void getApiAddress(){ - ApiAddrObj=getAddress(ApiAddr,ApiPort).front; - } - - void openSocket(T)(out T sock){ - if(!ApiAddrObj){ - getApiAddress(); - } - trace("Opening socket to Telegram on "~ApiAddrObj.to!string); - auto af=ApiAddrObj.addressFamily; - sock=new T(af); - sock.connect(ApiAddrObj); - scope(failure){ - sock.close(); - } - testSocket(sock); - } - - void testSocket(T)(T sock){ - //Try whether the socket works - trace("Trying to get the Botname via newly opened socket "~sock.handle.to!string); - botName=getBotName(sock); - } - - - void disconnect(){ - sock.shutdown(SocketShutdown.BOTH); - sock.close(); - delete sock; - sock2.shutdown(SocketShutdown.BOTH); - sock2.close(); - delete sock2; - } - - auto getURI(in char[] method){ - return chain("/bot",token,"/",method); - } - - void http(in char[] method){ - http(method, sock); - } - void http(in char[] method, Socket sock) - in{ - assert(sock !is null); - assert(sock.isAlive); - } - body{ - import std.conv:to; - string req="GET "~getURI(method).to!string~" HTTP/1.1\r\n" - ~"Host: "~ApiAddr~"\r\n" - ~"Connection: keep-alive\r\n"; - req~="\r\n\r\n"; - sock.send(req); - } - - void data(in char[] method, in char[] data, in char[] datatype){ - return this.data(method,data,datatype,sock); - } - - void data(in char[] method, in char[] data, in char[] datatype, Socket sock) - in{ - assert(sock !is null); - assert(sock.isAlive); - } - body{ - import std.conv:to; - string req="GET "~getURI(method).to!string~" HTTP/1.1\r\n" - ~"Host: "~ApiAddr~"\r\n" - ~"Connection: keep-alive\r\n"; - req~="Content-Length: "~data.length.to!string~"\r\n" - ~"Content-Type: "~datatype; - req~="\r\n\r\n"~data; - trace(req); - sock.send(req); - } - - void post(in char[] method, in char[] data){ - post(method,data,sock); - } - void post(in char[] method, in char[] data, Socket sock){ - return this.data(method, data, "application/x-www-form-urlencoded",sock); - } - void json(in char[] method, in char[] data){ - json(method,data,sock); - } - void json(in char[] method, in char[] data, Socket sock){ - return this.data(method, data, "application/json",sock); - } - - /** - * Receives all available data from the socket. - * The returned buffer gets reused, if you want it immutable, - * use .idup on it. - */ - const(char)[] receive(){ - return receive(sock); - } - const(char)[] receive(Socket sock) - in - { - assert(sock !is null); - assert(sock.isAlive); - } - body{ - static immutable size_t chunk_size=1024; - - static ubyte[] buffer=new ubyte[chunk_size]; //This initialization is static, i.e. only done once - return cast(char[])sock.receiveDynamic(buffer, chunk_size); - } - - Nullable!(JSONValue) response(Socket sock){ - auto buf=receive(sock); - if(buf.length==0){ - //throw new TelegramError("Socket closed"); - warning("Telegram closed _another_ socket :/ ",sock.handle); - reconnectSocket(sock); - return Nullable!JSONValue.init; - } - HTTP h=HTTP.parse(buf); - h.checkCode(); - return Nullable!JSONValue(h.getJSON()); - } - - void reconnectSocket(Socket sock){ - trace("Reconnecting socket ", sock.handle); - sock.shutdown(SocketShutdown.BOTH); - sock.close(); - //dreckige hackerei - void[] mem=(cast(void*)sock)[0..__traits(classInstanceSize, SslSocket)]; - emplace!SslSocket(mem,ApiAddrObj.addressFamily); - sock.connect(ApiAddrObj); - trace("Reconnected socket, is now ", sock.handle); - } - - struct HTTP{ - string ver; - ushort code; - string code_name; - const(char)[] content; - - static HTTP parse(const(char)[] buf) - in{ - assert(buf.length>0); - } - body{ - HTTP res; - trace("Parseing "~buf); - auto lines=buf.splitter("\r\n"); - auto status=lines.front; - auto split=status.splitter(" "); - - res.ver=split.front.idup; - split.popFront(); - res.code=split.front.to!ushort; - split.popFront(); - res.code_name=split.joiner.to!string; - - res.content=buf.findSplit("\r\n\r\n")[2]; - trace("Parsed to "~res.to!string); - return res; - } - - void checkCode(){ - if(code!=200){ - throw new TelegramException(format!"Server returned %d\n%s"(code, content)); - } - } - - auto getJSON(){ - import std.json; - - auto j=parseJSON(content); - if(j["ok"].type != JSON_TYPE.TRUE){ - throw new TelegramException("API request failed: "~j["description"].str); - } - return j["result"]; - } - } - - void triggerUpdates(uint timeout=600){ - trace("Triggering Update with timeout ", timeout); - string params=format!"offset=%d&timeout=%d"(lastUpdate+1,timeout); - post("getUpdates",params); - } - - void delegate(JSONValue jv)[] onMessage; - - bool read(){ - auto updates=response(sock); - if(updates.isNull){ - triggerUpdates(); - return true; - } - foreach(size_t i, update; updates){ - lastUpdate=max(lastUpdate, update["update_id"].integer); - if("message" in update){ - onMessage.each!(a=>a(update["message"])); - } - } - - return false; - } - - void send(in long chatid, in char[] buf){ - trace("Sending "~buf~" to "~chatid.to!string); - JSONValue j; - j.object=null; - j["chat_id"] = JSONValue(chatid); - j["text"] = JSONValue(buf); - trace("Sending telegram message "~j.toPrettyString); - json("sendMessage", j.toString,sock2); - auto res=response(sock2); - if(res.isNull){ - send(chatid,buf); - } - } - - JSONValue getChat(T)(in T chatid) if(isNarrowString!T || is(T==long)){ - JSONValue j; - j.object=null; - j["chat_id"]=JSONValue(chatid); - string data=j.toString; - json("getChat", data, sock2); - auto res=response(sock2); - if(res.isNull){ - return getChat(chatid); - } - return res.get; - } - long getChatId(in char[] chatid){ - return getChat(chatid)["id"].integer; - } - string getChatName(long chatid){ - auto j=getChat(chatid); - if("title" in j){ - //We are in a group - return j["title"].str; - } - //We are in a query - return j["username"].str; - } - - string botName; - - string getBotName(){ - return getBotName(sock); - } - string getBotName(Socket sock){ - http("getMe", sock); - string _botName=response(sock)["username"].str; - return _botName; - } - string getFile(in char[] file_id){ - JSONValue j; - j.object=null; - j["file_id"]=file_id; - json("getFile", j.toString, sock2); - auto r=response(sock2); - if(r.isNull){ - return getFile(file_id); - } - j=r.get; - auto v="file_path" in j; - if(v) - return v.str; - else - throw new TelegramException(cast(string)("Path of file "~file_id~" was not found")); - } -} - -struct LookupTable{ - private long[][string] _irc; - private string[][long] _telegram; - void connect(string irc, long telegram){ - _irc[irc]~=telegram; - _telegram[telegram]~=irc; - } - void disconnect(string irc, long telegram){ - auto i=irc in _irc; - *i=remove!(a=>a==telegram)(*i); - auto t=telegram in _telegram; - *t=remove!(a=>a==irc)(*t); - } - - long[] irc(in char[] irc){ - return _irc.get(cast(string)irc,[]); - } - - string[] telegram(in long telegram){ - return _telegram.get(telegram,[]); - } - auto ircChannels(){ - return _irc.byKey; - } - auto telegramChannels(){ - return _telegram.byKey; - } - - auto links(){ - return _irc.byPair.map!(a=>a[1].map!(b=>tuple(a[0],b))).joiner; - } -} - - -struct Watcher{ - ev_io io; - void delegate(int revents) cb; - extern(C) static void callback(ev_loop_t* loop, ev_io *io, int revents){ - auto watcher=cast(Watcher*) io; - watcher.cb(revents); - } - this(Socket sock, typeof(this.cb) cb){ - this(sock.handle,cb); - } - this(int fd, typeof(this.cb) cb){ - this.cb=cb; - ev_io_init(&io, &callback, fd, EV_READ); - } -} - -struct Bot{ - Address ircAddress; - Socket ircSocket; - IrcClient ircClient; - Telegram telegram; - LookupTable lut; - Watcher w_irc,w_tele,w_stdin; - ev_loop_t *eventloop; - - string[long] telegram_channels; - - struct LastUpdate{ - SysTime Telegram,IRC; - void updateTelegram(){ - Telegram=Clock.currTime(); - } - void updateIRC(){ - IRC=Clock.currTime(); - } - string toString(string separator=", "){ - auto now=Clock.currTime; - string str=format!( - "Last Update received from IRC: %s UTC, that is %s ago%s"~ - "Last update received from Telegram: %s UTC, that is %s ago" - )(IRC.toUTC.toString(), (now-IRC).toString, - separator, - Telegram.toUTC.toString, (now-Telegram).toString); - return str; - } - } - - LastUpdate lastUpdate; - - this(string tgramApiKey){ - telegram=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); - } - - static immutable string unknown_username="Unknown"; - static string TelegramUser(JSONValue User){ - string username=unknown_username; - foreach(key; ["username", "first_name", "last_name"]){ - if(key in User){ - username=User[key].str; - break; - } - } - return username; - } - - string formatProxyURL(string file_id){ - return "FILE("~file_id~")"; //Stub. Implement this - } - - string locationToOSMUrl(JSONValue loc){ - return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); - } - string locationToOSMUrl(float lat, float lng){ - return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); - } - - string TelegramUserToIRC(JSONValue user){ - dchar mode=' '; - if(user["is_bot"].type==JSON_TYPE.TRUE){ - mode='*'; - } - return "<"~mode.to!string~TelegramUser(user)~">"; - } - - void TelegramToIRC(JSONValue Message, ref Appender!string app){ - string username="< "~unknown_username~">"; - if("from" in Message){ - app~=TelegramUserToIRC(Message["from"]); - app~=" "; - } - if("forward_from" in Message){ - app~=TelegramUserToIRC(Message["forward_from"]); - app~=" "; - } - foreach(t; ["text", "caption"]){ - if(t in Message){ - app~=Message[t].str; - app~=" "; - } - } - - if("photo" in Message){ - string fid; - long size=long.min; - foreach(p; Message["photo"].array){ - if(p["file_size"].integer>size){ - size=p["file_size"].integer; - fid=p["file_id"].str; - } - } - app~=formatProxyURL(fid); - app~=" "; - } - - foreach(t; ["audio", "document", "sticker", "video", "voice"]){ - if(t in Message){ - app~=formatProxyURL(Message[t]["file_id"].str); - app~=" "; - } - } - if("location" in Message){ - app~=locationToOSMUrl(Message["location"]); - app~=" "; - } - if("contact" in Message){ - auto c=Message["contact"]; - app~=c["first_name"].str; - app~=" "; - if("last_name" in c){ - app~=c["last_name"].str; - app~=" "; - } - app~="("; - app~=c["phone_number"].str; - app~=") "; - } - if("venue" in Message){ - app~=Message["venue"]["title"].str; - app~=locationToOSMUrl(Message["venue"]["location"]); - app~=" "; - } - if("reply_to_message" in Message){ - TelegramToIRC(Message["Reply_to_message"], app); - } - } - - string TelegramToIRC(JSONValue Message){ - Appender!string app; - - TelegramToIRC(Message, app); - - return app.data[0..$-1]; - } - - void initialize(){ - ircAddress=getAddress("irc.freenode.net",6697)[0]; - auto af=ircAddress.addressFamily; - ircSocket=new SslSocket(af); - ircClient=new IrcClient(ircSocket); - - //lut.connect("#bastli_wasserstoffreakteur", -8554836); - - - ircClient.realName="Bastli"; - ircClient.userName="bastli"; - ircClient.nickName="bastli"; - ircClient.onConnect~=((){ - info("Connected to IRC on ",ircClient.serverAddress); - ev_io_start(eventloop, &w_tele.io); - telegram.triggerUpdates(); - foreach(k;lut.ircChannels){ - ircClient.join(k); - } - }); - ircClient.onMessage~=(u,tt,m){ - trace("IRC received message from "~u.nickName~" on "~tt~": "~m); - string msg=cast(string)m; - if(msg.skipOver("!bastli ")){ - trace("Message is a command"); - try{ - auto split=msg.findSplit(" "); - switch(split[0]){ - case "links": - ircClient.send(tt,lut.irc(tt).map!(a=>telegram.getChatName(a)~"("~a.to!string~")").join(", ")); - break; - case "link": - //link(tt.idup, split[2].to!long); - break; - case "unlink": - unlink(tt.idup, split[2].to!long); - break; - case "list_telegram": - ircClient.send(tt, telegram_channels.byPair.map!(a=>a[1]~"("~a[0].to!string~")").join(", ")); - break; - case "lastUpdate": - ircClient.send(tt, lastUpdate.toString(", ")); - break; - default: - ircClient.send(tt, "Unknown command"); - break; - } - } - catch(Exception e){ - ircClient.send(tt, "Error while executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - foreach(chatid; lut.irc(tt)){ - telegram.send(chatid, "< "~u.nickName~"> "~m); - } - } - }; - - - telegram.onMessage~=(j){ - trace("Received Message "~j.toPrettyString()); - auto jv="text" in j; - string msg=""; - if(jv){ - msg=j["text"].str; - } - - auto chatid=j["chat"]["id"].integer; - if(chatid !in telegram_channels){ - if("title" in j["chat"]){ - telegram_channels[chatid]=j["chat"]["title"].str; - } - else{ - telegram_channels[chatid]=j["chat"]["username"].str; - } - } - if(msg.skipOver("/")){ - trace("Bot command received"); - try{ - auto split=msg.findSplit(" "); - auto cmdbot=split[0].findSplit("@"); - if(cmdbot[1].length==0 && cmdbot[2]!=telegram.botName){ - trace("Not one of our commands"); - } - else{ - switch(cmdbot[0]){ - case "link": - link(split[2], chatid); - ircClient.notice(split[2], "Linked with "~chatid.to!string); - break; - case "links": - telegram.send(chatid, lut.telegram(chatid).join(", ")); - break; - case "unlink": - unlink(split[2], chatid); - ircClient.notice(split[2], "Unlinked with "~chatid.to!string); - break; - case "lastUpdate": - telegram.send(chatid, lastUpdate.toString("\n")); - break; - default: - //telegram.send(chatid, split[0]~": Unknown command"); - break; - } - } - } - catch(Exception e){ - telegram.send(chatid, "Error executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - auto irc_chans=lut.telegram(chatid); - if(irc_chans.length==0){ - warning("Input on not-connected telegram-channel "~j["chat"].toPrettyString); - } - foreach(chan; irc_chans){ - ircClient.send(chan, TelegramToIRC(j)); - } - } - }; - } - - void unlink(string irc, long telegram){ - info("Unlinking ", irc, " from ", telegram); - lut.disconnect(irc, telegram); - if(ircClient.connected()){ - if(lut.irc(irc).length==0){ - ircClient.part(irc); - } - } - } - - void link(string irc, long telegram){ - info("Linking ", irc, " with ", telegram); - lut.connect(irc, telegram); - if(ircClient.connected()){ - ircClient.join(irc); - } - } - - void connect(){ - info("Connecting to Telegram"); - telegram.connect(); - //ircClient.connect(new InternetAddress("127.0.0.1",6667)); - info("Connecting to IRC"); - ircClient.connect(ircAddress); - } - - void setupEventloop(){ - eventloop=ev_default_loop(0); - - w_irc=Watcher(ircSocket,(revents){ - trace("IRC Update"); - lastUpdate.updateIRC(); - - if(ircClient.read()){ - ev_io_stop(eventloop, &w_irc.io); - } - }); - - w_tele=Watcher(telegram.sock,(revents){ - trace("Telegram Update"); - lastUpdate.updateTelegram(); - - Exception e; - bool res; - try res=telegram.read(); - catch(TelegramException te) error(te); - if(res){ - ev_io_stop(eventloop, &w_tele.io); - ev_io_set(&w_tele.io, telegram.sock.handle, EV_READ); - ev_io_start(eventloop, &w_tele.io); - } - else{ - telegram.triggerUpdates(); - } - }); - - w_stdin=Watcher(stdin.fileno, (revents){ - auto l=stdin.readln.chomp; - auto split=l.findSplit(" "); - switch(split[0]){ - case "quit": - quit(); - break; - case "link": - auto split2=split[2].findSplit(" "); - link(split2[0], split2[2].to!long); - break; - case "links": - foreach(i,tt; lut._irc.byPair){ - foreach(t; tt){ - writeln(i, " <-> ", t); - } - } - break; - case "unlink": - break; - default: - writeln(l, ": unknown command"); - } - - }); - } - - void quit(){ - info("Quitting the bot"); - ircClient.quit("Exiting gracefully"); - disconnect(); - stop(); - } - - void start(){ - info("Starting the event-listeners"); - ev_io_start(eventloop, &w_irc.io); - ev_io_start(eventloop, &w_stdin.io); - } - - void stop(){ - trace("Stopping the Eventloop"); - ev_io_stop(eventloop, &w_tele.io); - ev_io_stop(eventloop, &w_irc.io); - ev_io_stop(eventloop, &w_stdin.io); - ev_break(eventloop, EVBREAK_ALL); - } - - void run(){ - info("Running the event-loop"); - ev_run(eventloop,0); - } - - void disconnect(){ - telegram.disconnect(); - } - - void save(File f){ - trace("Saving to file ",f.name); - foreach(link; lut.links){ - f.writeln(link[0], " ", link[1].to!string); - } - } - void load(File f){ - trace("loading from file ",f.name); - foreach(l; f.byLine.filter!(a=>a.length>0)){ - auto split=l.findSplit(" "); - link(split[0].idup, split[2].to!long); - } - } -} - - -int main(string[] args){ - Bot b=Bot(args[1]); - b.initialize(); - if(exists("savefile")){ - auto f=File("savefile", "r"); - b.load(f); - f.close(); - } - b.connect(); - scope(failure){ - b.quit(); - } - b.setupEventloop(); - b.start(); - b.run(); - scope(exit){ - auto f=File("savefile", "w+"); - b.save(f); - f.close(); - } - - return 0; - -} -