removed recursive lock in server; moved object serialisation to worker thread

This commit is contained in:
Max-Wilhelm Bruker 2012-10-14 13:19:03 +02:00
parent e7fc3b59a7
commit 43d7cf6a1a
6 changed files with 52 additions and 38 deletions

View file

@ -36,7 +36,7 @@
#include <QDebug> #include <QDebug>
Server::Server(bool _threaded, QObject *parent) Server::Server(bool _threaded, QObject *parent)
: QObject(parent), threaded(_threaded), clientsLock(QReadWriteLock::Recursive), nextLocalGameId(0) : QObject(parent), threaded(_threaded), nextLocalGameId(0)
{ {
qRegisterMetaType<ServerInfo_Game>("ServerInfo_Game"); qRegisterMetaType<ServerInfo_Game>("ServerInfo_Game");
qRegisterMetaType<ServerInfo_Room>("ServerInfo_Room"); qRegisterMetaType<ServerInfo_Room>("ServerInfo_Room");
@ -79,10 +79,9 @@ void Server::prepareDestroy()
clientsLock.unlock(); clientsLock.unlock();
} while (!done); } while (!done);
} else { } else {
clientsLock.lockForWrite(); // no locking is needed in unthreaded mode
while (!clients.isEmpty()) while (!clients.isEmpty())
clients.first()->prepareDestroy(); clients.first()->prepareDestroy();
clientsLock.unlock();
} }
roomsLock.lockForWrite(); roomsLock.lockForWrite();
@ -224,7 +223,7 @@ void Server::removeClient(Server_ProtocolHandler *client)
void Server::externalUserJoined(const ServerInfo_User &userInfo) void Server::externalUserJoined(const ServerInfo_User &userInfo)
{ {
// This function is always called from the main thread via signal/slot. // This function is always called from the main thread via signal/slot.
QWriteLocker locker(&clientsLock); clientsLock.lockForWrite();
Server_RemoteUserInterface *newUser = new Server_RemoteUserInterface(this, ServerInfo_User_Container(userInfo)); Server_RemoteUserInterface *newUser = new Server_RemoteUserInterface(this, ServerInfo_User_Container(userInfo));
externalUsers.insert(QString::fromStdString(userInfo.name()), newUser); externalUsers.insert(QString::fromStdString(userInfo.name()), newUser);

View file

@ -42,6 +42,7 @@ public:
mutable QReadWriteLock clientsLock, roomsLock; // locking order: roomsLock before clientsLock mutable QReadWriteLock clientsLock, roomsLock; // locking order: roomsLock before clientsLock
Server(bool _threaded, QObject *parent = 0); Server(bool _threaded, QObject *parent = 0);
~Server(); ~Server();
void setThreaded(bool _threaded) { threaded = _threaded; }
AuthenticationResult loginUser(Server_ProtocolHandler *session, QString &name, const QString &password, QString &reason, int &secondsLeft); AuthenticationResult loginUser(Server_ProtocolHandler *session, QString &name, const QString &password, QString &reason, int &secondsLeft);
const QMap<int, Server_Room *> &getRooms() { return rooms; } const QMap<int, Server_Room *> &getRooms() { return rooms; }

View file

@ -38,6 +38,8 @@ Server_ProtocolHandler::~Server_ProtocolHandler()
{ {
} }
// This function must only be called from the thread this object lives in.
// The thread must not hold any server locks when calling this (e.g. clientsLock, roomsLock).
void Server_ProtocolHandler::prepareDestroy() void Server_ProtocolHandler::prepareDestroy()
{ {
if (deleted) if (deleted)
@ -399,6 +401,8 @@ Response::ResponseCode Server_ProtocolHandler::cmdGetGamesOfUser(const Command_G
if (authState == NotLoggedIn) if (authState == NotLoggedIn)
return Response::RespLoginNeeded; return Response::RespLoginNeeded;
// XXX This does not take external users into account.
// XXX Maybe remove this check and test if the result list is empty at the end.
server->clientsLock.lockForRead(); server->clientsLock.lockForRead();
if (!server->getUsers().contains(QString::fromStdString(cmd.user_name()))) if (!server->getUsers().contains(QString::fromStdString(cmd.user_name())))
return Response::RespNameNotFound; return Response::RespNameNotFound;

View file

@ -44,6 +44,7 @@ Servatrice_GameServer::Servatrice_GameServer(Servatrice *_server, int _numberPoo
server(_server) server(_server)
{ {
if (_numberPools == 0) { if (_numberPools == 0) {
server->setThreaded(false);
Servatrice_DatabaseInterface *newDatabaseInterface = new Servatrice_DatabaseInterface(0, server); Servatrice_DatabaseInterface *newDatabaseInterface = new Servatrice_DatabaseInterface(0, server);
Servatrice_ConnectionPool *newPool = new Servatrice_ConnectionPool(newDatabaseInterface); Servatrice_ConnectionPool *newPool = new Servatrice_ConnectionPool(newDatabaseInterface);

View file

@ -72,14 +72,14 @@ ServerSocketInterface::ServerSocketInterface(Servatrice *_server, Servatrice_Dat
socket->setSocketOption(QAbstractSocket::LowDelayOption, 1); socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient())); connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(this, SIGNAL(outputBufferChanged()), this, SLOT(flushOutputBuffer()), Qt::QueuedConnection); connect(this, SIGNAL(outputQueueChanged()), this, SLOT(flushOutputQueue()));
} }
ServerSocketInterface::~ServerSocketInterface() ServerSocketInterface::~ServerSocketInterface()
{ {
logger->logMessage("ServerSocketInterface destructor", this); logger->logMessage("ServerSocketInterface destructor", this);
flushOutputBuffer(); flushOutputQueue();
} }
void ServerSocketInterface::initConnection(int socketDescriptor) void ServerSocketInterface::initConnection(int socketDescriptor)
@ -93,11 +93,10 @@ void ServerSocketInterface::initSessionDeprecated()
{ {
// dirty hack to make v13 client display the correct error message // dirty hack to make v13 client display the correct error message
outputBufferMutex.lock(); QByteArray buf;
outputBuffer = "<?xml version=\"1.0\"?><cockatrice_server_stream version=\"14\">"; buf.append("<?xml version=\"1.0\"?><cockatrice_server_stream version=\"14\">");
outputBufferMutex.unlock(); socket->write(buf);
socket->flush();
emit outputBufferChanged();
} }
bool ServerSocketInterface::initSession() bool ServerSocketInterface::initSession()
@ -125,17 +124,6 @@ bool ServerSocketInterface::initSession()
return true; return true;
} }
void ServerSocketInterface::flushOutputBuffer()
{
QMutexLocker locker(&outputBufferMutex);
if (outputBuffer.isEmpty())
return;
servatrice->incTxBytes(outputBuffer.size());
socket->write(outputBuffer);
socket->flush();
outputBuffer.clear();
}
void ServerSocketInterface::readClient() void ServerSocketInterface::readClient()
{ {
QByteArray data = socket->readAll(); QByteArray data = socket->readAll();
@ -183,18 +171,38 @@ void ServerSocketInterface::catchSocketError(QAbstractSocket::SocketError socket
void ServerSocketInterface::transmitProtocolItem(const ServerMessage &item) void ServerSocketInterface::transmitProtocolItem(const ServerMessage &item)
{ {
QByteArray buf; outputQueueMutex.lock();
unsigned int size = item.ByteSize(); outputQueue.append(item);
buf.resize(size + 4); outputQueueMutex.unlock();
item.SerializeToArray(buf.data() + 4, size);
buf.data()[3] = (unsigned char) size;
buf.data()[2] = (unsigned char) (size >> 8);
buf.data()[1] = (unsigned char) (size >> 16);
buf.data()[0] = (unsigned char) (size >> 24);
QMutexLocker locker(&outputBufferMutex); emit outputQueueChanged();
outputBuffer.append(buf); }
emit outputBufferChanged();
void ServerSocketInterface::flushOutputQueue()
{
QMutexLocker locker(&outputQueueMutex);
if (outputQueue.isEmpty())
return;
int totalBytes = 0;
while (!outputQueue.isEmpty()) {
const ServerMessage &item = outputQueue.first();
QByteArray buf;
unsigned int size = item.ByteSize();
buf.resize(size + 4);
item.SerializeToArray(buf.data() + 4, size);
buf.data()[3] = (unsigned char) size;
buf.data()[2] = (unsigned char) (size >> 8);
buf.data()[1] = (unsigned char) (size >> 16);
buf.data()[0] = (unsigned char) (size >> 24);
socket->write(buf);
totalBytes += size + 4;
outputQueue.removeFirst();
}
servatrice->incTxBytes(totalBytes);
socket->flush();
} }
void ServerSocketInterface::logDebugMessage(const QString &message) void ServerSocketInterface::logDebugMessage(const QString &message)
@ -715,8 +723,8 @@ Response::ResponseCode ServerSocketInterface::cmdBanFromServer(const Command_Ban
for (int i = 0; i < userList.size(); ++i) { for (int i = 0; i < userList.size(); ++i) {
SessionEvent *se = userList[i]->prepareSessionEvent(event); SessionEvent *se = userList[i]->prepareSessionEvent(event);
userList[i]->sendProtocolItem(*se); userList[i]->sendProtocolItem(*se);
userList[i]->prepareDestroy();
delete se; delete se;
QMetaObject::invokeMethod(userList[i], "prepareDestroy", Qt::QueuedConnection);
} }
} }

View file

@ -54,18 +54,19 @@ class ServerSocketInterface : public Server_ProtocolHandler
private slots: private slots:
void readClient(); void readClient();
void catchSocketError(QAbstractSocket::SocketError socketError); void catchSocketError(QAbstractSocket::SocketError socketError);
void flushOutputBuffer(); void flushOutputQueue();
signals: signals:
void outputBufferChanged(); void outputQueueChanged();
protected: protected:
void logDebugMessage(const QString &message); void logDebugMessage(const QString &message);
private: private:
QMutex outputBufferMutex; QMutex outputQueueMutex;
Servatrice *servatrice; Servatrice *servatrice;
Servatrice_DatabaseInterface *sqlInterface; Servatrice_DatabaseInterface *sqlInterface;
QTcpSocket *socket; QTcpSocket *socket;
QByteArray inputBuffer, outputBuffer; QByteArray inputBuffer;
QList<ServerMessage> outputQueue;
bool messageInProgress; bool messageInProgress;
bool handshakeStarted; bool handshakeStarted;
int messageLength; int messageLength;