Some improvements to Servatice network code (#3969)

* Some improvements to Servatice network code

1. fix crash on fuzzy connection (tcp server only)
2. ensure websockets are parent()ed to avoid leaking them
3. quick catch disconnect()ed sockets instead of waiting for a socket error to happen
4. supporto mulltiple connection pools on the websocket server; they are still bound to the same thread due to a qt5 limitation.
This commit is contained in:
ctrlaltca 2020-04-24 22:26:59 +02:00 committed by GitHub
parent 46fe0cd725
commit d30691559a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 12 deletions

View file

@ -29,12 +29,12 @@ port=4747
; Set to 0 to disable the tcp server. ; Set to 0 to disable the tcp server.
number_pools=1 number_pools=1
; Servatrice can listen for clients on websockets, too. Unfortunately it can't support more than one thread. ; Servatrice can listen for clients on websockets, too. Multiple connection pools are available but
; unfortunately, due to a Qt limitation, they must run in the same execution thread.
; Set to 0 to disable the websocket server. ; Set to 0 to disable the websocket server.
websocket_number_pools=1 websocket_number_pools=1
; The IP address servatrice will listen on for websockets clients; defaults to "any" ; The IP address servatrice will listen on for websockets clients; defaults to "any"
websocket_host=any websocket_host=any
; The TCP port number servatrice will listen on for websockets clients; default is 4748 ; The TCP port number servatrice will listen on for websockets clients; default is 4748

View file

@ -111,21 +111,30 @@ Servatrice_ConnectionPool *Servatrice_GameServer::findLeastUsedConnectionPool()
#define WEBSOCKET_POOL_NUMBER 999 #define WEBSOCKET_POOL_NUMBER 999
Servatrice_WebsocketGameServer::Servatrice_WebsocketGameServer(Servatrice *_server, Servatrice_WebsocketGameServer::Servatrice_WebsocketGameServer(Servatrice *_server,
int /* _numberPools */, int _numberPools,
const QSqlDatabase &_sqlDatabase, const QSqlDatabase &_sqlDatabase,
QObject *parent) QObject *parent)
: QWebSocketServer("Servatrice", QWebSocketServer::NonSecureMode, parent), server(_server) : QWebSocketServer("Servatrice", QWebSocketServer::NonSecureMode, parent), server(_server)
{ {
// Qt limitation: websockets can't be moved to another thread for (int i = 0; i < _numberPools; ++i) {
auto newDatabaseInterface = new Servatrice_DatabaseInterface(WEBSOCKET_POOL_NUMBER, server); int poolNumber = WEBSOCKET_POOL_NUMBER + i;
auto newPool = new Servatrice_ConnectionPool(newDatabaseInterface); auto newDatabaseInterface = new Servatrice_DatabaseInterface(poolNumber, server);
auto newPool = new Servatrice_ConnectionPool(newDatabaseInterface);
server->addDatabaseInterface(thread(), newDatabaseInterface); auto newThread = new QThread;
newDatabaseInterface->initDatabase(_sqlDatabase); newThread->setObjectName("pool_" + QString::number(poolNumber));
newPool->moveToThread(newThread);
newDatabaseInterface->moveToThread(newThread);
server->addDatabaseInterface(newThread, newDatabaseInterface);
connectionPools.append(newPool); newThread->start();
QMetaObject::invokeMethod(newDatabaseInterface, "initDatabase", Qt::BlockingQueuedConnection,
Q_ARG(QSqlDatabase, _sqlDatabase));
connect(this, SIGNAL(newConnection()), this, SLOT(onNewConnection())); connectionPools.append(newPool);
connect(this, SIGNAL(newConnection()), this, SLOT(onNewConnection()));
}
} }
Servatrice_WebsocketGameServer::~Servatrice_WebsocketGameServer() Servatrice_WebsocketGameServer::~Servatrice_WebsocketGameServer()
@ -143,7 +152,11 @@ void Servatrice_WebsocketGameServer::onNewConnection()
Servatrice_ConnectionPool *pool = findLeastUsedConnectionPool(); Servatrice_ConnectionPool *pool = findLeastUsedConnectionPool();
auto ssi = new WebsocketServerSocketInterface(server, pool->getDatabaseInterface()); auto ssi = new WebsocketServerSocketInterface(server, pool->getDatabaseInterface());
// ssi->moveToThread(pool->thread()); /*
* Due to a Qt limitation, websockets can't be moved to another thread.
* This will hopefully change in Qt6 if QtWebSocket will be integrated in QtNetwork
*/
// ssi->moveToThread(pool->thread());
pool->addClient(); pool->addClient();
connect(ssi, SIGNAL(destroyed()), pool, SLOT(removeClient())); connect(ssi, SIGNAL(destroyed()), pool, SLOT(removeClient()));

View file

@ -122,6 +122,11 @@ void AbstractServerSocketInterface::catchSocketError(QAbstractSocket::SocketErro
prepareDestroy(); prepareDestroy();
} }
void AbstractServerSocketInterface::catchSocketDisconnected()
{
prepareDestroy();
}
void AbstractServerSocketInterface::transmitProtocolItem(const ServerMessage &item) void AbstractServerSocketInterface::transmitProtocolItem(const ServerMessage &item)
{ {
outputQueueMutex.lock(); outputQueueMutex.lock();
@ -1511,6 +1516,7 @@ TcpServerSocketInterface::TcpServerSocketInterface(Servatrice *_server,
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient())); connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this,
SLOT(catchSocketError(QAbstractSocket::SocketError))); SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(socket, SIGNAL(disconnected()), this, SLOT(catchSocketDisconnected()));
} }
TcpServerSocketInterface::~TcpServerSocketInterface() TcpServerSocketInterface::~TcpServerSocketInterface()
@ -1594,7 +1600,7 @@ void TcpServerSocketInterface::readClient()
} else } else
return; return;
} }
if (inputBuffer.size() < messageLength) if (inputBuffer.size() < messageLength || messageLength < 0)
return; return;
CommandContainer newCommandContainer; CommandContainer newCommandContainer;
@ -1679,6 +1685,7 @@ WebsocketServerSocketInterface::~WebsocketServerSocketInterface()
void WebsocketServerSocketInterface::initConnection(void *_socket) void WebsocketServerSocketInterface::initConnection(void *_socket)
{ {
socket = (QWebSocket *)_socket; socket = (QWebSocket *)_socket;
socket->setParent(this);
address = socket->peerAddress(); address = socket->peerAddress();
QByteArray websocketIPHeader = settingsCache->value("server/web_socket_ip_header", "").toByteArray(); QByteArray websocketIPHeader = settingsCache->value("server/web_socket_ip_header", "").toByteArray();
@ -1699,6 +1706,7 @@ void WebsocketServerSocketInterface::initConnection(void *_socket)
SLOT(binaryMessageReceived(const QByteArray &))); SLOT(binaryMessageReceived(const QByteArray &)));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this,
SLOT(catchSocketError(QAbstractSocket::SocketError))); SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(socket, SIGNAL(disconnected()), this, SLOT(catchSocketDisconnected()));
// Add this object to the server's list of connections before it can receive socket events. // Add this object to the server's list of connections before it can receive socket events.
// Otherwise, in case a of a socket error, it could be removed from the list before it is added. // Otherwise, in case a of a socket error, it could be removed from the list before it is added.

View file

@ -59,6 +59,7 @@ class AbstractServerSocketInterface : public Server_ProtocolHandler
Q_OBJECT Q_OBJECT
protected slots: protected slots:
void catchSocketError(QAbstractSocket::SocketError socketError); void catchSocketError(QAbstractSocket::SocketError socketError);
void catchSocketDisconnected();
virtual void flushOutputQueue() = 0; virtual void flushOutputQueue() = 0;
signals: signals:
void outputQueueChanged(); void outputQueueChanged();