From 43d7cf6a1abaf6aad459cbaaf583cc9bda6ce907 Mon Sep 17 00:00:00 2001 From: Max-Wilhelm Bruker Date: Sun, 14 Oct 2012 13:19:03 +0200 Subject: [PATCH] removed recursive lock in server; moved object serialisation to worker thread --- common/server.cpp | 7 ++- common/server.h | 1 + common/server_protocolhandler.cpp | 4 ++ servatrice/src/servatrice.cpp | 1 + servatrice/src/serversocketinterface.cpp | 68 +++++++++++++----------- servatrice/src/serversocketinterface.h | 9 ++-- 6 files changed, 52 insertions(+), 38 deletions(-) diff --git a/common/server.cpp b/common/server.cpp index 32f3c878..06171da8 100644 --- a/common/server.cpp +++ b/common/server.cpp @@ -36,7 +36,7 @@ #include Server::Server(bool _threaded, QObject *parent) - : QObject(parent), threaded(_threaded), clientsLock(QReadWriteLock::Recursive), nextLocalGameId(0) + : QObject(parent), threaded(_threaded), nextLocalGameId(0) { qRegisterMetaType("ServerInfo_Game"); qRegisterMetaType("ServerInfo_Room"); @@ -79,10 +79,9 @@ void Server::prepareDestroy() clientsLock.unlock(); } while (!done); } else { - clientsLock.lockForWrite(); + // no locking is needed in unthreaded mode while (!clients.isEmpty()) clients.first()->prepareDestroy(); - clientsLock.unlock(); } roomsLock.lockForWrite(); @@ -224,7 +223,7 @@ void Server::removeClient(Server_ProtocolHandler *client) void Server::externalUserJoined(const ServerInfo_User &userInfo) { // This function is always called from the main thread via signal/slot. - QWriteLocker locker(&clientsLock); + clientsLock.lockForWrite(); Server_RemoteUserInterface *newUser = new Server_RemoteUserInterface(this, ServerInfo_User_Container(userInfo)); externalUsers.insert(QString::fromStdString(userInfo.name()), newUser); diff --git a/common/server.h b/common/server.h index aeb11bad..8265c7d2 100644 --- a/common/server.h +++ b/common/server.h @@ -42,6 +42,7 @@ public: mutable QReadWriteLock clientsLock, roomsLock; // locking order: roomsLock before clientsLock Server(bool _threaded, QObject *parent = 0); ~Server(); + void setThreaded(bool _threaded) { threaded = _threaded; } AuthenticationResult loginUser(Server_ProtocolHandler *session, QString &name, const QString &password, QString &reason, int &secondsLeft); const QMap &getRooms() { return rooms; } diff --git a/common/server_protocolhandler.cpp b/common/server_protocolhandler.cpp index c4a9e00a..382b2ebc 100644 --- a/common/server_protocolhandler.cpp +++ b/common/server_protocolhandler.cpp @@ -38,6 +38,8 @@ Server_ProtocolHandler::~Server_ProtocolHandler() { } +// This function must only be called from the thread this object lives in. +// The thread must not hold any server locks when calling this (e.g. clientsLock, roomsLock). void Server_ProtocolHandler::prepareDestroy() { if (deleted) @@ -399,6 +401,8 @@ Response::ResponseCode Server_ProtocolHandler::cmdGetGamesOfUser(const Command_G if (authState == NotLoggedIn) return Response::RespLoginNeeded; + // XXX This does not take external users into account. + // XXX Maybe remove this check and test if the result list is empty at the end. server->clientsLock.lockForRead(); if (!server->getUsers().contains(QString::fromStdString(cmd.user_name()))) return Response::RespNameNotFound; diff --git a/servatrice/src/servatrice.cpp b/servatrice/src/servatrice.cpp index ae53f634..2207cf9c 100644 --- a/servatrice/src/servatrice.cpp +++ b/servatrice/src/servatrice.cpp @@ -44,6 +44,7 @@ Servatrice_GameServer::Servatrice_GameServer(Servatrice *_server, int _numberPoo server(_server) { if (_numberPools == 0) { + server->setThreaded(false); Servatrice_DatabaseInterface *newDatabaseInterface = new Servatrice_DatabaseInterface(0, server); Servatrice_ConnectionPool *newPool = new Servatrice_ConnectionPool(newDatabaseInterface); diff --git a/servatrice/src/serversocketinterface.cpp b/servatrice/src/serversocketinterface.cpp index 03060eee..ab64a837 100644 --- a/servatrice/src/serversocketinterface.cpp +++ b/servatrice/src/serversocketinterface.cpp @@ -72,14 +72,14 @@ ServerSocketInterface::ServerSocketInterface(Servatrice *_server, Servatrice_Dat socket->setSocketOption(QAbstractSocket::LowDelayOption, 1); connect(socket, SIGNAL(readyRead()), this, SLOT(readClient())); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError))); - connect(this, SIGNAL(outputBufferChanged()), this, SLOT(flushOutputBuffer()), Qt::QueuedConnection); + connect(this, SIGNAL(outputQueueChanged()), this, SLOT(flushOutputQueue())); } ServerSocketInterface::~ServerSocketInterface() { logger->logMessage("ServerSocketInterface destructor", this); - flushOutputBuffer(); + flushOutputQueue(); } void ServerSocketInterface::initConnection(int socketDescriptor) @@ -93,11 +93,10 @@ void ServerSocketInterface::initSessionDeprecated() { // dirty hack to make v13 client display the correct error message - outputBufferMutex.lock(); - outputBuffer = ""; - outputBufferMutex.unlock(); - - emit outputBufferChanged(); + QByteArray buf; + buf.append(""); + socket->write(buf); + socket->flush(); } bool ServerSocketInterface::initSession() @@ -125,17 +124,6 @@ bool ServerSocketInterface::initSession() return true; } -void ServerSocketInterface::flushOutputBuffer() -{ - QMutexLocker locker(&outputBufferMutex); - if (outputBuffer.isEmpty()) - return; - servatrice->incTxBytes(outputBuffer.size()); - socket->write(outputBuffer); - socket->flush(); - outputBuffer.clear(); -} - void ServerSocketInterface::readClient() { QByteArray data = socket->readAll(); @@ -183,18 +171,38 @@ void ServerSocketInterface::catchSocketError(QAbstractSocket::SocketError socket void ServerSocketInterface::transmitProtocolItem(const ServerMessage &item) { - QByteArray buf; - unsigned int size = item.ByteSize(); - buf.resize(size + 4); - item.SerializeToArray(buf.data() + 4, size); - buf.data()[3] = (unsigned char) size; - buf.data()[2] = (unsigned char) (size >> 8); - buf.data()[1] = (unsigned char) (size >> 16); - buf.data()[0] = (unsigned char) (size >> 24); + outputQueueMutex.lock(); + outputQueue.append(item); + outputQueueMutex.unlock(); - QMutexLocker locker(&outputBufferMutex); - outputBuffer.append(buf); - emit outputBufferChanged(); + emit outputQueueChanged(); +} + +void ServerSocketInterface::flushOutputQueue() +{ + QMutexLocker locker(&outputQueueMutex); + if (outputQueue.isEmpty()) + return; + + int totalBytes = 0; + while (!outputQueue.isEmpty()) { + const ServerMessage &item = outputQueue.first(); + + QByteArray buf; + unsigned int size = item.ByteSize(); + buf.resize(size + 4); + item.SerializeToArray(buf.data() + 4, size); + buf.data()[3] = (unsigned char) size; + buf.data()[2] = (unsigned char) (size >> 8); + buf.data()[1] = (unsigned char) (size >> 16); + buf.data()[0] = (unsigned char) (size >> 24); + socket->write(buf); + + totalBytes += size + 4; + outputQueue.removeFirst(); + } + servatrice->incTxBytes(totalBytes); + socket->flush(); } void ServerSocketInterface::logDebugMessage(const QString &message) @@ -715,8 +723,8 @@ Response::ResponseCode ServerSocketInterface::cmdBanFromServer(const Command_Ban for (int i = 0; i < userList.size(); ++i) { SessionEvent *se = userList[i]->prepareSessionEvent(event); userList[i]->sendProtocolItem(*se); - userList[i]->prepareDestroy(); delete se; + QMetaObject::invokeMethod(userList[i], "prepareDestroy", Qt::QueuedConnection); } } diff --git a/servatrice/src/serversocketinterface.h b/servatrice/src/serversocketinterface.h index 184f9d76..6a2a41f8 100644 --- a/servatrice/src/serversocketinterface.h +++ b/servatrice/src/serversocketinterface.h @@ -54,18 +54,19 @@ class ServerSocketInterface : public Server_ProtocolHandler private slots: void readClient(); void catchSocketError(QAbstractSocket::SocketError socketError); - void flushOutputBuffer(); + void flushOutputQueue(); signals: - void outputBufferChanged(); + void outputQueueChanged(); protected: void logDebugMessage(const QString &message); private: - QMutex outputBufferMutex; + QMutex outputQueueMutex; Servatrice *servatrice; Servatrice_DatabaseInterface *sqlInterface; QTcpSocket *socket; - QByteArray inputBuffer, outputBuffer; + QByteArray inputBuffer; + QList outputQueue; bool messageInProgress; bool handshakeStarted; int messageLength;