BastliBridge

A bot framework bridgin multiple IM protocols, and mail
git clone git://xatko.vsos.ethz.ch/BastliBridge.git
Log | Files | Refs | Submodules

commit f57c01763e414adf8363424d1253a2deb95cc34c
parent 5fb71932fc7e3efef7fe6a78adb03334aef0412e
Author: Dominik Schmidt <das1993@hotmail.com>
Date:   Fri, 24 Aug 2018 17:14:53 +0200

Generalize the BastliBridge

This is a major rewrite, almost everything has been fleshed out

Diffstat:
Makefile | 4++--
src/bastlibridge/base.d | 146+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/bot.d | 688+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/command.d | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/http.d | 362-------------------------------------------------------------------------------
src/bastlibridge/interfaces/irc.d | 335+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/interfaces/mail.d | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/interfaces/stdout.d | 48++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/interfaces/telegram.d | 172+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/manager.d | 232+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bastlibridge/telegram.d | 333-------------------------------------------------------------------------------
src/bastlibridge/util.d | 29+++++++++++++++++++++++++++++
src/bastliproxy/proxy.d | 136+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/bot.d | 575-------------------------------------------------------------------------------
src/proxy.d | 136-------------------------------------------------------------------------------
src/telegram/http.d | 362+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/telegram/telegram.d | 333+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
17 files changed, 2609 insertions(+), 1408 deletions(-)

diff --git a/Makefile b/Makefile @@ -3,12 +3,12 @@ DEBUG ?= -g RELEASE ?= -release -O -d-version=StdLoggerDisableTrace DFLAGS ?= INCLUDES = Dirk/source/ src Dirk/libev Dirk/ssl -SRC := src/bot.d $(wildcard src/bastlibridge/*.d) $(wildcard Dirk/source/*/*.d) Dirk/libev/deimos/ev.d +SRC := $(wildcard src/bastlibridge/*.d) $(wildcard src/telegram/*.d) $(wildcard src/bastlibridge/*/*.d) $(wildcard Dirk/source/*/*.d) Dirk/libev/deimos/ev.d TARGET = bastlibridge LIBS = ev TARGET2 = bastliproxy -SRC2 = $(wildcard src/bastlibridge/*.d) $(wildcard Dirk/source/ssl/*.d Dirk/source/loader/*.d) src/proxy.d +SRC2 = $(wildcard src/bastliproxy/*.d) $(wildcard src/telegram/*.d) $(wildcard Dirk/source/ssl/*.d Dirk/source/loader/*.d) src/proxy.d _DFLAGS := $(addprefix -I, $(INCLUDES)) $(addprefix -L-l, $(LIBS)) $(DFLAGS) diff --git a/src/bastlibridge/base.d b/src/bastlibridge/base.d @@ -0,0 +1,146 @@ +module bastlibridge.base; +public import bastlibridge.manager; +import bastlibridge.base; +import std.datetime : SysTime,Clock; +import std.exception; +import std.concurrency; +import std.format; +import std.algorithm; +import std.array; +import std.experimental.logger; + + +abstract class Endpoint{ + alias channelID=const(char)[]; + + package Manager manager; + package string name; + package string cmdline; + protected SysTime lastUpdate; + + protected void heartbeat(){ + lastUpdate=Clock.currTime; + } + + package string lastSeen(){ + return format("Last update on %s: %s (%s ago)", name, + lastUpdate.toUTC.toString(), (Clock.currTime-lastUpdate).toString); + } + + this(Manager m, string args){ + manager=m; + heartbeat(); + } + + abstract void open(); + abstract Channel join(string c); + abstract void part(Channel c); + abstract void sendMessage(Message m, Channel c); + abstract void run(); + abstract void stop(); + + Channel[string] channelMap; + + Channel getChannel(in char[] c){ + auto chan=c in channelMap; + if(chan){ + return *chan; + } + return null; + } + + Channel createChannel(in char[] c){ + auto gc=getChannel(c); + if(gc) + return gc; + + auto str=c.idup; + auto newchan=join(str); + channelMap[str]=newchan; + newchan._name=str; + return newchan; + } + + Port getPort(in char[] c){ + auto chan=getChannel(c); + if(!chan){ + throw new Exception(format("Channel %s not found in endpoint", c)); + } + return getPort(chan); + } + + Port getNewPort(in char[] c){ + return getPort(createChannel(c)); + } + + Port getPort(Channel cid){ + return Port(this,cid); + } + + void thread(){ + info("Running up endpoint"); + run(); + info("Endpoint finished"); + info("Sending message to ", manager.TID); + send(manager.TID, name); + info("Sent message to parent thread"); + } + + auto ports(){ + return channelMap.byValue.map!(a=>Port(this,a)); + } + + static string logHook(string fct)(){ + return format(`auto %s` + ~`(int line = __LINE__, string file = __FILE__, string funcName = __FUNCTION__,` + ~`string prettyFuncName = __PRETTY_FUNCTION__, string moduleName = __MODULE__, ` + ~`A...)(A args){` + ~`return .%s!(line,file,funcName,prettyFuncName,moduleName)` + ~`("Endpoint ", name, ": ", args);}`, fct, fct); + } + + mixin(logHook!"trace"); + mixin(logHook!"info"); + mixin(logHook!"warning"); + mixin(logHook!"error"); + mixin(logHook!"fatal"); +} + +abstract class Channel{ + package string _name; +} + +struct Port{ + Endpoint ep; + Channel chan; + void send(Message m){ + ep.sendMessage(m,chan); + } + string toString() const{ + return format("%s:%s", ep.name, chan._name); + } +} + +abstract class Message{ + Endpoint source; + abstract const(char)[] userName(); + abstract const(char)[] getMessage(); + abstract const(char)[] getChannelName(); + abstract bool auth(); + abstract SysTime getTime(); + abstract void respond(in char[] response); +} + +struct EndpointTypes{ + private Endpoint function(Manager,string)[string] table; + + void add(T)(string str){ + table[str]=(Manager m, string args){return new T(m,args);}; + } + + Endpoint get(Manager m, string type, string args){ + return (*enforce(type in table, format("Endpoint type %s unknown", type)))(m,args); + } +} + +EndpointTypes endpointTypes; diff --git a/src/bastlibridge/bot.d b/src/bastlibridge/bot.d @@ -0,0 +1,688 @@ +//import bastlibridge.telegram; +//import bastlibridge.http; +//import deimos.ev; +module bastlibridge.bot; +import bastlibridge.manager; +import bastlibridge.interfaces.irc; +import bastlibridge.interfaces.telegram; +import bastlibridge.interfaces.mail; +import bastlibridge.interfaces.stdout; +import std.socket; +import std.json; +import std.array; +import std.getopt; +import std.conv; +import std.process : environment; +import std.file; +import std.experimental.logger; +import std.algorithm; +import std.datetime; +import std.stdio; +import core.sync.rwmutex; +import std.string; +import std.file; +import std.traits; +import std.meta; +import std.format; +import std.datetime; +import core.thread; +import std.exception; +import std.typecons; +import ssl.socket; +import std.range; +import bastlibridge.command; +import bastlibridge.base; + +/* +struct IDCounter(T){ + T[Endpoint.channelID] table; + Endpoint.channelID[T] revtable; + Endpoint.channelID counter; + + T opIndex(Endpoint.channelID id){ + return table[id]; + } + + Endpoint.channelID opIndex(T id){ + auto v=id in revtable; + if(v){ + return *v; + } + auto cnt=counter++; + table[cnt]=id; + revtable[id]=cnt; + return cnt; + } + + auto lookup(T id){ + auto tmp=counter; + auto res=opIndex(id); + return tuple(tmp!=counter, res); + } + + auto remove(T id){ + auto tmp=revtable[id]; + table.remove(tmp); + revtable.remove(id); + return tmp; + } + auto remove(Endpoint.channelID id){ + auto tmp=table[id]; + revtable.remove(tmp); + table.remove(id); + return tmp; + } + + Nullable!(Endpoint.channelID) getExisting(scope T id){ + auto v=id in revtable; + if(v){ + return nullable(*v); + } + else { + return Nullable!(Endpoint.channelID).init; + } + } + +} +*/ + + + + + +static this(){ + globalCommands.add!((Message m, in char[] a, in char[] b){m.source.manager.link(a,b);})("link"); + globalCommands.add!((Message m, in char[] a, in char[] b){m.source.manager.linkDirected(a,b);})("linkDirected"); + globalCommands.add!((Message m, in char[] b){m.source.manager.link(Port(m.source, m.source.getChannel(m.getChannelName)),b);})("linkWith"); + globalCommands.add!((Message m){m.source.stop();})("quit"); + globalCommands.add!((Message m){m.source.manager.teardown();})("teardown"); + globalCommands.add!((Message m, in char[] other){m.respond(enforce(m.source.manager.getEndpoint(other),"Endpoint unknown").endpoint.lastSeen());})("lastUpdate"); +} + +/** + * + * This struct keeps a table of strings, that generate new endpoint instances + * based on a name, e.g. "telegram" → new Telegram() + * + */ + + + +static this(){ + //endpointTypes.add!Telegram("telegram"); + endpointTypes.add!IRC("irc"); + endpointTypes.add!Mail("mail"); +} + + +void main(string[] args){ + string savefile; + auto helpInformation=getopt(args, + "savefile|s", "Specify the savefile to load initial config and save the config afterwards", &savefile); + if(helpInformation.helpWanted){ + defaultGetoptPrinter("Usage: ", helpInformation.options); + return; + } + string savefilebuf; + Manager m=new Manager(); + NullEndpoint nep=new NullEndpoint(m, ""); + m.savefile=savefile; + args[1..$].each!(a=>m.addEndpoint(a)); + if(savefile && savefile.exists){ + info("Loading configuration from ", savefile); + savefilebuf=cast(string)read(savefile); + foreach(line; savefilebuf.splitter("\n").until!"a.length==0"){ + try{ + m.addEndpoint(line); + } + catch(Exception e){ + warning(e); + } + savefilebuf=savefilebuf[line.length+1..$]; + } + } + m.run(); + if(savefilebuf){ + foreach(line; savefilebuf[1..$].splitter("\n")){ + auto s=line.findSplit(" "); + globalCommands.execute(s[0], new NullMessage(nep,line), s[2]); + } + } + m.serve(); +} + +/* + + +struct LookupTable{ + private long[][string] _irc; + private string[][long] _telegram; + + void connect(string irc, long telegram){ + _irc[irc]~=telegram; + _telegram[telegram]~=irc; + } + void disconnect(string irc, long telegram){ + auto i=irc in _irc; + *i=remove!(a=>a==telegram)(*i); + auto t=telegram in _telegram; + *t=remove!(a=>a==irc)(*t); + } + + long[] irc(in char[] irc){ + return _irc.get(cast(string)irc,[]); + } + + string[] telegram(in long telegram){ + return _telegram.get(telegram,[]); + } + auto ircChannels(){ + return _irc.byKey; + } + auto telegramChannels(){ + return _telegram.byKey; + } + + auto links(){ + return _irc.byPair.map!(a=>a[1].map!(b=>tuple(a[0],b))).joiner; + } +} + + +struct Watcher{ + ev_io io; + void delegate(int revents) cb; + extern(C) static void callback(ev_loop_t* loop, ev_io *io, int revents){ + auto watcher=cast(Watcher*) io; + watcher.cb(revents); + } + this(Socket sock, typeof(this.cb) cb){ + this(sock.handle,cb); + } + this(int fd, typeof(this.cb) cb){ + this.cb=cb; + ev_io_init(&io, &callback, fd, EV_READ); + } +} + +struct Bot{ + Socket ircSocket; + Socket controlSocket; + string controlPath; + Address controlAddress; + IrcClient ircClient; + Telegram telegram, telegram_listener; + LookupTable lut; + Watcher w_irc,w_tele,w_ctrl; + ev_loop_t *eventloop; + string proxy_url; + uint max_retries = 5; + + string[long] telegram_channels; + + private Address delegate() _ircAddressgen; + + @property void ircAddresses(Range)(Range r) if(isInputRange!(Range)&&is(ElementType!Range: Address)){ + static if(isInfinite!Range){ + _ircAddressgen=(){ + auto res=r.front; + r.popFront(); + return res; + }; + } + else{ + ircAddresses(r.cycle); + } + } + @property auto ircAddresses(){ + import std.range:generate; + return generate!(()=>_ircAddressgen()); + } + + Address ircAddress(){ + return _ircAddressgen(); + } + + struct LastUpdate{ + SysTime Telegram,IRC; + void updateTelegram(){ + Telegram=Clock.currTime(); + } + void updateIRC(){ + IRC=Clock.currTime(); + } + string toString(string separator=", "){ + auto now=Clock.currTime; + string str=format!( + "Last Update received from IRC: %s UTC, that is %s ago%s"~ + "Last update received from Telegram: %s UTC, that is %s ago" + )(IRC.toUTC.toString(), (now-IRC).toString, + separator, + Telegram.toUTC.toString, (now-Telegram).toString); + return str; + } + } + + LastUpdate lastUpdate; + + this(string tgramApiKey){ + telegram=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); + telegram_listener=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); + } + + static immutable string unknown_username="Unknown"; + static string TelegramUser(JSONValue User){ + string username=unknown_username; + foreach(key; ["username", "first_name", "last_name"]){ + if(key in User){ + username=User[key].str; + break; + } + } + return username; + } + + string formatProxyURL(string file_id){ + return proxy_url~file_id; + } + + string locationToOSMUrl(JSONValue loc){ + return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); + } + string locationToOSMUrl(float lat, float lng){ + return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); + } + + string TelegramUserToIRC(JSONValue user){ + dchar mode=' '; + if(user["is_bot"].type==JSON_TYPE.TRUE){ + mode='*'; + } + return "<"~mode.to!string~TelegramUser(user)~">"; + } + + void TelegramToIRC(JSONValue Message, ref Appender!string app){ + string username="< "~unknown_username~">"; + if(auto from="from" in Message){ + app~=TelegramUserToIRC(*from); + app~=" "; + } + if(auto fwd="forward_from" in Message){ + app~=TelegramUserToIRC(*fwd); + app~=" "; + } + foreach(t; ["text", "caption"]){ + if(auto tv=t in Message){ + app~=(*tv).str; + app~=" "; + } + } + + if(auto photo="photo" in Message){ + string fid; + long size=long.min; + foreach(p; (*photo).array){ + if(p["file_size"].integer>size){ + size=p["file_size"].integer; + fid=p["file_id"].str; + } + } + app~=formatProxyURL(fid); + app~=" "; + } + + foreach(t; ["audio", "document", "sticker", "video", "voice"]){ + if(auto tv=t in Message){ + app~=formatProxyURL((*tv)["file_id"].str); + app~=" "; + } + } + if(auto location="location" in Message){ + app~=locationToOSMUrl(*location); + app~=" "; + } + if("contact" in Message){ + auto c=Message["contact"]; + app~=c["first_name"].str; + app~=" "; + if("last_name" in c){ + app~=c["last_name"].str; + app~=" "; + } + app~="("; + app~=c["phone_number"].str; + app~=") "; + } + if(auto venue="venue" in Message){ + app~=(*venue).str; + app~=locationToOSMUrl((*venue)["location"]); + app~=" "; + } + if(auto jv="reply_to_message" in Message){ + TelegramToIRC(*jv, app); + } + } + + string TelegramToIRC(JSONValue Message){ + Appender!string app; + + TelegramToIRC(Message, app); + + return app.data[0..$-1]; + } + + void initialize(){ + auto af=ircAddress.addressFamily; + ircSocket=new SslSocket(af); + ircClient=new IrcClient(ircSocket); + + controlAddress = new UnixAddress(controlPath); + controlSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM); + + //lut.connect("#bastli_wasserstoffreakteur", -8554836); + + + ircClient.realName="Bastli"; + ircClient.userName="bastli"; + ircClient.nickName="bastli"; + ircClient.onConnect~=((){ + info("Connected to IRC on ",ircClient.serverAddress); + ev_io_start(eventloop, &w_tele.io); + telegram_listener.triggerUpdates(); + foreach(k;lut.ircChannels){ + ircClient.join(k); + } + }); + ircClient.onMessage~=(u,tt,m){ + trace("IRC received message from "~u.nickName~" on "~tt~": "~m); + string msg=cast(string)m; + if(msg.skipOver("!bastli ")){ + trace("Message is a command"); + try{ + auto split=msg.findSplit(" "); + switch(split[0]){ + case "links": + ircClient.send(tt,lut.irc(tt).map!(a=>telegram.getChatName(a)~"("~a.to!string~")").join(", ")); + break; + case "link": + //link(tt.idup, split[2].to!long); + break; + case "unlink": + unlink(tt.idup, split[2].to!long); + break; + case "list_telegram": + ircClient.send(tt, telegram_channels.byPair.map!(a=>a[1]~"("~a[0].to!string~")").join(", ")); + break; + case "lastUpdate": + ircClient.send(tt, lastUpdate.toString(", ")); + break; + default: + ircClient.send(tt, "Unknown command"); + break; + } + } + catch(Exception e){ + ircClient.send(tt, "Error while executing command: "~e.msg); + warning(e.to!string); + } + } + else{ + foreach(chatid; lut.irc(tt)){ + telegram.send(chatid, "< "~u.nickName~"> "~m); + } + } + }; + + + telegram_listener.onMessage~=(j){ + trace("Received Message "~j.toPrettyString()); + auto jv="text" in j; + string msg=""; + if(jv){ + msg=j["text"].str; + } + + auto chatid=j["chat"]["id"].integer; + if(chatid !in telegram_channels){ + if("title" in j["chat"]){ + telegram_channels[chatid]=j["chat"]["title"].str; + } + else{ + telegram_channels[chatid]=TelegramUser(j["chat"]); + } + } + void forward_msg(){ + auto irc_chans=lut.telegram(chatid); + if(irc_chans.length==0){ + warning("Input on not-connected telegram-channel "~j["chat"].toPrettyString); + } + foreach(chan; irc_chans){ + ircClient.send(chan, TelegramToIRC(j)); + } + } + if(msg.front=='/'){ + trace("Bot command received"); + try{ + auto split=msg.findSplit(" "); + auto cmdbot=split[0].findSplit("@"); + if(cmdbot[1].length!=0 && cmdbot[2]!=telegram.botName){ + trace("Not one of our commands"); + forward_msg(); + } + else{ + switch(cmdbot[0][1..$]){ + case "link": + link(split[2], chatid); + ircClient.notice(split[2], "Linked with "~chatid.to!string); + break; + case "links": + telegram.send(chatid, lut.telegram(chatid).join(", ")); + break; + case "unlink": + unlink(split[2], chatid); + ircClient.notice(split[2], "Unlinked with "~chatid.to!string); + break; + case "lastUpdate": + telegram.send(chatid, lastUpdate.toString("\n")); + break; + default: + //telegram.send(chatid, split[0]~": Unknown command"); + break; + } + } + } + catch(Exception e){ + telegram.send(chatid, "Error executing command: "~e.msg); + warning(e.to!string); + } + } + else{ + forward_msg(); + } + }; + } + + void unlink(string irc, long telegram){ + info("Unlinking ", irc, " from ", telegram); + lut.disconnect(irc, telegram); + if(ircClient.connected()){ + if(lut.irc(irc).length==0){ + ircClient.part(irc); + } + } + } + + void link(string irc, long telegram){ + info("Linking ", irc, " with ", telegram); + lut.connect(irc, telegram); + if(ircClient.connected()){ + ircClient.join(irc); + } + } + + void connect(){ + info("Connecting to Telegram"); + tryNTimes!(()=>telegram.connect(), (e)=>warning(e))(max_retries); + tryNTimes!(()=>telegram_listener.connect(), (e)=>warning(e))(max_retries); + + //ircClient.connect(new InternetAddress("127.0.0.1",6667)); + info("Connecting to IRC on ", ircAddress.toString); + tryNTimes!(()=>ircClient.connect(ircAddress), (e)=>warning(e))(max_retries); + + if(controlPath.exists()){ + remove(controlPath); + } + controlSocket.bind(controlAddress); + controlSocket.listen(10); + } + + void setupEventloop(){ + eventloop=ev_default_loop(0); + + w_irc=Watcher(ircSocket,(revents){ + trace("IRC Update"); + lastUpdate.updateIRC(); + + if(ircClient.read()){ + ev_io_stop(eventloop, &w_irc.io); + } + }); + + w_tele=Watcher(telegram_listener.sock,(revents){ + trace("Telegram Update"); + lastUpdate.updateTelegram(); + + Exception e; + bool res; + try res=telegram_listener.read(); + catch(TelegramException te) error(te); + if(res){ + ev_io_stop(eventloop, &w_tele.io); + ev_io_set(&w_tele.io, telegram_listener.sock.handle, EV_READ); + ev_io_start(eventloop, &w_tele.io); + } + else{ + telegram_listener.triggerUpdates(); + } + }); + + w_ctrl=Watcher(controlSocket.handle, (revents){ + auto ns=controlSocket.accept(); + trace("New control connection "); + Watcher *nw; + nw=new Watcher(ns.handle, (rrevents){ + char[512] buf; + auto ret=ns.receive(buf); + if(ret==0){ + ev_io_stop(eventloop, &nw.io); + delete nw; + } + auto line=buf[0..ret]; + trace("New control command ",buf); + foreach(l; line.splitter("\n")){ + auto split=l.findSplit(" "); + switch(split[0]){ + case "quit": + quit(); + break; + case "link": + auto split2=split[2].findSplit(" "); + link(split2[0].idup, split2[2].to!long); + break; + case "links": + foreach(i,tt; lut._irc.byPair){ + foreach(t; tt){ + ns.send("%s <-> %s\n".format(i,t)); + } + } + break; + case "unlink": + break; + case "": + break; + default: + ns.send(l~": unknown command\n"); + } + } + }); + ev_io_start(eventloop, &nw.io); + }); + } + + void quit(){ + info("Quitting the bot"); + ircClient.quit("Exiting gracefully"); + disconnect(); + stop(); + } + + void start(){ + info("Starting the event-listeners"); + ev_io_start(eventloop, &w_irc.io); + ev_io_start(eventloop, &w_ctrl.io); + } + + void stop(){ + trace("Stopping the Eventloop"); + ev_io_stop(eventloop, &w_tele.io); + ev_io_stop(eventloop, &w_irc.io); + ev_io_stop(eventloop, &w_ctrl.io); + ev_break(eventloop, EVBREAK_ALL); + } + + void run(){ + info("Running the event-loop"); + ev_run(eventloop,0); + } + + void disconnect(){ + telegram.disconnect(); + telegram_listener.disconnect(); + controlSocket.close(); + std.file.remove(controlPath); + } + + void save(File f){ + trace("Saving to file ",f.name); + foreach(link; lut.links){ + f.writeln(link[0], " ", link[1].to!string); + } + } + void load(File f){ + trace("loading from file ",f.name); + foreach(l; f.byLine.filter!(a=>a.length>0)){ + auto split=l.findSplit(" "); + link(split[0].idup, split[2].to!long); + } + } +} + + +int main(string[] args){ + Bot b=Bot(args[1]); + b.proxy_url=args[2]; + b.controlPath=args[3]; + b.ircAddresses=getAddress(args[4],args[5].to!ushort); + b.initialize(); + if(exists("savefile")){ + auto f=File("savefile", "r"); + b.load(f); + f.close(); + } + b.connect(); + scope(failure){ + b.quit(); + } + b.setupEventloop(); + b.start(); + b.run(); + scope(exit){ + auto f=File("savefile", "w+"); + b.save(f); + f.close(); + } + + return 0; + +} +*/ diff --git a/src/bastlibridge/command.d b/src/bastlibridge/command.d @@ -0,0 +1,63 @@ +module bastlibridge.command; +import bastlibridge.base; +import bastlibridge.manager; +import std.datetime : SysTime; +import std.exception; +import std.format; +import std.meta; +import std.conv; +import std.algorithm; +import std.traits; +import std.experimental.logger; + +class CommandException : Exception{ + this(string msg, string file= __FILE__, size_t line=__LINE__, Throwable next=null){ + super(msg,file,line,next); + } +} + +struct CommandMachine{ + struct Function{ + void function(Message m, in char[] args) func; + bool need_auth=false; + } + Function[string] commands; + + auto execute(in char[] name, Message m, in char[] args){ + auto cmd=enforce!CommandException(name in commands, format("Command %s unknown", name)); + assert(cmd); + if(cmd.need_auth && !m.auth()){ + throw new CommandException("Permission denied"); + } + return cmd.func(m, args); + } + + static void wrapperFunction(alias T)(Message m, in char[] args) if(isCallable!T){ + auto s=args.splitter(" "); + auto pop(){ + if(s.empty) + throw new Exception("Too few arguments given"); + auto ret=s.front; + s.popFront(); + return ret; + } + alias fct=staticMap!(to,Parameters!T[1..$]); + + auto ref evaluate(alias Func)(){ + return Func(pop()); + } + + try{ + T(m, staticMap!(evaluate, fct)); + } + catch(Exception e){ + m.respond(e.msg); + warning(e.toString); + } + } + + void add(alias T)(string name) if(isCallable!T && is(Parameters!T[0] == Message)){ + commands[name]=Function(&wrapperFunction!T, hasUDA!(T, "admin")); + } +} +CommandMachine globalCommands; diff --git a/src/bastlibridge/http.d b/src/bastlibridge/http.d @@ -1,362 +0,0 @@ -module bastlibridge.http; - -import std.stdio; -import std.array; -import std.typecons; -import std.socket; -import std.format; -import std.range; -import std.algorithm; -import std.meta; -import std.conv; -import std.traits; -import std.exception; -import std.experimental.logger; - -@safe: - -class OutBuffer{ - ubyte[] buffer; - size_t offset; - - invariant(){ - assert(offset <= buffer.length); - } - - void reserve(size_t len) - in{ - assert(offset + len >= offset); - } - out{ - assert(offset + len <= buffer.length); - } - body{ - if(offset+len>buffer.length){ - buffer.length=(offset + len)*2; - } - } - alias put = write; - - ubyte[] write(const(ubyte)[] bytes){ - reserve(bytes.length); - auto slice=buffer[offset..offset+bytes.length]; - slice[]=bytes[]; - offset+=bytes.length; - return slice; - } - - unittest{ - ubyte[] test=iota(0,255).map!(a=>cast(ubyte)a).array; - OutBuffer ob; - ob.write(test); - assert(ob.toBytes == test); - } - - void write(const(char)[] bytes){ - write(cast(const(ubyte)[])bytes); - } - void write(in ubyte b){ - reserve(ubyte.sizeof); - buffer[offset]=b; - offset++; - } - - ubyte[] toBytes(){ - return buffer[0..offset]; - } - - override string toString(){ - return (cast(const(char)[])toBytes).idup; - } - -} - -private static immutable string http_version="HTTP/1.1"; -private static immutable string nl="\r\n"; - -ubyte[] receiveData(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ - ubyte[] buf=buffer; - size_t pos=0,res=0; - while(true){ - res=sock.receive(buf); - if(res==0){ - return []; - } - else if(res==Socket.ERROR){ - throw new ErrnoException("Error reading from socket"); - } - pos+=res; - if(pos<buffer.length) - break; - if(pos>=buffer.length){ - buffer.length+=chunk_size; - } - buf=buffer[pos..$]; - } - return buffer[0..pos]; -} - -Nullable!(HttpResponse!T) receiveHttpResponse(T=void)(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ - auto data=receiveData(sock,buffer,chunk_size); - if(data.length==0){ - return Nullable!(HttpResponse!T).init; - } - return nullable(HttpResponse!T.parse(data)); -} - -Nullable!(HttpRequest!T) receiveHttpRequest(T=void)(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ - auto data=receiveData(sock,buffer,chunk_size); - if(data.length==0){ - return Nullable!(HttpRequest!T).init; - } - return nullable(HttpRequest!T.parse(data)); -} - - -struct HttpRequest(T=void){ - private auto ob=appender!(ubyte[])(); - private uint state=0; - const(char)[] method; - const(char)[] url; - const(char)[] ver; - const(ubyte)[] content; - - static if(!is(T == void)){ - static assert(__traits(compiles, (){const(char)[] c=""; (T.init)[c]=c;})); - T headers; - } - - private void clear(){ - ob.clear(); - state=0; - } - - void write(T)(in T[] buf) if(is(T:ubyte)){ - ob.put(cast(const(ubyte)[])buf); - } - - void request(TM, TU)(TM method, TU url){ - clear(); - ob.formattedWrite!("%s /%s "~http_version~nl)(method,url); - state=1; - } - - void header(TN,TV)(TN name, TV value) - in{ - assert(state==1); - } - body{ - ob.formattedWrite!("%s: %s"~nl)(name, value); - } - - void header(A...)(A args) - in{ - assert(args.length%2==0); - } - body{ - - string generateMixin(){ - string str; - for(size_t i=0; i<args.length; i+=2){ - str~=format!"header(args[%d],args[%d]);"(i,i+1); - } - return str; - } - mixin(generateMixin()); - } - - void data(TD)(TD data) if(hasLength!TD) - in{ - assert(state<2); - } - body{ - header("Content-Length", data.length); - write(nl); - state=2; - write(data); - } - - void data (TD)(TD data) if(isSomeString!TD){ - import std.string:representation; - this.data(data.representation); - } - - void finalize(){ - if(state<2){ - write(nl); - state=2; - } - } - - void perform(Socket sock){ - finalize(); - writeln(cast(char[])ob.data); - sock.send(ob.data); - } - - static HttpRequest parse(const(ubyte)[] buf){ - HttpRequest r; - const(char)[] buffer=cast(const(char)[])buf; - auto lines=buffer.splitter(nl); - - auto s=lines.front.findSplit(" "); - r.method=s[0]; - s=s[2].findSplit(" "); - r.url=s[0]; - r.ver=s[2]; - - lines.popFront(); - static if(!is(T==void)){ - foreach(line; lines){ - s=line.findSplit(": "); - res._headers[s[0]]=s[2]; - } - } - if(buffer.findSkip(nl~nl)){ - r.content=cast(const(ubyte)[])buffer; - } - return r; - } - - const(char)[] bufferString() const{ - return cast(const(char)[])ob.data; - } - - string toString(){ - static if(is(T==void)){ - return format!"HttpRequest %s %s\n%s"(method, url, cast(const char[])content); - } - else{ - return format!"HttpRequest %s %s\n%s\n%s"(method, url, headers.to!string, cast(const char[])content); - } - } -} - -struct CopyAssoc{ - const(char)[][string] assoc; - mixin Proxy!assoc; - - auto opIndexAssign(in char[] value, in char[] key){ - assoc[key.idup]=value; - return value; - } -} - -struct HttpResponse(T=void){ - private auto ob=appender!(ubyte[])(); - string ver; - ushort code; - string codename; - private uint state; - - static if(!is(T == void)){ - static assert(__traits(compiles, (){const(char)[] c=""; (T.init)[c]=c;})); - T headers; - } - const(ubyte)[] content; - - static HttpResponse parse(const(ubyte)[] data){ - const(char)[] buf=cast(const char[]) data; - HttpResponse res; - auto lines=buf.splitter(nl); - auto status=lines.front; - auto split=status.splitter(" "); - - res.ver=split.front.idup; - split.popFront(); - res.code=split.front.to!ushort; - split.popFront(); - res.codename=split.joiner.to!string; - static if(!is(T==void)){ - lines.popFront(); - foreach(line;lines.until!(a=>a.length==0)){ - auto headsplit=line.findSplit(": "); - res.headers[headsplit[0]]=headsplit[2]; - } - } - - res.content=cast(const(ubyte)[])buf.findSplit("\r\n\r\n")[2]; - return res; - } - - void response(TN)(ushort code, TN codename){ - ob.clear(); - ob.formattedWrite!(http_version~" %d %s"~nl)(code,codename); - state=1; - } - - void header(TN,TV)(TN name, TV value) - in{ - assert(state==1); - } - body{ - ob.formattedWrite!("%s: %s"~nl)(name, value); - } - - void header(A...)(A args) - in{ - assert(args.length%2==0); - } - body{ - - string generateMixin(){ - string str; - for(size_t i=0; i<args.length; i+=2){ - str~=format!"header(args[%d],args[%d]);"(i,i+1); - } - return str; - } - mixin(generateMixin()); - } - - void write(T)(in T[] buf) if(is(T:ubyte)){ - ob.put(cast(const(ubyte)[])buf); - } - - void data(TD)(TD data) if(hasLength!TD) - in{ - assert(state<2); - } - body{ - header("Content-Length", data.length); - write(nl); - state=2; - write(data); - } - - void data (TD)(TD data) if(isSomeString!TD){ - import std.string:representation; - this.data(data.representation); - } - - void finalize(){ - if(state<2){ - write(nl); - state=2; - } - } - - void perform(Socket sock){ - finalize(); - writeln(bufferString); - sock.send(ob.data); - } - - const(char)[] str() const{ - return cast(const(char)[]) content; - } - - const(char)[] bufferString() const{ - return cast(const(char)[])ob.data; - } - - string toString(){ - static if(is(T==void)){ - return format!"HttpResponse %d %s\n%s"(code, codename, cast(const char[])content); - } - else{ - return format!"HttpResponse %d %s\n%s\n%s"(code, codename, headers.to!string, cast(const char[])content); - } - } -} - diff --git a/src/bastlibridge/interfaces/irc.d b/src/bastlibridge/interfaces/irc.d @@ -0,0 +1,335 @@ +module bastlibridge.interfaces.irc; +import bastlibridge.base; +import bastlibridge.manager; +import bastlibridge.command; +import bastlibridge.interfaces.telegram; +import std.socket; +import std.exception; +import std.typecons; +import irc.client; +import std.algorithm; +import std.range; +import std.json; +import std.conv; +import std.format; +import std.array; +import std.datetime : SysTime,Clock; +import std.experimental.logger; + + +void waitForData(Socket sock) @trusted{ + version(Windows){ + static assert(false); + } + import core.sys.posix.sys.socket; + ubyte[4] buf; + recv(sock.handle, buf.ptr, buf.length, MSG_PEEK); +} + +final class IRCMessage : Message{ + import irc.client; + IrcUser user; + const(char)[] msg,channel; + SysTime dt; + + this(IrcUser u, in char[] msg, in char[] channel){ + this.user=u; + this.msg=msg; + this.channel=channel; + dt=Clock.currTime(); + } + + override const(char)[] getMessage(){ + return msg; + } + + override const(char)[] userName(){ + return user.nickName; + } + + override const(char)[] getChannelName(){ + if(channel==(cast(IRC)source).ircClient.nickName){ + return user.nickName; + } + else{ + return channel; + } + } + + override SysTime getTime(){ + return dt; + } + + override bool auth(){ + return user.nickName=="Doeme"; + } + + override void respond(in char[] r){ + auto ep=cast(IRC)source; + const(char)[] chan=getChannelName(); + trace("Sending response for ", msg, " on ", channel, " to ", chan, ": ", r); + ep.send(chan, r); + } +} + +final class IRCChannel:Channel{ + +} + +final class IRC : Endpoint{ + import ssl.socket; + import irc.url; + import irc.client; + static import irc.url; + + private IrcClient ircClient; + private Socket sock; + private string commandPrefix; + + Address addr; + + private __gshared bool shutdown=false; + + + private{ + static immutable string unknown_username="Unknown"; + string proxy_url; + string formatProxyURL(string file_id){ + return proxy_url~file_id; + } + + string locationToOSMUrl(JSONValue loc){ + return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); + } + string locationToOSMUrl(float lat, float lng){ + return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); + } + + + static string TelegramUserToIRC(JSONValue user, TelegramMessage msg){ + dchar mode=' '; + if(user["is_bot"].type==JSON_TYPE.TRUE){ + mode='*'; + } + return "<"~mode.to!string~msg.userName().idup~">"; + } + + void TelegramToIRC(TelegramMessage msg, ref Appender!string app){ + TelegramToIRC(msg, app, msg.json); + } + void TelegramToIRC(TelegramMessage msg, ref Appender!string app, JSONValue m){ + string username="< "~unknown_username~">"; + if(auto from="from" in m){ + app~=TelegramUserToIRC(*from,msg); + app~=" "; + } + if(auto fwd="forward_from" in m){ + app~=TelegramUserToIRC(*fwd,msg); + app~=" "; + } + foreach(t; ["text", "caption"]){ + if(auto tv=t in m){ + app~=(*tv).str; + app~=" "; + } + } + + if(auto photo="photo" in m){ + string fid; + long size=long.min; + foreach(p; (*photo).array){ + if(p["file_size"].integer>size){ + size=p["file_size"].integer; + fid=p["file_id"].str; + } + } + app~=formatProxyURL(fid); + app~=" "; + } + + foreach(t; ["audio", "document", "sticker", "video", "voice"]){ + if(auto tv=t in m){ + app~=formatProxyURL((*tv)["file_id"].str); + app~=" "; + } + } + if(auto location="location" in m){ + app~=locationToOSMUrl(*location); + app~=" "; + } + if("contact" in m){ + auto c=m["contact"]; + app~=c["first_name"].str; + app~=" "; + if("last_name" in c){ + app~=c["last_name"].str; + app~=" "; + } + app~="("; + app~=c["phone_number"].str; + app~=") "; + } + if(auto venue="venue" in m){ + app~=(*venue).str; + app~=locationToOSMUrl((*venue)["location"]); + app~=" "; + } + if(auto jv="reply_to_message" in m){ + TelegramToIRC(msg, app, *jv); + } + } + string TelegramToIRC(TelegramMessage m){ + Appender!string app; + + TelegramToIRC(m, app); + + return app.data[0..$-1]; + } + } + + this(Manager m, string args){ + super(m,args); + + string nick,username,realname; + bool have_addr=false; + ConnectionInfo info; + + foreach(arg; args.splitter(",")){ + auto s=arg.findSplit("="); + switch(s[0]){ + case "proxyurl": + proxy_url=s[2]; + break; + case "realname": + realname=s[2]; + break; + case "username": + username=s[2]; + break; + case "nick": + nick=s[2]; + break; + default: + info = irc.url.parse(cast(string)s[0]); + have_addr=true; + break; + } + } + + enforce(have_addr, "You have to provide an URL to connect to with irc://address"); + enforce(nick, "You have to provide at least a nickname with nick=foo"); + if(!username) username=nick; + if(!realname) realname=nick; + + this.commandPrefix=("!"~nick); + + addr = getAddress(info.address, info.port).front; + auto af=addr.addressFamily; + if(info.secure){ + sock=new SslSocket(af); + } + else{ + sock=new TcpSocket(af); + } + ircClient = new IrcClient(sock); + ircClient.nickName=nick; + ircClient.userName(username); + ircClient.realName(realname); + ircClient.onMessage~=&onInput; + } + + void onInput(IrcUser user, in char[] channel, in char[] msg){ + trace("Got message from ",user.nickName, " in ", channel, ": ", msg); + heartbeat(); + auto msg2=scoped!(IRCMessage)(user, msg, channel); + msg2.source=this; + if(msg.startsWith(this.commandPrefix)){ + trace("Is a command"); + onCommand(msg2); + } + else{ + trace("Is a message"); + onMessage(msg2); + } + } + + void onMessage(IRCMessage msg){ + auto chan=getChannel(msg.getChannelName()); + if(!chan){ + info("Got message on non-connected channel ", msg.channel); + return; + } + this.manager.distribute(Port(this,chan), msg); + } + + void onCommand(IRCMessage msg){ + auto s=msg.msg[commandPrefix.length+1..$].findSplit(" "); + globalCommands.execute(s[0], msg, s[2]); + } + + override void open(){ + info("Connecting to IRC on ", addr.toString); + ircClient.connect(addr); + } + + override Channel join(string c){ + if(c[0]=='#'){ + trace("Joining irc channel ", c); + ircClient.join(c); + } + else{ + trace("Not joining query ", c); + } + return new IRCChannel(); + } + + override void part(Channel c){ + auto ic=cast(IRCChannel)c; + ircClient.part(ic._name); + } + + override void sendMessage(scope Message m, Channel chan){ + auto ichan=cast(IRCChannel)chan; + trace("Delivering message of type ",typeid(m)," to channel", ichan._name); + if(cast(TelegramMessage)m){ + send(ichan, chain("< ",m.userName(), "> ", m.getMessage())); + } + else{ + send(ichan, chain("< ",m.userName(), "> ", m.getMessage())); + } + } + + protected void send(T)(IRCChannel c, T text){ + send(c._name, text); + } + + protected void send(T)(in char[] channel, T text){ + synchronized(ircClient){ + ircClient.send(channel, text); + } + } + + override void run(){ + import ssl.openssl; + loadOpenSSL(); //We have to do this once for every thread + info("Listening for IRC messages on"); + ubyte[1] buf; + while(true){ + waitForData(sock); + info("IRC message received"); + if(shutdown) + break; + synchronized(ircClient){ + if(ircClient.read()){ + break; + } + } + } + } + override void stop(){ + synchronized(this) + shutdown=true; + synchronized(ircClient) + ircClient.quit("Planned shutdown"); + } +} diff --git a/src/bastlibridge/interfaces/mail.d b/src/bastlibridge/interfaces/mail.d @@ -0,0 +1,63 @@ +module bastlibridge.interfaces.mail; +import bastlibridge.base; +import std.process; +import std.format; +import std.algorithm; +import std.concurrency; + + +class Mail: Endpoint{ + string shellcmd; + __gshared Tid tid; + + this(Manager m, string args){ + super(m,args); + shellcmd=args.idup; + } + + override void open(){ + + } + + override Channel join(string c){ + return new MailAddress(); + } + + override void part(Channel c){ + + } + + override void sendMessage(Message m, Channel c){ + trace("Sending mail to ", c._name, " with command \"", shellcmd, `"`); + auto pipe=pipeShell(shellcmd); + trace("Process ", pipe.pid, " opened"); + formattedWrite(pipe.stdin.lockingTextWriter, "To: %s\nSubject: New message on %s\n\n%s", c._name, m.getChannelName, m.getMessage()); + trace("Text written to stdin"); + pipe.stdin.close(); + pipe.stdout.close(); + trace("Closed pipes, waiting for termination of ", pipe.pid); + if(pipe.pid.wait()!=0){ + warning("Sending mail failed"); + } + else{ + trace("Mail successfully sent"); + } + } + + override void run(){ + synchronized(this){ + tid=thisTid(); + } + while(!receiveOnly!bool()){} + } + + override void stop(){ + synchronized(this){ + send(tid, true); + } + } +} + +class MailAddress : Channel{ + +} diff --git a/src/bastlibridge/interfaces/stdout.d b/src/bastlibridge/interfaces/stdout.d @@ -0,0 +1,48 @@ +module bastlibridge.interfaces.stdout; +import bastlibridge.base; +import std.process; +import std.experimental.logger; +import std.datetime: SysTime, Clock; + +class NullMessage : Message{ + string msg; + this(Endpoint ep, string msg){ + this.msg=msg; + this.source=ep; + } + override const(char)[] userName(){ + return environment["USER"]; + } + override const(char)[] getMessage(){ + return msg; + } + override const(char)[] getChannelName(){ + return "stdio"; + } + override bool auth(){ + return true; + } + override SysTime getTime(){ + return Clock.currTime; + } + override void respond(in char[] response){ + warning(response); + } +} + +class NullEndpoint : Endpoint{ + + this(Manager m, string args){ + super(m,args); + } + override void open(){} + override Channel join(string c){return new NullChan();} + override void part(Channel c){} + override void sendMessage(Message m, Channel c){info(m.getMessage);} + override void run(){} + override void stop(){} +} + +class NullChan : Channel{ + +} diff --git a/src/bastlibridge/interfaces/telegram.d b/src/bastlibridge/interfaces/telegram.d @@ -0,0 +1,172 @@ +module bastlibridge.interfaces.telegram; +static import tg=telegram.telegram; +import bastlibridge.base; +import std.conv; +import std.format; +import core.sync.mutex; +import std.array; +import std.datetime : SysTime, Clock; +import std.json; +import std.algorithm; + +class Telegram: Endpoint{ + private tg.Telegram telegram, telegram_listener; + Mutex telegram_lock, listener_lock; + + this(Manager m, string args){ + super(m,args); + + telegram=tg.Telegram(args); + telegram_listener=tg.Telegram(args); + + telegram_lock=new Mutex(); + listener_lock=new Mutex(); + + telegram_listener.onMessage~=(j){ + trace("Received Message "~j.toPrettyString()); + auto jv="text" in j; + string msg=""; + if(jv){ + msg=j["text"].str; + } + + auto chatid=j["chat"]["id"].integer; + auto chan=chatid in telegram_channels; + auto get_msg(){ + return new TelegramMessage(j); + } + void forward_msg(){ + if(!chan){ + warning("Message to unknown chat ", chatid); + } + manager.distribute(Port(this, *chan), get_msg()); + } + + if(msg.front=='/'){ + trace("Bot command received"); + try{ + auto split=msg.findSplit(" "); + auto cmdbot=split[0].findSplit("@"); + if(cmdbot[1].length!=0 && cmdbot[2]!=telegram.botName){ + trace("Not one of our commands"); + forward_msg(); + } + else{ + + } + } + catch(Exception e){ + + warning(e.to!string); + } + } + else{ + trace("Normal message"); + forward_msg(); + } + }; + } + + override void open(){ + telegram.connect(); + telegram_listener.connect(); + } + + private TelegramChannel[long] telegram_channels; + override Channel join(string c){ + auto id=c.to!long; + auto chan=new TelegramChannel(id); + telegram_channels[id]=chan; + return chan; + } + + override void part(Channel c){ + + } + + override void sendMessage(Message m, Channel c){ + send((cast(TelegramChannel)c).id, "<"~m.userName()~"> "~m.getMessage()); + } + + private __gshared bool shutdown=false; + + override void run(){ + while(!shutdown){ + telegram_listener.triggerUpdates(); + } + } + + override void stop(){ + shutdown=true; + telegram_listener.disconnect(); + synchronized(telegram_lock) + telegram.disconnect(); + } + + void send(long id, in char[] msg){ + synchronized(telegram_lock){ + telegram.send(id, msg); + } + } +} + +class TelegramChannel : Channel{ + long id; + this(long id){ + this.id=id; + } +} + +final class TelegramMessage : Message{ + JSONValue json; + + @property Telegram _source(){ + return cast(Telegram)(source); + } + + this(JSONValue json){ + this.json=json; + } + static string TelegramUser(JSONValue User){ + string username="unknown"; + foreach(key; ["username", "first_name", "last_name"]){ + if(key in User){ + username=User[key].str; + break; + } + } + return username; + } + override const(char)[] userName(){ + if(auto from="from" in json){ + return TelegramUser(*from); + } + return ""; + } + + override const(char)[] getMessage(){ + if(auto t="text" in json){ + return t.str; + } + return ""; + } + override const(char)[] getChannelName(){ + auto chat=json["chat"]; + if(auto t="title" in chat){ + return t.str; + } + return json["id"].integer.to!string; + } + + override bool auth(){ + return false; + } + + override SysTime getTime(){ + return SysTime(); + } + + override void respond(in char[] response){ + _source.send(json["chat"]["id"].integer, response); + } +} diff --git a/src/bastlibridge/manager.d b/src/bastlibridge/manager.d @@ -0,0 +1,232 @@ +module bastlibridge.manager; +import bastlibridge.base; +import std.format; +import core.thread; +import core.sync.rwmutex; +import std.array; +import std.exception; +import std.range; +import std.stdio; +import std.concurrency; +import std.algorithm; +import std.experimental.logger; + + + +class Manager{ + struct Process{ + Endpoint endpoint; + Thread thread; + this(Endpoint ep){ + endpoint=ep; + thread=new Thread(&ep.thread); + } + void stop(){ + endpoint.stop(); + } + void start(){ + endpoint.open(); + thread.start(); + } + } + + private Process[string] endpoints; + + private LookupTable LUT; + + private ReadWriteMutex LUT_lock; + private ReadWriteMutex endpoints_lock; + public string savefile; + + package __gshared Tid TID; + + this(){ + LUT_lock=new ReadWriteMutex(); + endpoints_lock=new ReadWriteMutex(); + TID=thisTid(); + } + + /** + * Syntax: name=basetype:argstring + */ + void addEndpoint(string cmdline){ + string cmdlinetmp=cmdline; + string name,basetype,args; + cmdline.formattedRead!"%s=%s:%s"(name,basetype,args); + if(name in endpoints){ + throw new Exception(format("Endpoint %s already defined", name)); + } + auto ep=endpointTypes.get(this, basetype, args); + ep.name=name; + ep.cmdline=cmdlinetmp; + synchronized(endpoints_lock.writer){ + endpoints[name]=Process(ep); + } + } + + void dumpConfig(scope void delegate(in char[] s) writer){ + synchronized(endpoints_lock.reader){ + foreach(ep; endpoints.byValue){ + writer(ep.endpoint.cmdline); + writer("\n"); + } + } + writer("\n"); + LUT.dumpConfig(writer); + } + + void distribute(Port src, Message msg){ + synchronized(LUT_lock.reader){ + LUT[src] + .each!(a=>a.send(msg)); + } + } + + auto getEndpoint(in char[] name){ + synchronized(endpoints_lock.reader){ + return name in endpoints; + } + } + + void run(){ + trace("Firing up the threads"); + endpoints.byValue.each!(a=>a.start()); + } + + auto serve(){ + trace("waiting for messages"); + while(true){ + string name=receiveOnly!string(); + trace("Manager is terminating thread ", name); + getEndpoint(name).thread.join; + removeEndpoint(name); + trace("Current endpoint count ", endpoints.length); + if(endpoints.length==0){ + break; + } + } + info("All processes stopped. We will now exit"); + } + + void terminate(in char[] name){ + auto val=getEndpoint(name); + if(val){ + val.stop(); + } + } + + void removeEndpoint(in char[] name){ + trace("Removing endpoint ", name); + synchronized(endpoints_lock.reader){ + synchronized(LUT_lock.writer){ + foreach(port; endpoints[name].endpoint.ports){ + LUT.disconnectAll(port); + } + } + } + synchronized(endpoints_lock.writer){ + endpoints.remove(cast(string)name); + } + } + + void teardown(){ + synchronized(endpoints_lock.reader){ + endpoints.byValue.each!(a=>a.endpoint.stop()); + } + } + + Port getPort(bool create=false)(in char[] a){ + auto s=a.findSplit(":"); + if(s[2].length==0){ + throw new Exception(format(`Endpoint specifier "%s" malformed`, a)); + } + + Process* ep=enforce(getEndpoint(s[0]), format("Endpoint %s not found", s[0])); + static if(create){ + return ep.endpoint.getNewPort(s[2]); + } + else{ + return ep.endpoint.getPort(s[2]); + } + } + + + void save(string file){ + info("Saving configuration into ", file); + File f=File(file, "w+"); + scope(exit)f.close(); + dumpConfig((in char[] t)=>f.write(t)); + } + + static string LUTMap(string func1, string func2, bool create=false)(){ + return format!(` + void %s(TA, TB)(TA a, TB b){ + Port gp(T)(T t){ + static if(is(T == Port)){ + return t; + } + else{ + return getPort!(%s)(t); + } + } + Port pa=gp(a),pb=gp(b); + trace("Doing a %s on ", a, " and ", b); + synchronized(LUT_lock.writer){ + LUT.%s(pa,pb); + } + if(savefile) + save(savefile); + }`)(func1, create, func2, func2); + } + mixin(LUTMap!("link", "connect", true)()); + mixin(LUTMap!("linkDirected", "connectDirected", true)()); + mixin(LUTMap!("unlink", "disconnect", false)()); + mixin(LUTMap!("unlinkDirected", "disconnectDirected", false)()); +} + +struct LookupTable{ + + private Port[][Port] lut; + + void connect(Port a, Port b){ + connectDirected(a,b); + connectDirected(b,a); + } + + void connectDirected(Port a, Port b){ + lut[a]~=b; + } + + void disconnectAll(Port a){ + lut.remove(a); + foreach(ref ports; lut.byValue){ + ports=ports.remove!(p=>p==a)(); + } + } + + void disconnect(Port a, Port b){ + disconnectDirected(a,b); + disconnectDirected(b,a); + } + + void disconnectDirected(Port a, Port b){ + auto i=a in lut; + if(!i){ + return; + } + *i=remove!(a=>a==b)(*i); + } + + void dumpConfig(scope void delegate(in char[] s) writer){ + foreach(port; lut.byPair){ + auto from=port[0]; + foreach(to; port[1]){ + formattedWrite(writer,"linkDirected %s %s\n", from.toString, to.toString); + } + } + } + + auto opIndex(Port a){ + return lut[a]; + } +} diff --git a/src/bastlibridge/telegram.d b/src/bastlibridge/telegram.d @@ -1,333 +0,0 @@ -module bastlibridge.telegram; - -import bastlibridge.http; -import std.stdio; -import std.socket; -import std.range; -import std.algorithm; -import std.format; -import std.conv; -import std.experimental.logger; -import std.traits; -import ssl.socket; -import std.file; -import std.typecons; -import std.string; -import std.json; -import std.exception; -import std.datetime :SysTime,Clock; - -class TelegramErrnoException : ErrnoException{ - this(string msg, string file=__FILE__, size_t line=__LINE__){ - super(msg,file,line); - } -} -class TelegramException : Exception{ - this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ - super(msg,file,line,next); - } -} - -class TelegramError : Error{ - this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ - super(msg,file,line,next); - } -} - -ubyte[] receiveDynamic(Socket sock, ubyte[] buffer, size_t chunk_size=1024){ - ubyte[] buf=buffer; - size_t pos=0,res=0; - while(true){ - trace("Reading on ", sock.handle, "(", sock , ") ", " with buffer ", buf.ptr, " of length ", buf.length); - res=sock.receive(buf); - if(res==0){ - info("Socket ", sock.handle, " died"); - return []; - } - else if(res==Socket.ERROR){ - throw new ErrnoException("Error reading from socket"); - } - pos+=res; - if(pos<buffer.length) - break; - if(pos>=buffer.length){ - buffer.length+=chunk_size; - } - buf=buffer[pos..$]; - } - return buffer[0..pos]; -} - -struct Telegram{ - string token=""; - enum ApiAddr="api.telegram.org"; - enum ApiPort=443; - - long lastUpdate=-1; - - SslSocket sock; - - private Address ApiAddrObj; - - private HttpRequest!void httpRequest; - private static immutable httpBufferChunk=1024; - private ubyte[] httpBuffer=new ubyte[httpBufferChunk]; - - @disable this(); - this(string token){ - this.token=token; - } - - void connect(bool test=true){ - if(!ApiAddrObj){ - getApiAddress(); - } - trace("Opening socket to Telegram on "~ApiAddrObj.to!string); - auto af=ApiAddrObj.addressFamily; - sock=new SslSocket(af); - sock.connect(ApiAddrObj); - scope(failure){ - sock.close(); - } - if(test){ - //Try whether the socket works - trace("Trying to get the Botname via newly opened socket "~sock.handle.to!string); - botName=getBotName(); - } - } - - void getApiAddress(){ - ApiAddrObj=getAddress(ApiAddr,ApiPort).front; - } - - - void disconnect(){ - sock.shutdown(SocketShutdown.BOTH); - sock.close(); - delete sock; - } - - auto getURI(in char[] method){ - return chain("bot",token,"/",method); - } - - void httpRequestHeaders(ref HttpRequest!void httpRequest){ - httpRequest.header("Host", ApiAddr); - httpRequest.header("Connection", "keep-alive"); - } - - void http(in char[] method){ - httpRequest.request("GET", getURI(method)); - httpRequestHeaders(httpRequest); - httpRequest.perform(sock); - } - - void post(T)(in char[] method, T data){ - httpRequest.request("POST", getURI(method)); - httpRequestHeaders(httpRequest); - httpRequest.header("Content-Type", "application/x-www-form-urlencoded"); - httpRequest.data(data); - httpRequest.perform(sock); - } - - void json(in char[] method, in JSONValue jv){ - httpRequest.request("POST", getURI(method)); - httpRequestHeaders(httpRequest); - httpRequest.header("Content-Type", "application/json"); - httpRequest.data(jv.toString); - httpRequest.perform(sock); - } - - /** - * Receives all available data from the socket. - * The returned buffer gets reused, if you want it immutable, - * use .idup on it. - */ - const(char)[] receive() - in - { - assert(sock !is null); - assert(sock.isAlive); - } - body{ - static immutable size_t chunk_size=1024; - - static ubyte[] buffer=new ubyte[chunk_size]; //This initialization is static, i.e. only done once - return cast(char[])sock.receiveDynamic(buffer, chunk_size); - } - - void checkCode(T)(in HttpResponse!T res){ - if(res.code!=200){ - throw new TelegramException(format!"Server returned %d\n%s"(res.code, res.str)); - } - } - - auto getJSON(T)(in HttpResponse!T res){ - auto j=parseJSON(res.str); - if(j["ok"].type != JSON_TYPE.TRUE){ - throw new TelegramException("API request failed: "~j["description"].str); - } - return j["result"]; - } - - Nullable!(JSONValue) response(){ - auto h=receiveHttpResponse(sock, httpBuffer, httpBufferChunk); - trace("Got response from Telegram: ", h.to!string); - if(h.isNull){ - //throw new TelegramError("Socket closed"); - warning("Telegram closed _another_ socket :/ ",sock.handle); - reconnectSocket(); - return Nullable!JSONValue.init; - } - checkCode(h); - return Nullable!JSONValue(getJSON(h)); - } - - void reconnectSocket(){ - trace("Reconnecting socket ", sock.handle); - disconnect(); - connect(); - trace("Reconnected socket, is now ", sock.handle); - } - - struct HTTP{ - string ver; - ushort code; - string code_name; - const(char)[] content; - - static HTTP parse(const(char)[] buf) - in{ - assert(buf.length>0); - } - body{ - HTTP res; - trace("Parseing "~buf); - auto lines=buf.splitter("\r\n"); - auto status=lines.front; - auto split=status.splitter(" "); - - res.ver=split.front.idup; - split.popFront(); - res.code=split.front.to!ushort; - split.popFront(); - res.code_name=split.joiner.to!string; - - res.content=buf.findSplit("\r\n\r\n")[2]; - trace("Parsed to "~res.to!string); - return res; - } - - void checkCode(){ - if(code!=200){ - throw new TelegramException(format!"Server returned %d\n%s"(code, content)); - } - } - - auto getJSON(){ - import std.json; - - auto j=parseJSON(content); - if(j["ok"].type != JSON_TYPE.TRUE){ - throw new TelegramException("API request failed: "~j["description"].str); - } - return j["result"]; - } - } - - bool updateRunning=false; - - void triggerUpdates(uint timeout=600){ - if(updateRunning){ - warning("Update called in a row without a reply"); - return; - } - updateRunning=true; - trace("Triggering Update with timeout ", timeout); - string params=format!"offset=%d&timeout=%d"(lastUpdate+1,timeout); - post("getUpdates",params); - } - - void delegate(JSONValue jv)[] onMessage; - - bool read(){ - updateRunning=false; - auto updates=response(); - if(updates.isNull){ - triggerUpdates(); - return true; - } - foreach(size_t i, update; updates){ - lastUpdate=max(lastUpdate, update["update_id"].integer); - if("message" in update){ - onMessage.each!(a=>a(update["message"])); - } - } - - return false; - } - - void send(in long chatid, in char[] buf){ - trace("Sending "~buf~" to "~chatid.to!string); - JSONValue j; - j.object=null; - j["chat_id"] = JSONValue(chatid); - j["text"] = JSONValue(buf); - trace("Sending telegram message "~j.toPrettyString); - json("sendMessage", j); - auto res=response(); - if(res.isNull){ - send(chatid,buf); - } - } - - JSONValue getChat(T)(in T chatid) if(isNarrowString!T || is(T==long)){ - JSONValue j; - j.object=null; - j["chat_id"]=JSONValue(chatid); - json("getChat", j); - auto res=response(); - if(res.isNull){ - return getChat(chatid); - } - return res.get; - } - long getChatId(in char[] chatid){ - return getChat(chatid)["id"].integer; - } - string getChatName(long chatid){ - auto j=getChat(chatid); - if("title" in j){ - //We are in a group - return j["title"].str; - } - //We are in a query - return j["username"].str; - } - - string botName; - - string getBotName(){ - http("getMe"); - string _botName=response()["username"].str; - return _botName; - } - - JSONValue getFile(in char[] file_id){ - JSONValue j; - j.object=null; - j["file_id"]=file_id; - json("getFile", j); - auto r=response(); - if(r.isNull){ - return getFile(file_id); - } - j=r.get; - auto v="file_path" in j; - if(v) - return j; - else - throw new TelegramException(cast(string)("Path of file "~file_id~" was not found")); - } -} - diff --git a/src/bastlibridge/util.d b/src/bastlibridge/util.d @@ -0,0 +1,29 @@ +module bastlibridge.util; +import std.exception; +import std.range; + +void tryNTimes(alias func, alias errhandle)(uint N){ + Exception last_e; + foreach(n;iota(0,N)){ + try{ + func(); + } + catch(Exception e){ + errhandle(e); + last_e=e; + continue; + } + return; + } + throw last_e; +} +unittest{ + uint i=0; + auto func=(){ + i++; + throw new Exception("derp"); + }; + import std.exception:assertThrown; + assertThrown(tryNTimes!(func, (e)=>assert(e.msg=="derp"))(5)); + assert(i==5); +} diff --git a/src/bastliproxy/proxy.d b/src/bastliproxy/proxy.d @@ -0,0 +1,136 @@ +import std.socket; +import std.stdio; +import std.conv:to; +import std.typecons; +import std.experimental.logger; +import std.range; +import bastlibridge.http; +import bastlibridge.telegram; + +auto telegram=Nullable!Telegram.init; +ubyte[] buf=new ubyte[1024]; + +immutable string index=` + ____ ___ _____________ ____ + / __ )/ | / ___/_ __/ / / _/ + / __ / /| | \__ \ / / / / / / + / /_/ / ___ |___/ // / / /____/ / +/_____/_/ |_/____//_/ /_____/___/ +`c; + +immutable string fof=` + _ _ ___ _ _ +| || | / _ \| || | +| || |_| | | | || |_ +|__ _| |_| |__ _| + |_| \___/ |_| +`c; + +immutable string teapot=r" + _ + _,(_)._ + ___,(_______). + ,'__. \ /\_ + /,' / \ / / +| | | |,' / + \`.| / + `. : : / + `. :.,' + `-.________,-' +"c; + +void bridge(Socket ins, Socket outs, ubyte[] buf){ + trace("Bridging between sockets"); + size_t total=0; + size_t ret=0; + do{ + ret=ins.receive(buf); + total+=ret; + outs.send(buf[0..ret]); + }while(ret==buf.length); + trace("Bridged a total of ", total, " bytes"); +} + +void handleClient(Socket newsock){ + scope(exit){ + newsock.close(); + } + trace("Handling client "~newsock.remoteAddress.to!string); + static HttpResponse!void res; + auto req=receiveHttpRequest(newsock, buf); + if(req.isNull){ + info("Socket gone"); + return; + } + writeln(req); + void not_found(){ + res.response(404, "Not Found"); + res.header("Connection","close"); + res.header("Content-Type","text/plain"); + res.data(fof); + res.perform(newsock); + } + if(req.url=="/"){ + res.response(200, "OK"); + res.header("Content-Type","text/plain"); + res.header("Connection","close"); + res.data(index); + res.perform(newsock); + } + else if(req.url[1]!='?'){ + not_found(); + } + else{ + JSONValue file; + try{ + file=telegram.getFile(req.url[2..$]); + } + catch(TelegramException e){ + not_found(); + return; + } + catch(Exception e){ + res.response(418, "I'm a teapot"); + res.header("Connection", "close"); + res.header("Content-Type", "text/plain"); + res.data(teapot); + res.perform(newsock); + return; + } + string path=file["file_path"].str; + static HttpRequest!void r2; + r2.request("GET", chain("file/bot", telegram.token, "/", path)); + r2.header("Host", telegram.ApiAddr); + r2.header("Connection", "close"); + r2.perform(telegram.sock); + ubyte[1024] buffer; + bridge(telegram.sock, newsock, buffer); + } +} + +void main(string[] args){ + if(args.length!=4){ + stderr.writeln("Usage: ", args[0], " <TELEGRAM_TOKEN> <IP> <PORT>"); + } + + Address addr=parseAddress(args[2], args[3].to!ushort); + telegram=Telegram(args[1]); + telegram.connect(); + scope(exit){ + telegram.disconnect(); + } + info("Listening on "~addr.toString); + + Socket server=new TcpSocket(); + info("Binding to Address"); + server.bind(addr); + scope(exit){ + server.close(); + } + info("Listening"); + server.listen(10); + while(true){ + auto newsock=server.accept(); + handleClient(newsock); + } +} diff --git a/src/bot.d b/src/bot.d @@ -1,575 +0,0 @@ -import bastlibridge.telegram; -import bastlibridge.http; -import deimos.ev; -import std.socket; -import std.json; -import std.array; -import std.conv; -import std.experimental.logger; -import std.algorithm; -import std.datetime; -import std.stdio; -import std.string; -import std.file; -import std.typecons; -import irc.client; -import ssl.socket; -import std.range; - -void tryNTimes(alias func, alias errhandle)(uint N){ - Exception last_e; - foreach(n;iota(0,N)){ - try{ - func(); - } - catch(Exception e){ - errhandle(e); - last_e=e; - continue; - } - return; - } - throw last_e; -} -unittest{ - uint i=0; - auto func=(){ - i++; - throw new Exception("derp"); - }; - import std.exception:assertThrown; - assertThrown(tryNTimes!(func, (e)=>assert(e.msg=="derp"))(5)); - assert(i==5); -} - -struct LookupTable{ - private long[][string] _irc; - private string[][long] _telegram; - - void connect(string irc, long telegram){ - _irc[irc]~=telegram; - _telegram[telegram]~=irc; - } - void disconnect(string irc, long telegram){ - auto i=irc in _irc; - *i=remove!(a=>a==telegram)(*i); - auto t=telegram in _telegram; - *t=remove!(a=>a==irc)(*t); - } - - long[] irc(in char[] irc){ - return _irc.get(cast(string)irc,[]); - } - - string[] telegram(in long telegram){ - return _telegram.get(telegram,[]); - } - auto ircChannels(){ - return _irc.byKey; - } - auto telegramChannels(){ - return _telegram.byKey; - } - - auto links(){ - return _irc.byPair.map!(a=>a[1].map!(b=>tuple(a[0],b))).joiner; - } -} - - -struct Watcher{ - ev_io io; - void delegate(int revents) cb; - extern(C) static void callback(ev_loop_t* loop, ev_io *io, int revents){ - auto watcher=cast(Watcher*) io; - watcher.cb(revents); - } - this(Socket sock, typeof(this.cb) cb){ - this(sock.handle,cb); - } - this(int fd, typeof(this.cb) cb){ - this.cb=cb; - ev_io_init(&io, &callback, fd, EV_READ); - } -} - -struct Bot{ - Socket ircSocket; - Socket controlSocket; - string controlPath; - Address controlAddress; - IrcClient ircClient; - Telegram telegram, telegram_listener; - LookupTable lut; - Watcher w_irc,w_tele,w_ctrl; - ev_loop_t *eventloop; - string proxy_url; - uint max_retries = 5; - - string[long] telegram_channels; - - private Address delegate() _ircAddressgen; - - @property void ircAddresses(Range)(Range r) if(isInputRange!(Range)&&is(ElementType!Range: Address)){ - static if(isInfinite!Range){ - _ircAddressgen=(){ - auto res=r.front; - r.popFront(); - return res; - }; - } - else{ - ircAddresses(r.cycle); - } - } - @property auto ircAddresses(){ - import std.range:generate; - return generate!(()=>_ircAddressgen()); - } - - Address ircAddress(){ - return _ircAddressgen(); - } - - struct LastUpdate{ - SysTime Telegram,IRC; - void updateTelegram(){ - Telegram=Clock.currTime(); - } - void updateIRC(){ - IRC=Clock.currTime(); - } - string toString(string separator=", "){ - auto now=Clock.currTime; - string str=format!( - "Last Update received from IRC: %s UTC, that is %s ago%s"~ - "Last update received from Telegram: %s UTC, that is %s ago" - )(IRC.toUTC.toString(), (now-IRC).toString, - separator, - Telegram.toUTC.toString, (now-Telegram).toString); - return str; - } - } - - LastUpdate lastUpdate; - - this(string tgramApiKey){ - telegram=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); - telegram_listener=Telegram("253031348:AAGn4dHCPwFDSN4Bmt4ZWc3bIAmARmzsf9k"); - } - - static immutable string unknown_username="Unknown"; - static string TelegramUser(JSONValue User){ - string username=unknown_username; - foreach(key; ["username", "first_name", "last_name"]){ - if(key in User){ - username=User[key].str; - break; - } - } - return username; - } - - string formatProxyURL(string file_id){ - return proxy_url~file_id; - } - - string locationToOSMUrl(JSONValue loc){ - return locationToOSMUrl(loc["latitude"].floating, loc["longitude"].floating); - } - string locationToOSMUrl(float lat, float lng){ - return format!"https://www.openstreetmap.org/#map=%f/%f"(lat, lng); - } - - string TelegramUserToIRC(JSONValue user){ - dchar mode=' '; - if(user["is_bot"].type==JSON_TYPE.TRUE){ - mode='*'; - } - return "<"~mode.to!string~TelegramUser(user)~">"; - } - - void TelegramToIRC(JSONValue Message, ref Appender!string app){ - string username="< "~unknown_username~">"; - if(auto from="from" in Message){ - app~=TelegramUserToIRC(*from); - app~=" "; - } - if(auto fwd="forward_from" in Message){ - app~=TelegramUserToIRC(*fwd); - app~=" "; - } - foreach(t; ["text", "caption"]){ - if(auto tv=t in Message){ - app~=(*tv).str; - app~=" "; - } - } - - if(auto photo="photo" in Message){ - string fid; - long size=long.min; - foreach(p; (*photo).array){ - if(p["file_size"].integer>size){ - size=p["file_size"].integer; - fid=p["file_id"].str; - } - } - app~=formatProxyURL(fid); - app~=" "; - } - - foreach(t; ["audio", "document", "sticker", "video", "voice"]){ - if(auto tv=t in Message){ - app~=formatProxyURL((*tv)["file_id"].str); - app~=" "; - } - } - if(auto location="location" in Message){ - app~=locationToOSMUrl(*location); - app~=" "; - } - if("contact" in Message){ - auto c=Message["contact"]; - app~=c["first_name"].str; - app~=" "; - if("last_name" in c){ - app~=c["last_name"].str; - app~=" "; - } - app~="("; - app~=c["phone_number"].str; - app~=") "; - } - if(auto venue="venue" in Message){ - app~=(*venue).str; - app~=locationToOSMUrl((*venue)["location"]); - app~=" "; - } - if(auto jv="reply_to_message" in Message){ - TelegramToIRC(*jv, app); - } - } - - string TelegramToIRC(JSONValue Message){ - Appender!string app; - - TelegramToIRC(Message, app); - - return app.data[0..$-1]; - } - - void initialize(){ - auto af=ircAddress.addressFamily; - ircSocket=new SslSocket(af); - ircClient=new IrcClient(ircSocket); - - controlAddress = new UnixAddress(controlPath); - controlSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM); - - //lut.connect("#bastli_wasserstoffreakteur", -8554836); - - - ircClient.realName="Bastli"; - ircClient.userName="bastli"; - ircClient.nickName="bastli"; - ircClient.onConnect~=((){ - info("Connected to IRC on ",ircClient.serverAddress); - ev_io_start(eventloop, &w_tele.io); - telegram_listener.triggerUpdates(); - foreach(k;lut.ircChannels){ - ircClient.join(k); - } - }); - ircClient.onMessage~=(u,tt,m){ - trace("IRC received message from "~u.nickName~" on "~tt~": "~m); - string msg=cast(string)m; - if(msg.skipOver("!bastli ")){ - trace("Message is a command"); - try{ - auto split=msg.findSplit(" "); - switch(split[0]){ - case "links": - ircClient.send(tt,lut.irc(tt).map!(a=>telegram.getChatName(a)~"("~a.to!string~")").join(", ")); - break; - case "link": - //link(tt.idup, split[2].to!long); - break; - case "unlink": - unlink(tt.idup, split[2].to!long); - break; - case "list_telegram": - ircClient.send(tt, telegram_channels.byPair.map!(a=>a[1]~"("~a[0].to!string~")").join(", ")); - break; - case "lastUpdate": - ircClient.send(tt, lastUpdate.toString(", ")); - break; - default: - ircClient.send(tt, "Unknown command"); - break; - } - } - catch(Exception e){ - ircClient.send(tt, "Error while executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - foreach(chatid; lut.irc(tt)){ - telegram.send(chatid, "< "~u.nickName~"> "~m); - } - } - }; - - - telegram_listener.onMessage~=(j){ - trace("Received Message "~j.toPrettyString()); - auto jv="text" in j; - string msg=""; - if(jv){ - msg=j["text"].str; - } - - auto chatid=j["chat"]["id"].integer; - if(chatid !in telegram_channels){ - if("title" in j["chat"]){ - telegram_channels[chatid]=j["chat"]["title"].str; - } - else{ - telegram_channels[chatid]=TelegramUser(j["chat"]); - } - } - void forward_msg(){ - auto irc_chans=lut.telegram(chatid); - if(irc_chans.length==0){ - warning("Input on not-connected telegram-channel "~j["chat"].toPrettyString); - } - foreach(chan; irc_chans){ - ircClient.send(chan, TelegramToIRC(j)); - } - } - if(msg.front=='/'){ - trace("Bot command received"); - try{ - auto split=msg.findSplit(" "); - auto cmdbot=split[0].findSplit("@"); - if(cmdbot[1].length!=0 && cmdbot[2]!=telegram.botName){ - trace("Not one of our commands"); - forward_msg(); - } - else{ - switch(cmdbot[0][1..$]){ - case "link": - link(split[2], chatid); - ircClient.notice(split[2], "Linked with "~chatid.to!string); - break; - case "links": - telegram.send(chatid, lut.telegram(chatid).join(", ")); - break; - case "unlink": - unlink(split[2], chatid); - ircClient.notice(split[2], "Unlinked with "~chatid.to!string); - break; - case "lastUpdate": - telegram.send(chatid, lastUpdate.toString("\n")); - break; - default: - //telegram.send(chatid, split[0]~": Unknown command"); - break; - } - } - } - catch(Exception e){ - telegram.send(chatid, "Error executing command: "~e.msg); - warning(e.to!string); - } - } - else{ - forward_msg(); - } - }; - } - - void unlink(string irc, long telegram){ - info("Unlinking ", irc, " from ", telegram); - lut.disconnect(irc, telegram); - if(ircClient.connected()){ - if(lut.irc(irc).length==0){ - ircClient.part(irc); - } - } - } - - void link(string irc, long telegram){ - info("Linking ", irc, " with ", telegram); - lut.connect(irc, telegram); - if(ircClient.connected()){ - ircClient.join(irc); - } - } - - void connect(){ - info("Connecting to Telegram"); - tryNTimes!(()=>telegram.connect(), (e)=>warning(e))(max_retries); - tryNTimes!(()=>telegram_listener.connect(), (e)=>warning(e))(max_retries); - - //ircClient.connect(new InternetAddress("127.0.0.1",6667)); - info("Connecting to IRC on ", ircAddress.toString); - tryNTimes!(()=>ircClient.connect(ircAddress), (e)=>warning(e))(max_retries); - - if(controlPath.exists()){ - remove(controlPath); - } - controlSocket.bind(controlAddress); - controlSocket.listen(10); - } - - void setupEventloop(){ - eventloop=ev_default_loop(0); - - w_irc=Watcher(ircSocket,(revents){ - trace("IRC Update"); - lastUpdate.updateIRC(); - - if(ircClient.read()){ - ev_io_stop(eventloop, &w_irc.io); - } - }); - - w_tele=Watcher(telegram_listener.sock,(revents){ - trace("Telegram Update"); - lastUpdate.updateTelegram(); - - Exception e; - bool res; - try res=telegram_listener.read(); - catch(TelegramException te) error(te); - if(res){ - ev_io_stop(eventloop, &w_tele.io); - ev_io_set(&w_tele.io, telegram_listener.sock.handle, EV_READ); - ev_io_start(eventloop, &w_tele.io); - } - else{ - telegram_listener.triggerUpdates(); - } - }); - - w_ctrl=Watcher(controlSocket.handle, (revents){ - auto ns=controlSocket.accept(); - trace("New control connection "); - Watcher *nw; - nw=new Watcher(ns.handle, (rrevents){ - char[512] buf; - auto ret=ns.receive(buf); - if(ret==0){ - ev_io_stop(eventloop, &nw.io); - delete nw; - } - auto line=buf[0..ret]; - trace("New control command ",buf); - foreach(l; line.splitter("\n")){ - auto split=l.findSplit(" "); - switch(split[0]){ - case "quit": - quit(); - break; - case "link": - auto split2=split[2].findSplit(" "); - link(split2[0].idup, split2[2].to!long); - break; - case "links": - foreach(i,tt; lut._irc.byPair){ - foreach(t; tt){ - ns.send("%s <-> %s\n".format(i,t)); - } - } - break; - case "unlink": - break; - case "": - break; - default: - ns.send(l~": unknown command\n"); - } - } - }); - ev_io_start(eventloop, &nw.io); - }); - } - - void quit(){ - info("Quitting the bot"); - ircClient.quit("Exiting gracefully"); - disconnect(); - stop(); - } - - void start(){ - info("Starting the event-listeners"); - ev_io_start(eventloop, &w_irc.io); - ev_io_start(eventloop, &w_ctrl.io); - } - - void stop(){ - trace("Stopping the Eventloop"); - ev_io_stop(eventloop, &w_tele.io); - ev_io_stop(eventloop, &w_irc.io); - ev_io_stop(eventloop, &w_ctrl.io); - ev_break(eventloop, EVBREAK_ALL); - } - - void run(){ - info("Running the event-loop"); - ev_run(eventloop,0); - } - - void disconnect(){ - telegram.disconnect(); - telegram_listener.disconnect(); - controlSocket.close(); - std.file.remove(controlPath); - } - - void save(File f){ - trace("Saving to file ",f.name); - foreach(link; lut.links){ - f.writeln(link[0], " ", link[1].to!string); - } - } - void load(File f){ - trace("loading from file ",f.name); - foreach(l; f.byLine.filter!(a=>a.length>0)){ - auto split=l.findSplit(" "); - link(split[0].idup, split[2].to!long); - } - } -} - - -int main(string[] args){ - Bot b=Bot(args[1]); - b.proxy_url=args[2]; - b.controlPath=args[3]; - b.ircAddresses=getAddress(args[4],args[5].to!ushort); - b.initialize(); - if(exists("savefile")){ - auto f=File("savefile", "r"); - b.load(f); - f.close(); - } - b.connect(); - scope(failure){ - b.quit(); - } - b.setupEventloop(); - b.start(); - b.run(); - scope(exit){ - auto f=File("savefile", "w+"); - b.save(f); - f.close(); - } - - return 0; - -} diff --git a/src/proxy.d b/src/proxy.d @@ -1,136 +0,0 @@ -import std.socket; -import std.stdio; -import std.conv:to; -import std.typecons; -import std.experimental.logger; -import std.range; -import bastlibridge.http; -import bastlibridge.telegram; - -auto telegram=Nullable!Telegram.init; -ubyte[] buf=new ubyte[1024]; - -immutable string index=` - ____ ___ _____________ ____ - / __ )/ | / ___/_ __/ / / _/ - / __ / /| | \__ \ / / / / / / - / /_/ / ___ |___/ // / / /____/ / -/_____/_/ |_/____//_/ /_____/___/ -`c; - -immutable string fof=` - _ _ ___ _ _ -| || | / _ \| || | -| || |_| | | | || |_ -|__ _| |_| |__ _| - |_| \___/ |_| -`c; - -immutable string teapot=r" - _ - _,(_)._ - ___,(_______). - ,'__. \ /\_ - /,' / \ / / -| | | |,' / - \`.| / - `. : : / - `. :.,' - `-.________,-' -"c; - -void bridge(Socket ins, Socket outs, ubyte[] buf){ - trace("Bridging between sockets"); - size_t total=0; - size_t ret=0; - do{ - ret=ins.receive(buf); - total+=ret; - outs.send(buf[0..ret]); - }while(ret==buf.length); - trace("Bridged a total of ", total, " bytes"); -} - -void handleClient(Socket newsock){ - scope(exit){ - newsock.close(); - } - trace("Handling client "~newsock.remoteAddress.to!string); - static HttpResponse!void res; - auto req=receiveHttpRequest(newsock, buf); - if(req.isNull){ - info("Socket gone"); - return; - } - writeln(req); - void not_found(){ - res.response(404, "Not Found"); - res.header("Connection","close"); - res.header("Content-Type","text/plain"); - res.data(fof); - res.perform(newsock); - } - if(req.url=="/"){ - res.response(200, "OK"); - res.header("Content-Type","text/plain"); - res.header("Connection","close"); - res.data(index); - res.perform(newsock); - } - else if(req.url[1]!='?'){ - not_found(); - } - else{ - JSONValue file; - try{ - file=telegram.getFile(req.url[2..$]); - } - catch(TelegramException e){ - not_found(); - return; - } - catch(Exception e){ - res.response(418, "I'm a teapot"); - res.header("Connection", "close"); - res.header("Content-Type", "text/plain"); - res.data(teapot); - res.perform(newsock); - return; - } - string path=file["file_path"].str; - static HttpRequest!void r2; - r2.request("GET", chain("file/bot", telegram.token, "/", path)); - r2.header("Host", telegram.ApiAddr); - r2.header("Connection", "close"); - r2.perform(telegram.sock); - ubyte[1024] buffer; - bridge(telegram.sock, newsock, buffer); - } -} - -void main(string[] args){ - if(args.length!=4){ - stderr.writeln("Usage: ", args[0], " <TELEGRAM_TOKEN> <IP> <PORT>"); - } - - Address addr=parseAddress(args[2], args[3].to!ushort); - telegram=Telegram(args[1]); - telegram.connect(); - scope(exit){ - telegram.disconnect(); - } - info("Listening on "~addr.toString); - - Socket server=new TcpSocket(); - info("Binding to Address"); - server.bind(addr); - scope(exit){ - server.close(); - } - info("Listening"); - server.listen(10); - while(true){ - auto newsock=server.accept(); - handleClient(newsock); - } -} diff --git a/src/telegram/http.d b/src/telegram/http.d @@ -0,0 +1,362 @@ +module telegram.http; + +import std.stdio; +import std.array; +import std.typecons; +import std.socket; +import std.format; +import std.range; +import std.algorithm; +import std.meta; +import std.conv; +import std.traits; +import std.exception; +import std.experimental.logger; + +@safe: + +class OutBuffer{ + ubyte[] buffer; + size_t offset; + + invariant(){ + assert(offset <= buffer.length); + } + + void reserve(size_t len) + in{ + assert(offset + len >= offset); + } + out{ + assert(offset + len <= buffer.length); + } + body{ + if(offset+len>buffer.length){ + buffer.length=(offset + len)*2; + } + } + alias put = write; + + ubyte[] write(const(ubyte)[] bytes){ + reserve(bytes.length); + auto slice=buffer[offset..offset+bytes.length]; + slice[]=bytes[]; + offset+=bytes.length; + return slice; + } + + unittest{ + ubyte[] test=iota(0,255).map!(a=>cast(ubyte)a).array; + OutBuffer ob; + ob.write(test); + assert(ob.toBytes == test); + } + + void write(const(char)[] bytes){ + write(cast(const(ubyte)[])bytes); + } + void write(in ubyte b){ + reserve(ubyte.sizeof); + buffer[offset]=b; + offset++; + } + + ubyte[] toBytes(){ + return buffer[0..offset]; + } + + override string toString(){ + return (cast(const(char)[])toBytes).idup; + } + +} + +private static immutable string http_version="HTTP/1.1"; +private static immutable string nl="\r\n"; + +ubyte[] receiveData(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ + ubyte[] buf=buffer; + size_t pos=0,res=0; + while(true){ + res=sock.receive(buf); + if(res==0){ + return []; + } + else if(res==Socket.ERROR){ + throw new ErrnoException("Error reading from socket"); + } + pos+=res; + if(pos<buffer.length) + break; + if(pos>=buffer.length){ + buffer.length+=chunk_size; + } + buf=buffer[pos..$]; + } + return buffer[0..pos]; +} + +Nullable!(HttpResponse!T) receiveHttpResponse(T=void)(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ + auto data=receiveData(sock,buffer,chunk_size); + if(data.length==0){ + return Nullable!(HttpResponse!T).init; + } + return nullable(HttpResponse!T.parse(data)); +} + +Nullable!(HttpRequest!T) receiveHttpRequest(T=void)(Socket sock, ubyte[] buffer, in size_t chunk_size=1024){ + auto data=receiveData(sock,buffer,chunk_size); + if(data.length==0){ + return Nullable!(HttpRequest!T).init; + } + return nullable(HttpRequest!T.parse(data)); +} + + +struct HttpRequest(T=void){ + private auto ob=appender!(ubyte[])(); + private uint state=0; + const(char)[] method; + const(char)[] url; + const(char)[] ver; + const(ubyte)[] content; + + static if(!is(T == void)){ + static assert(__traits(compiles, (){const(char)[] c=""; (T.init)[c]=c;})); + T headers; + } + + private void clear(){ + ob.clear(); + state=0; + } + + void write(T)(in T[] buf) if(is(T:ubyte)){ + ob.put(cast(const(ubyte)[])buf); + } + + void request(TM, TU)(TM method, TU url){ + clear(); + ob.formattedWrite!("%s /%s "~http_version~nl)(method,url); + state=1; + } + + void header(TN,TV)(TN name, TV value) + in{ + assert(state==1); + } + body{ + ob.formattedWrite!("%s: %s"~nl)(name, value); + } + + void header(A...)(A args) + in{ + assert(args.length%2==0); + } + body{ + + string generateMixin(){ + string str; + for(size_t i=0; i<args.length; i+=2){ + str~=format!"header(args[%d],args[%d]);"(i,i+1); + } + return str; + } + mixin(generateMixin()); + } + + void data(TD)(TD data) if(hasLength!TD) + in{ + assert(state<2); + } + body{ + header("Content-Length", data.length); + write(nl); + state=2; + write(data); + } + + void data (TD)(TD data) if(isSomeString!TD){ + import std.string:representation; + this.data(data.representation); + } + + void finalize(){ + if(state<2){ + write(nl); + state=2; + } + } + + void perform(Socket sock){ + finalize(); + writeln(cast(char[])ob.data); + sock.send(ob.data); + } + + static HttpRequest parse(const(ubyte)[] buf){ + HttpRequest r; + const(char)[] buffer=cast(const(char)[])buf; + auto lines=buffer.splitter(nl); + + auto s=lines.front.findSplit(" "); + r.method=s[0]; + s=s[2].findSplit(" "); + r.url=s[0]; + r.ver=s[2]; + + lines.popFront(); + static if(!is(T==void)){ + foreach(line; lines){ + s=line.findSplit(": "); + res._headers[s[0]]=s[2]; + } + } + if(buffer.findSkip(nl~nl)){ + r.content=cast(const(ubyte)[])buffer; + } + return r; + } + + const(char)[] bufferString() const{ + return cast(const(char)[])ob.data; + } + + string toString(){ + static if(is(T==void)){ + return format!"HttpRequest %s %s\n%s"(method, url, cast(const char[])content); + } + else{ + return format!"HttpRequest %s %s\n%s\n%s"(method, url, headers.to!string, cast(const char[])content); + } + } +} + +struct CopyAssoc{ + const(char)[][string] assoc; + mixin Proxy!assoc; + + auto opIndexAssign(in char[] value, in char[] key){ + assoc[key.idup]=value; + return value; + } +} + +struct HttpResponse(T=void){ + private auto ob=appender!(ubyte[])(); + string ver; + ushort code; + string codename; + private uint state; + + static if(!is(T == void)){ + static assert(__traits(compiles, (){const(char)[] c=""; (T.init)[c]=c;})); + T headers; + } + const(ubyte)[] content; + + static HttpResponse parse(const(ubyte)[] data){ + const(char)[] buf=cast(const char[]) data; + HttpResponse res; + auto lines=buf.splitter(nl); + auto status=lines.front; + auto split=status.splitter(" "); + + res.ver=split.front.idup; + split.popFront(); + res.code=split.front.to!ushort; + split.popFront(); + res.codename=split.joiner.to!string; + static if(!is(T==void)){ + lines.popFront(); + foreach(line;lines.until!(a=>a.length==0)){ + auto headsplit=line.findSplit(": "); + res.headers[headsplit[0]]=headsplit[2]; + } + } + + res.content=cast(const(ubyte)[])buf.findSplit("\r\n\r\n")[2]; + return res; + } + + void response(TN)(ushort code, TN codename){ + ob.clear(); + ob.formattedWrite!(http_version~" %d %s"~nl)(code,codename); + state=1; + } + + void header(TN,TV)(TN name, TV value) + in{ + assert(state==1); + } + body{ + ob.formattedWrite!("%s: %s"~nl)(name, value); + } + + void header(A...)(A args) + in{ + assert(args.length%2==0); + } + body{ + + string generateMixin(){ + string str; + for(size_t i=0; i<args.length; i+=2){ + str~=format!"header(args[%d],args[%d]);"(i,i+1); + } + return str; + } + mixin(generateMixin()); + } + + void write(T)(in T[] buf) if(is(T:ubyte)){ + ob.put(cast(const(ubyte)[])buf); + } + + void data(TD)(TD data) if(hasLength!TD) + in{ + assert(state<2); + } + body{ + header("Content-Length", data.length); + write(nl); + state=2; + write(data); + } + + void data (TD)(TD data) if(isSomeString!TD){ + import std.string:representation; + this.data(data.representation); + } + + void finalize(){ + if(state<2){ + write(nl); + state=2; + } + } + + void perform(Socket sock){ + finalize(); + writeln(bufferString); + sock.send(ob.data); + } + + const(char)[] str() const{ + return cast(const(char)[]) content; + } + + const(char)[] bufferString() const{ + return cast(const(char)[])ob.data; + } + + string toString(){ + static if(is(T==void)){ + return format!"HttpResponse %d %s\n%s"(code, codename, cast(const char[])content); + } + else{ + return format!"HttpResponse %d %s\n%s\n%s"(code, codename, headers.to!string, cast(const char[])content); + } + } +} + diff --git a/src/telegram/telegram.d b/src/telegram/telegram.d @@ -0,0 +1,333 @@ +module telegram.telegram; + +import telegram.http; +import std.stdio; +import std.socket; +import std.range; +import std.algorithm; +import std.format; +import std.conv; +import std.experimental.logger; +import std.traits; +import ssl.socket; +import std.file; +import std.typecons; +import std.string; +import std.json; +import std.exception; +import std.datetime :SysTime,Clock; + +class TelegramErrnoException : ErrnoException{ + this(string msg, string file=__FILE__, size_t line=__LINE__){ + super(msg,file,line); + } +} +class TelegramException : Exception{ + this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ + super(msg,file,line,next); + } +} + +class TelegramError : Error{ + this(string msg, string file=__FILE__, size_t line=__LINE__, Throwable next=null){ + super(msg,file,line,next); + } +} + +ubyte[] receiveDynamic(Socket sock, ubyte[] buffer, size_t chunk_size=1024){ + ubyte[] buf=buffer; + size_t pos=0,res=0; + while(true){ + trace("Reading on ", sock.handle, "(", sock , ") ", " with buffer ", buf.ptr, " of length ", buf.length); + res=sock.receive(buf); + if(res==0){ + info("Socket ", sock.handle, " died"); + return []; + } + else if(res==Socket.ERROR){ + throw new ErrnoException("Error reading from socket"); + } + pos+=res; + if(pos<buffer.length) + break; + if(pos>=buffer.length){ + buffer.length+=chunk_size; + } + buf=buffer[pos..$]; + } + return buffer[0..pos]; +} + +struct Telegram{ + string token=""; + enum ApiAddr="api.telegram.org"; + enum ApiPort=443; + + long lastUpdate=-1; + + SslSocket sock; + + private Address ApiAddrObj; + + private HttpRequest!void httpRequest; + private static immutable httpBufferChunk=1024; + private ubyte[] httpBuffer=new ubyte[httpBufferChunk]; + + @disable this(); + this(string token){ + this.token=token; + } + + void connect(bool test=true){ + if(!ApiAddrObj){ + getApiAddress(); + } + trace("Opening socket to Telegram on "~ApiAddrObj.to!string); + auto af=ApiAddrObj.addressFamily; + sock=new SslSocket(af); + sock.connect(ApiAddrObj); + scope(failure){ + sock.close(); + } + if(test){ + //Try whether the socket works + trace("Trying to get the Botname via newly opened socket "~sock.handle.to!string); + botName=getBotName(); + } + } + + void getApiAddress(){ + ApiAddrObj=getAddress(ApiAddr,ApiPort).front; + } + + + void disconnect(){ + sock.shutdown(SocketShutdown.BOTH); + sock.close(); + delete sock; + } + + auto getURI(in char[] method){ + return chain("bot",token,"/",method); + } + + void httpRequestHeaders(ref HttpRequest!void httpRequest){ + httpRequest.header("Host", ApiAddr); + httpRequest.header("Connection", "keep-alive"); + } + + void http(in char[] method){ + httpRequest.request("GET", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.perform(sock); + } + + void post(T)(in char[] method, T data){ + httpRequest.request("POST", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.header("Content-Type", "application/x-www-form-urlencoded"); + httpRequest.data(data); + httpRequest.perform(sock); + } + + void json(in char[] method, in JSONValue jv){ + httpRequest.request("POST", getURI(method)); + httpRequestHeaders(httpRequest); + httpRequest.header("Content-Type", "application/json"); + httpRequest.data(jv.toString); + httpRequest.perform(sock); + } + + /** + * Receives all available data from the socket. + * The returned buffer gets reused, if you want it immutable, + * use .idup on it. + */ + const(char)[] receive() + in + { + assert(sock !is null); + assert(sock.isAlive); + } + body{ + static immutable size_t chunk_size=1024; + + static ubyte[] buffer=new ubyte[chunk_size]; //This initialization is static, i.e. only done once + return cast(char[])sock.receiveDynamic(buffer, chunk_size); + } + + void checkCode(T)(in HttpResponse!T res){ + if(res.code!=200){ + throw new TelegramException(format!"Server returned %d\n%s"(res.code, res.str)); + } + } + + auto getJSON(T)(in HttpResponse!T res){ + auto j=parseJSON(res.str); + if(j["ok"].type != JSON_TYPE.TRUE){ + throw new TelegramException("API request failed: "~j["description"].str); + } + return j["result"]; + } + + Nullable!(JSONValue) response(){ + auto h=receiveHttpResponse(sock, httpBuffer, httpBufferChunk); + trace("Got response from Telegram: ", h.to!string); + if(h.isNull){ + //throw new TelegramError("Socket closed"); + warning("Telegram closed _another_ socket :/ ",sock.handle); + reconnectSocket(); + return Nullable!JSONValue.init; + } + checkCode(h); + return Nullable!JSONValue(getJSON(h)); + } + + void reconnectSocket(){ + trace("Reconnecting socket ", sock.handle); + disconnect(); + connect(); + trace("Reconnected socket, is now ", sock.handle); + } + + struct HTTP{ + string ver; + ushort code; + string code_name; + const(char)[] content; + + static HTTP parse(const(char)[] buf) + in{ + assert(buf.length>0); + } + body{ + HTTP res; + trace("Parseing "~buf); + auto lines=buf.splitter("\r\n"); + auto status=lines.front; + auto split=status.splitter(" "); + + res.ver=split.front.idup; + split.popFront(); + res.code=split.front.to!ushort; + split.popFront(); + res.code_name=split.joiner.to!string; + + res.content=buf.findSplit("\r\n\r\n")[2]; + trace("Parsed to "~res.to!string); + return res; + } + + void checkCode(){ + if(code!=200){ + throw new TelegramException(format!"Server returned %d\n%s"(code, content)); + } + } + + auto getJSON(){ + import std.json; + + auto j=parseJSON(content); + if(j["ok"].type != JSON_TYPE.TRUE){ + throw new TelegramException("API request failed: "~j["description"].str); + } + return j["result"]; + } + } + + bool updateRunning=false; + + void triggerUpdates(uint timeout=600){ + if(updateRunning){ + warning("Update called in a row without a reply"); + return; + } + updateRunning=true; + trace("Triggering Update with timeout ", timeout); + string params=format!"offset=%d&timeout=%d"(lastUpdate+1,timeout); + post("getUpdates",params); + } + + void delegate(JSONValue jv)[] onMessage; + + bool read(){ + updateRunning=false; + auto updates=response(); + if(updates.isNull){ + triggerUpdates(); + return true; + } + foreach(size_t i, update; updates){ + lastUpdate=max(lastUpdate, update["update_id"].integer); + if("message" in update){ + onMessage.each!(a=>a(update["message"])); + } + } + + return false; + } + + void send(in long chatid, in char[] buf){ + trace("Sending "~buf~" to "~chatid.to!string); + JSONValue j; + j.object=null; + j["chat_id"] = JSONValue(chatid); + j["text"] = JSONValue(buf); + trace("Sending telegram message "~j.toPrettyString); + json("sendMessage", j); + auto res=response(); + if(res.isNull){ + send(chatid,buf); + } + } + + JSONValue getChat(T)(in T chatid) if(isNarrowString!T || is(T==long)){ + JSONValue j; + j.object=null; + j["chat_id"]=JSONValue(chatid); + json("getChat", j); + auto res=response(); + if(res.isNull){ + return getChat(chatid); + } + return res.get; + } + long getChatId(in char[] chatid){ + return getChat(chatid)["id"].integer; + } + string getChatName(long chatid){ + auto j=getChat(chatid); + if("title" in j){ + //We are in a group + return j["title"].str; + } + //We are in a query + return j["username"].str; + } + + string botName; + + string getBotName(){ + http("getMe"); + string _botName=response()["username"].str; + return _botName; + } + + JSONValue getFile(in char[] file_id){ + JSONValue j; + j.object=null; + j["file_id"]=file_id; + json("getFile", j); + auto r=response(); + if(r.isNull){ + return getFile(file_id); + } + j=r.get; + auto v="file_path" in j; + if(v) + return j; + else + throw new TelegramException(cast(string)("Path of file "~file_id~" was not found")); + } +} +