Commit 520300dc authored by Max-Wilhelm Bruker's avatar Max-Wilhelm Bruker
Browse files

made server threaded, fixing issue #51

parent b328c1ed
......@@ -56,11 +56,21 @@ Server::~Server()
void Server::prepareDestroy()
{
clientsLock.lockForWrite();
while (!clients.isEmpty())
clients.first()->prepareDestroy();
clientsLock.lockForRead();
for (int i = 0; i < clients.size(); ++i)
QMetaObject::invokeMethod(clients.at(i), "prepareDestroy", Qt::QueuedConnection);
clientsLock.unlock();
// dirty :(
bool done = false;
do {
usleep(10000);
clientsLock.lockForRead();
if (clients.isEmpty())
done = true;
clientsLock.unlock();
} while (!done);
roomsLock.lockForWrite();
QMapIterator<int, Server_Room *> roomIterator(rooms);
while (roomIterator.hasNext())
......
......@@ -23,6 +23,7 @@
Server_ProtocolHandler::Server_ProtocolHandler(Server *_server, Server_DatabaseInterface *_databaseInterface, QObject *parent)
: QObject(parent),
Server_AbstractUserInterface(_server),
deleted(false),
databaseInterface(_databaseInterface),
authState(NotLoggedIn),
acceptsUserListChanges(false),
......@@ -39,7 +40,9 @@ Server_ProtocolHandler::~Server_ProtocolHandler()
void Server_ProtocolHandler::prepareDestroy()
{
qDebug("Server_ProtocolHandler::prepareDestroy");
if (deleted)
return;
deleted = true;
QMapIterator<int, Server_Room *> roomIterator(rooms);
while (roomIterator.hasNext())
......
......@@ -43,7 +43,8 @@ class Server_ProtocolHandler : public QObject, public Server_AbstractUserInterfa
Q_OBJECT
protected:
QMap<int, Server_Room *> rooms;
bool deleted;
Server_DatabaseInterface *databaseInterface;
AuthenticationResult authState;
bool acceptsUserListChanges;
......
......@@ -6,6 +6,7 @@ SET(servatrice_SOURCES
src/main.cpp
src/passwordhasher.cpp
src/servatrice.cpp
src/servatrice_connection_pool.cpp
src/servatrice_database_interface.cpp
src/server_logger.cpp
src/serversocketinterface.cpp
......
......@@ -4,7 +4,13 @@ statusupdate=15000
logfile=server.log
name="My Cockatrice server"
id=1
threaded=0
number_pools=1
[servernetwork]
active=0
port=14747
ssl_cert=ssl_cert.pem
ssl_key=ssl_key.pem
[authentication]
method=none
......
......@@ -17,8 +17,11 @@
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#include <QtSql>
#include <QSqlQuery>
#include <QSettings>
#include <QFile>
#include <QTimer>
#include <QDateTime>
#include <QDebug>
#include <iostream>
#include "servatrice.h"
......@@ -36,45 +39,59 @@
#include "pb/event_server_shutdown.pb.h"
#include "pb/event_connection_closed.pb.h"
Servatrice_GameServer::Servatrice_GameServer(Servatrice *_server, bool _threaded, int _numberPools, const QSqlDatabase &_sqlDatabase, QObject *parent)
Servatrice_GameServer::Servatrice_GameServer(Servatrice *_server, int _numberPools, const QSqlDatabase &_sqlDatabase, QObject *parent)
: QTcpServer(parent),
server(_server),
threaded(_threaded)
server(_server)
{
for (int i = 0; i < _numberPools; ++i) {
Servatrice_DatabaseInterface *newDatabaseInterface = new Servatrice_DatabaseInterface(i, server);
Servatrice_ConnectionPool *newPool = new Servatrice_ConnectionPool(newDatabaseInterface);
// ---
newDatabaseInterface->initDatabase(_sqlDatabase);
// ---
QThread *newThread = new QThread;
newPool->moveToThread(newThread);
newDatabaseInterface->moveToThread(newThread);
server->addDatabaseInterface(newThread, newDatabaseInterface);
newThread->start();
QMetaObject::invokeMethod(newDatabaseInterface, "initDatabase", Qt::BlockingQueuedConnection, Q_ARG(QSqlDatabase, _sqlDatabase));
connectionPools.append(newPool);
}
}
Servatrice_GameServer::~Servatrice_GameServer()
{
for (int i = 0; i < connectionPools.size(); ++i) {
logger->logMessage(QString("Closing pool %1...").arg(i));
QThread *poolThread = connectionPools[i]->thread();
connectionPools[i]->deleteLater(); // pool destructor calls thread()->quit()
poolThread->wait();
}
}
void Servatrice_GameServer::incomingConnection(int socketDescriptor)
{
// Determine connection pool with smallest client count
int minClientCount = -1;
int poolIndex = -1;
QStringList debugStr;
for (int i = 0; i < connectionPools.size(); ++i) {
const int clientCount = connectionPools[i]->getClientCount();
if ((poolIndex == -1) || (clientCount < minClientCount)) {
minClientCount = clientCount;
poolIndex = i;
}
debugStr.append(QString::number(clientCount));
}
qDebug() << "Pool utilisation:" << debugStr;
Servatrice_ConnectionPool *pool = connectionPools[poolIndex];
QTcpSocket *socket = new QTcpSocket;
ServerSocketInterface *ssi = new ServerSocketInterface(server, pool->getDatabaseInterface(), socket);
ServerSocketInterface *ssi = new ServerSocketInterface(server, pool->getDatabaseInterface());
ssi->moveToThread(pool->thread());
pool->addClient();
connect(ssi, SIGNAL(destroyed()), pool, SLOT(removeClient()));
socket->setSocketDescriptor(socketDescriptor);
socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
logger->logMessage(QString("[pool %1] Incoming connection: %2").arg(poolIndex).arg(socket->peerAddress().toString()), ssi);
ssi->initSessionDeprecated();
QMetaObject::invokeMethod(ssi, "initConnection", Qt::QueuedConnection, Q_ARG(int, socketDescriptor));
}
void Servatrice_IslServer::incomingConnection(int socketDescriptor)
......@@ -97,8 +114,8 @@ Servatrice::Servatrice(QSettings *_settings, QObject *parent)
Servatrice::~Servatrice()
{
gameServer->close();
prepareDestroy();
QSqlDatabase::database().close();
}
bool Servatrice::initServer()
......@@ -261,9 +278,8 @@ bool Servatrice::initServer()
statusUpdateClock->start(statusUpdateTime);
}
threaded = settings->value("server/threaded", false).toInt();
const int numberPools = settings->value("server/number_pools", 1).toInt();
gameServer = new Servatrice_GameServer(this, threaded, numberPools, servatriceDatabaseInterface->getDatabase(), this);
gameServer = new Servatrice_GameServer(this, numberPools, servatriceDatabaseInterface->getDatabase(), this);
const int gamePort = settings->value("server/port", 4747).toInt();
qDebug() << "Starting server on port" << gamePort;
if (gameServer->listen(QHostAddress::Any, gamePort))
......@@ -275,6 +291,11 @@ bool Servatrice::initServer()
return true;
}
void Servatrice::addDatabaseInterface(QThread *thread, Servatrice_DatabaseInterface *databaseInterface)
{
databaseInterfaces.insert(thread, databaseInterface);
}
void Servatrice::updateServerList()
{
qDebug() << "Updating server list...";
......
......@@ -44,10 +44,10 @@ class Servatrice_GameServer : public QTcpServer {
Q_OBJECT
private:
Servatrice *server;
bool threaded;
QList<Servatrice_ConnectionPool *> connectionPools;
public:
Servatrice_GameServer(Servatrice *_server, bool _threaded, int _numberPools, const QSqlDatabase &_sqlDatabase, QObject *parent = 0);
Servatrice_GameServer(Servatrice *_server, int _numberPools, const QSqlDatabase &_sqlDatabase, QObject *parent = 0);
~Servatrice_GameServer();
protected:
void incomingConnection(int socketDescriptor);
};
......@@ -102,7 +102,6 @@ private:
QSettings *settings;
Servatrice_DatabaseInterface *servatriceDatabaseInterface;
int serverId;
bool threaded;
int uptime;
QMutex txBytesMutex, rxBytesMutex;
quint64 txBytes, rxBytes;
......@@ -135,7 +134,6 @@ public:
int getMaxMessageSizePerInterval() const { return maxMessageSizePerInterval; }
int getMaxGamesPerUser() const { return maxGamesPerUser; }
AuthenticationMethod getAuthenticationMethod() const { return authenticationMethod; }
bool getThreaded() const { return threaded; }
QString getDbPrefix() const { return dbPrefix; }
int getServerId() const { return serverId; }
void updateLoginMessage();
......@@ -144,6 +142,7 @@ public:
void incTxBytes(quint64 num);
void incRxBytes(quint64 num);
void storeGameInformation(int secondsElapsed, const QSet<QString> &allPlayersEver, const QSet<QString> &allSpectatorsEver, const QList<GameReplay *> &replays);
void addDatabaseInterface(QThread *thread, Servatrice_DatabaseInterface *databaseInterface);
bool islConnectionExists(int serverId) const;
void addIslInterface(int serverId, IslInterface *interface);
......
#include "servatrice_connection_pool.h"
#include "servatrice_database_interface.h"
#include <QThread>
Servatrice_ConnectionPool::Servatrice_ConnectionPool(Servatrice_DatabaseInterface *_databaseInterface)
: databaseInterface(_databaseInterface),
clientCount(0)
{
}
Servatrice_ConnectionPool::~Servatrice_ConnectionPool()
{
delete databaseInterface;
thread()->quit();
}
......@@ -11,13 +11,13 @@ class Servatrice_ConnectionPool : public QObject {
Q_OBJECT
private:
Servatrice_DatabaseInterface *databaseInterface;
bool threaded;
mutable QMutex clientCountMutex;
int clientCount;
public:
Servatrice_ConnectionPool(Servatrice_DatabaseInterface *_databaseInterface)
: databaseInterface(_databaseInterface), clientCount(0)
{
}
Servatrice_ConnectionPool(Servatrice_DatabaseInterface *_databaseInterface);
~Servatrice_ConnectionPool();
Servatrice_DatabaseInterface *getDatabaseInterface() const { return databaseInterface; }
int getClientCount() const { QMutexLocker locker(&clientCountMutex); return clientCount; }
......
......@@ -15,6 +15,11 @@ Servatrice_DatabaseInterface::Servatrice_DatabaseInterface(int _instanceId, Serv
{
}
Servatrice_DatabaseInterface::~Servatrice_DatabaseInterface()
{
sqlDatabase.close();
}
void Servatrice_DatabaseInterface::initDatabase(const QSqlDatabase &_sqlDatabase)
{
sqlDatabase = QSqlDatabase::cloneDatabase(_sqlDatabase, "pool_" + QString::number(instanceId));
......
......@@ -18,9 +18,11 @@ private:
ServerInfo_User evalUserQueryResult(const QSqlQuery &query, bool complete, bool withId = false);
protected:
AuthenticationResult checkUserPassword(Server_ProtocolHandler *handler, const QString &user, const QString &password, QString &reasonStr, int &secondsLeft);
public slots:
void initDatabase(const QSqlDatabase &_sqlDatabase);
public:
Servatrice_DatabaseInterface(int _instanceId, Servatrice *_server);
void initDatabase(const QSqlDatabase &_sqlDatabase);
~Servatrice_DatabaseInterface();
void initDatabase(const QString &type, const QString &hostName, const QString &databaseName, const QString &userName, const QString &password);
bool openDatabase();
bool checkSql();
......
......@@ -47,7 +47,7 @@ void ServerLogger::logMessage(QString message, void *caller)
QString callerString;
if (caller)
callerString = QString::number((qulonglong) caller, 16) + " ";
buffer.append(QDateTime::currentDateTime().toString() + " " + QString::number((qulonglong) QThread::currentThread(), 16) + " " + callerString + message);
buffer.append(QDateTime::currentDateTime().toString() + " " + callerString + message);
bufferMutex.unlock();
emit sigFlushBuffer();
......
......@@ -61,14 +61,15 @@
static const int protocolVersion = 14;
ServerSocketInterface::ServerSocketInterface(Servatrice *_server, Servatrice_DatabaseInterface *_databaseInterface, QTcpSocket *_socket, QObject *parent)
ServerSocketInterface::ServerSocketInterface(Servatrice *_server, Servatrice_DatabaseInterface *_databaseInterface, QObject *parent)
: Server_ProtocolHandler(_server, _databaseInterface, parent),
servatrice(_server),
sqlInterface(reinterpret_cast<Servatrice_DatabaseInterface *>(databaseInterface)),
socket(_socket),
messageInProgress(false),
handshakeStarted(false)
{
socket = new QTcpSocket(this);
socket->setSocketOption(QAbstractSocket::LowDelayOption, 1);
connect(socket, SIGNAL(readyRead()), this, SLOT(readClient()));
connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(catchSocketError(QAbstractSocket::SocketError)));
connect(this, SIGNAL(outputBufferChanged()), this, SLOT(flushOutputBuffer()), Qt::QueuedConnection);
......@@ -79,8 +80,13 @@ ServerSocketInterface::~ServerSocketInterface()
logger->logMessage("ServerSocketInterface destructor", this);
flushOutputBuffer();
delete socket;
socket = 0;
}
void ServerSocketInterface::initConnection(int socketDescriptor)
{
socket->setSocketDescriptor(socketDescriptor);
logger->logMessage(QString("Incoming connection: %1").arg(socket->peerAddress().toString()), this);
initSessionDeprecated();
}
void ServerSocketInterface::initSessionDeprecated()
......
......@@ -93,7 +93,7 @@ private:
Response::ResponseCode processExtendedModeratorCommand(int cmdType, const ModeratorCommand &cmd, ResponseContainer &rc);
Response::ResponseCode processExtendedAdminCommand(int cmdType, const AdminCommand &cmd, ResponseContainer &rc);
public:
ServerSocketInterface(Servatrice *_server, Servatrice_DatabaseInterface *_databaseInterface, QTcpSocket *_socket, QObject *parent = 0);
ServerSocketInterface(Servatrice *_server, Servatrice_DatabaseInterface *_databaseInterface, QObject *parent = 0);
~ServerSocketInterface();
void initSessionDeprecated();
bool initSession();
......@@ -101,6 +101,8 @@ public:
QString getAddress() const { return socket->peerAddress().toString(); }
void transmitProtocolItem(const ServerMessage &item);
public slots:
void initConnection(int socketDescriptor);
};
#endif
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment