commit ce38c2cefad48b5f528eca41f23cbf6ae5cf3cc7
parent f57c01763e414adf8363424d1253a2deb95cc34c
Author: Dominik Schmidt <das1993@hotmail.com>
Date: Fri, 31 Aug 2018 20:17:39 +0200
Further work on generic interfaces.
Diffstat:
6 files changed, 70 insertions(+), 22 deletions(-)
diff --git a/src/bastlibridge/base.d b/src/bastlibridge/base.d
@@ -6,7 +6,9 @@ import std.exception;
import std.concurrency;
import std.format;
import std.algorithm;
+import std.typecons;
import std.array;
+import core.sync.mutex;
import std.experimental.logger;
@@ -16,6 +18,7 @@ abstract class Endpoint{
package Manager manager;
package string name;
package string cmdline;
+ protected Mutex mtx;
protected SysTime lastUpdate;
protected void heartbeat(){
@@ -30,6 +33,7 @@ abstract class Endpoint{
this(Manager m, string args){
manager=m;
heartbeat();
+ mtx=new Mutex();
}
abstract void open();
@@ -106,6 +110,26 @@ abstract class Endpoint{
mixin(logHook!"fatal");
}
+abstract class QueuedEndpoint : Endpoint{
+ Tuple!(Message,Channel)[] msgqueue;
+ this(Manager m, string args){
+ super(m,args);
+ }
+ final override void sendMessage(Message m, Channel c){
+ if(mtx.tryLock()){
+ sendMessageQueueless(m,c);
+ mtx.unlock();
+ }
+ else{
+ msgqueue~=tuple(m,c);
+ }
+ }
+ void sendQueue(){
+ msgqueue.each!(a=>sendMessageQueueless(a[0],a[1]));
+ }
+ abstract void sendMessageQueueless(Message m, Channel c);
+}
+
abstract class Channel{
package string _name;
}
diff --git a/src/bastlibridge/bot.d b/src/bastlibridge/bot.d
@@ -109,7 +109,7 @@ static this(){
static this(){
- //endpointTypes.add!Telegram("telegram");
+ endpointTypes.add!Telegram("telegram");
endpointTypes.add!IRC("irc");
endpointTypes.add!Mail("mail");
}
diff --git a/src/bastlibridge/interfaces/irc.d b/src/bastlibridge/interfaces/irc.d
@@ -1,4 +1,5 @@
module bastlibridge.interfaces.irc;
+import bastlibridge.util;
import bastlibridge.base;
import bastlibridge.manager;
import bastlibridge.command;
@@ -16,15 +17,7 @@ import std.array;
import std.datetime : SysTime,Clock;
import std.experimental.logger;
-
-void waitForData(Socket sock) @trusted{
- version(Windows){
- static assert(false);
- }
- import core.sys.posix.sys.socket;
- ubyte[4] buf;
- recv(sock.handle, buf.ptr, buf.length, MSG_PEEK);
-}
+alias waitForData=wait;
final class IRCMessage : Message{
import irc.client;
@@ -76,7 +69,7 @@ final class IRCChannel:Channel{
}
-final class IRC : Endpoint{
+final class IRC : QueuedEndpoint{
import ssl.socket;
import irc.url;
import irc.client;
@@ -288,7 +281,7 @@ final class IRC : Endpoint{
ircClient.part(ic._name);
}
- override void sendMessage(scope Message m, Channel chan){
+ override void sendMessageQueueless(scope Message m, Channel chan){
auto ichan=cast(IRCChannel)chan;
trace("Delivering message of type ",typeid(m)," to channel", ichan._name);
if(cast(TelegramMessage)m){
@@ -304,9 +297,7 @@ final class IRC : Endpoint{
}
protected void send(T)(in char[] channel, T text){
- synchronized(ircClient){
- ircClient.send(channel, text);
- }
+ ircClient.send(channel, text);
}
override void run(){
@@ -319,17 +310,18 @@ final class IRC : Endpoint{
info("IRC message received");
if(shutdown)
break;
- synchronized(ircClient){
+ synchronized(mtx){
if(ircClient.read()){
break;
}
+ sendQueue();
}
}
}
override void stop(){
- synchronized(this)
+ synchronized(mtx){
shutdown=true;
- synchronized(ircClient)
ircClient.quit("Planned shutdown");
+ }
}
}
diff --git a/src/bastlibridge/interfaces/telegram.d b/src/bastlibridge/interfaces/telegram.d
@@ -9,7 +9,7 @@ import std.datetime : SysTime, Clock;
import std.json;
import std.algorithm;
-class Telegram: Endpoint{
+class Telegram: QueuedEndpoint{
private tg.Telegram telegram, telegram_listener;
Mutex telegram_lock, listener_lock;
@@ -84,15 +84,23 @@ class Telegram: Endpoint{
}
- override void sendMessage(Message m, Channel c){
+ override void sendMessageQueueless(Message m, Channel c){
send((cast(TelegramChannel)c).id, "<"~m.userName()~"> "~m.getMessage());
}
private __gshared bool shutdown=false;
override void run(){
+ import ssl.openssl;
+ loadOpenSSL(); //We have to do this once for every thread
while(!shutdown){
- telegram_listener.triggerUpdates();
+ synchronized(mtx)
+ telegram_listener.triggerUpdates();
+ telegram_listener.wait();
+ synchronized(mtx){
+ telegram_listener.read();
+ sendQueue();
+ }
}
}
diff --git a/src/bastlibridge/util.d b/src/bastlibridge/util.d
@@ -1,5 +1,6 @@
module bastlibridge.util;
import std.exception;
+import std.socket;
import std.range;
void tryNTimes(alias func, alias errhandle)(uint N){
@@ -27,3 +28,12 @@ unittest{
assertThrown(tryNTimes!(func, (e)=>assert(e.msg=="derp"))(5));
assert(i==5);
}
+
+void wait(Socket sock) @trusted{
+ version(Windows){
+ static assert(false);
+ }
+ import core.sys.posix.sys.socket;
+ ubyte[4] buf;
+ recv(sock.handle, buf.ptr, buf.length, MSG_PEEK);
+}
diff --git a/src/telegram/telegram.d b/src/telegram/telegram.d
@@ -17,6 +17,16 @@ import std.json;
import std.exception;
import std.datetime :SysTime,Clock;
+
+void wait(Socket sock) @trusted{
+ version(Windows){
+ static assert(false);
+ }
+ import core.sys.posix.sys.socket;
+ ubyte[4] buf;
+ recv(sock.handle, buf.ptr, buf.length, MSG_PEEK);
+}
+
class TelegramErrnoException : ErrnoException{
this(string msg, string file=__FILE__, size_t line=__LINE__){
super(msg,file,line);
@@ -96,6 +106,10 @@ struct Telegram{
}
}
+ auto wait(){
+ return sock.wait();
+ }
+
void getApiAddress(){
ApiAddrObj=getAddress(ApiAddr,ApiPort).front;
}
@@ -104,7 +118,7 @@ struct Telegram{
void disconnect(){
sock.shutdown(SocketShutdown.BOTH);
sock.close();
- delete sock;
+ sock.destroy();
}
auto getURI(in char[] method){