added send buffer to limit socket operations to one thread

This commit is contained in:
Max-Wilhelm Bruker 2011-03-22 20:45:18 +01:00
parent 81a5d58d70
commit 45890b836b
5 changed files with 40 additions and 7 deletions

View file

@ -4,6 +4,7 @@
#include <QObject> #include <QObject>
#include <QStringList> #include <QStringList>
#include <QMap> #include <QMap>
#include <QMutex>
class Server_Game; class Server_Game;
class Server_Room; class Server_Room;
@ -22,6 +23,7 @@ private slots:
void gameClosing(int gameId); void gameClosing(int gameId);
void broadcastRoomUpdate(); void broadcastRoomUpdate();
public: public:
mutable QMutex serverMutex;
Server(QObject *parent = 0); Server(QObject *parent = 0);
~Server(); ~Server();
AuthenticationResult loginUser(Server_ProtocolHandler *session, QString &name, const QString &password); AuthenticationResult loginUser(Server_ProtocolHandler *session, QString &name, const QString &password);

View file

@ -17,6 +17,7 @@ Server_ProtocolHandler::Server_ProtocolHandler(Server *_server, QObject *parent)
: QObject(parent), server(_server), authState(PasswordWrong), acceptsUserListChanges(false), acceptsRoomListChanges(false), userInfo(0), timeRunning(0), lastDataReceived(0) : QObject(parent), server(_server), authState(PasswordWrong), acceptsUserListChanges(false), acceptsRoomListChanges(false), userInfo(0), timeRunning(0), lastDataReceived(0)
{ {
connect(server, SIGNAL(pingClockTimeout()), this, SLOT(pingClockTimeout())); connect(server, SIGNAL(pingClockTimeout()), this, SLOT(pingClockTimeout()));
connect(this, SIGNAL(sigGameCreated(Server_Game *)), this, SLOT(processSigGameCreated(Server_Game *)), Qt::QueuedConnection);
} }
Server_ProtocolHandler::~Server_ProtocolHandler() Server_ProtocolHandler::~Server_ProtocolHandler()
@ -430,6 +431,13 @@ ResponseCode Server_ProtocolHandler::cmdCreateGame(Command_CreateGame *cmd, Comm
void Server_ProtocolHandler::gameCreated(Server_Game *game) void Server_ProtocolHandler::gameCreated(Server_Game *game)
{ {
emit sigGameCreated(game);
}
void Server_ProtocolHandler::processSigGameCreated(Server_Game *game)
{
QMutexLocker locker(&game->gameMutex);
Server_Player *creator = game->getPlayers().values().first(); Server_Player *creator = game->getPlayers().values().first();
games.insert(game->getGameId(), QPair<Server_Game *, Server_Player *>(game, creator)); games.insert(game->getGameId(), QPair<Server_Game *, Server_Player *>(game, creator));

View file

@ -91,7 +91,11 @@ private:
ResponseCode processCommandHelper(Command *command, CommandContainer *cont); ResponseCode processCommandHelper(Command *command, CommandContainer *cont);
private slots: private slots:
void pingClockTimeout(); void pingClockTimeout();
void processSigGameCreated(Server_Game *game);
signals:
void sigGameCreated(Server_Game *game);
public: public:
mutable QMutex protocolHandlerMutex;
Server_ProtocolHandler(Server *_server, QObject *parent = 0); Server_ProtocolHandler(Server *_server, QObject *parent = 0);
~Server_ProtocolHandler(); ~Server_ProtocolHandler();
void playerRemovedFromGame(Server_Game *game); void playerRemovedFromGame(Server_Game *game);

View file

@ -35,18 +35,18 @@
ServerSocketInterface::ServerSocketInterface(Servatrice *_server, QTcpSocket *_socket, QObject *parent) ServerSocketInterface::ServerSocketInterface(Servatrice *_server, QTcpSocket *_socket, QObject *parent)
: Server_ProtocolHandler(_server, parent), servatrice(_server), socket(_socket), topLevelItem(0) : Server_ProtocolHandler(_server, parent), servatrice(_server), socket(_socket), topLevelItem(0)
{ {
xmlWriter = new QXmlStreamWriter; xmlWriter = new QXmlStreamWriter(&xmlBuffer);
xmlWriter->setDevice(socket);
xmlReader = new QXmlStreamReader; xmlReader = new QXmlStreamReader;
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient())); connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()));
connect(socket, SIGNAL(disconnected()), this, SLOT(deleteLater())); connect(socket, SIGNAL(disconnected()), this, SLOT(deleteLater()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError))); connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(this, SIGNAL(xmlBufferChanged()), this, SLOT(flushXmlBuffer()), Qt::QueuedConnection);
xmlWriter->writeStartDocument(); xmlWriter->writeStartDocument();
xmlWriter->writeStartElement("cockatrice_server_stream"); xmlWriter->writeStartElement("cockatrice_server_stream");
xmlWriter->writeAttribute("version", QString::number(ProtocolItem::protocolVersion)); xmlWriter->writeAttribute("version", QString::number(ProtocolItem::protocolVersion));
flushXmlBuffer();
int maxUsers = _server->getMaxUsersPerAddress(); int maxUsers = _server->getMaxUsersPerAddress();
if ((maxUsers > 0) && (_server->getUsersWithAddress(socket->peerAddress()) >= maxUsers)) { if ((maxUsers > 0) && (_server->getUsersWithAddress(socket->peerAddress()) >= maxUsers)) {
@ -58,6 +58,8 @@ ServerSocketInterface::ServerSocketInterface(Servatrice *_server, QTcpSocket *_s
ServerSocketInterface::~ServerSocketInterface() ServerSocketInterface::~ServerSocketInterface()
{ {
QMutexLocker locker(&protocolHandlerMutex);
logger->logMessage("ServerSocketInterface destructor"); logger->logMessage("ServerSocketInterface destructor");
socket->flush(); socket->flush();
@ -75,8 +77,18 @@ void ServerSocketInterface::processProtocolItem(ProtocolItem *item)
processCommandContainer(cont); processCommandContainer(cont);
} }
void ServerSocketInterface::flushXmlBuffer()
{
QMutexLocker locker(&xmlBufferMutex);
socket->write(xmlBuffer.toUtf8());
socket->flush();
xmlBuffer.clear();
}
void ServerSocketInterface::readClient() void ServerSocketInterface::readClient()
{ {
QMutexLocker locker(&protocolHandlerMutex);
QByteArray data = socket->readAll(); QByteArray data = socket->readAll();
logger->logMessage(QString(data)); logger->logMessage(QString(data));
xmlReader->addData(data); xmlReader->addData(data);
@ -101,13 +113,13 @@ void ServerSocketInterface::catchSocketError(QAbstractSocket::SocketError socket
void ServerSocketInterface::sendProtocolItem(ProtocolItem *item, bool deleteItem) void ServerSocketInterface::sendProtocolItem(ProtocolItem *item, bool deleteItem)
{ {
static QMutex mutex; QMutexLocker locker(&xmlBufferMutex);
mutex.lock();
item->write(xmlWriter); item->write(xmlWriter);
socket->flush();
mutex.unlock();
if (deleteItem) if (deleteItem)
delete item; delete item;
emit xmlBufferChanged();
} }
int ServerSocketInterface::getUserIdInDB(const QString &name) const int ServerSocketInterface::getUserIdInDB(const QString &name) const

View file

@ -22,6 +22,7 @@
#include <QTcpSocket> #include <QTcpSocket>
#include <QHostAddress> #include <QHostAddress>
#include <QMutex>
#include "server_protocolhandler.h" #include "server_protocolhandler.h"
class QTcpSocket; class QTcpSocket;
@ -30,6 +31,7 @@ class QXmlStreamReader;
class QXmlStreamWriter; class QXmlStreamWriter;
class DeckList; class DeckList;
class TopLevelProtocolItem; class TopLevelProtocolItem;
class QByteArray;
class ServerSocketInterface : public Server_ProtocolHandler class ServerSocketInterface : public Server_ProtocolHandler
{ {
@ -38,11 +40,16 @@ private slots:
void readClient(); void readClient();
void catchSocketError(QAbstractSocket::SocketError socketError); void catchSocketError(QAbstractSocket::SocketError socketError);
void processProtocolItem(ProtocolItem *item); void processProtocolItem(ProtocolItem *item);
void flushXmlBuffer();
signals:
void xmlBufferChanged();
private: private:
QMutex xmlBufferMutex;
Servatrice *servatrice; Servatrice *servatrice;
QTcpSocket *socket; QTcpSocket *socket;
QXmlStreamWriter *xmlWriter; QXmlStreamWriter *xmlWriter;
QXmlStreamReader *xmlReader; QXmlStreamReader *xmlReader;
QString xmlBuffer;
TopLevelProtocolItem *topLevelItem; TopLevelProtocolItem *topLevelItem;
int getUserIdInDB(const QString &name) const; int getUserIdInDB(const QString &name) const;