Files
HL2Overcharged/utils/common/mpi_stats.cpp
2025-05-21 21:20:08 +03:00

839 lines
20 KiB
C++

//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
// $NoKeywords: $
//=============================================================================//
// Nasty headers!
#include "MySqlDatabase.h"
#include "tier1/strtools.h"
#include "vmpi.h"
#include "vmpi_dispatch.h"
#include "mpi_stats.h"
#include "cmdlib.h"
#include "imysqlwrapper.h"
#include "threadhelpers.h"
#include "vmpi_tools_shared.h"
#include "tier0/icommandline.h"
/*
-- MySQL code to create the databases, create the users, and set access privileges.
-- You only need to ever run this once.
create database vrad;
use mysql;
create user vrad_worker;
create user vmpi_browser;
-- This updates the "user" table, which is checked when someone tries to connect to the database.
grant select,insert,update on vrad.* to vrad_worker;
grant select on vrad.* to vmpi_browser;
flush privileges;
/*
-- SQL code to (re)create the tables.
-- Master generates a unique job ID (in job_master_start) and sends it to workers.
-- Each worker (and the master) make a job_worker_start, link it to the primary job ID,
-- get their own unique ID, which represents that process in that job.
-- All JobWorkerID fields link to the JobWorkerID field in job_worker_start.
-- NOTE: do a "use vrad" or "use vvis" first, depending on the DB you want to create.
use vrad;
drop table job_master_start;
create table job_master_start (
JobID INTEGER UNSIGNED NOT NULL AUTO_INCREMENT, index id( JobID, MachineName(5) ),
BSPFilename TINYTEXT NOT NULL,
StartTime TIMESTAMP NOT NULL,
MachineName TEXT NOT NULL,
RunningTimeMS INTEGER UNSIGNED NOT NULL,
NumWorkers INTEGER UNSIGNED NOT NULL default 0
);
drop table job_master_end;
create table job_master_end (
JobID INTEGER UNSIGNED NOT NULL, PRIMARY KEY ( JobID ),
NumWorkersConnected SMALLINT UNSIGNED NOT NULL,
NumWorkersDisconnected SMALLINT UNSIGNED NOT NULL,
ErrorText TEXT NOT NULL
);
drop table job_worker_start;
create table job_worker_start (
JobWorkerID INTEGER UNSIGNED NOT NULL AUTO_INCREMENT,
index index_jobid( JobID ),
index index_jobworkerid( JobWorkerID ),
JobID INTEGER UNSIGNED NOT NULL, -- links to job_master_start::JobID
IsMaster BOOL NOT NULL, -- Set to 1 if this "worker" is the master process.
RunningTimeMS INTEGER UNSIGNED NOT NULL default 0,
MachineName TEXT NOT NULL,
WorkerState SMALLINT UNSIGNED NOT NULL default 0, -- 0 = disconnected, 1 = connected
NumWorkUnits INTEGER UNSIGNED NOT NULL default 0, -- how many work units this worker has completed
CurrentStage TINYTEXT NOT NULL, -- which compile stage is it on
Thread0WU INTEGER NOT NULL default 0, -- which WU thread 0 is on
Thread1WU INTEGER NOT NULL default 0, -- which WU thread 1 is on
Thread2WU INTEGER NOT NULL default 0, -- which WU thread 2 is on
Thread3WU INTEGER NOT NULL default 0 -- which WU thread 3 is on
);
drop table text_messages;
create table text_messages (
JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID, MessageIndex ),
MessageIndex INTEGER UNSIGNED NOT NULL,
Text TEXT NOT NULL
);
drop table graph_entry;
create table graph_entry (
JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID ),
MSSinceJobStart INTEGER UNSIGNED NOT NULL,
BytesSent INTEGER UNSIGNED NOT NULL,
BytesReceived INTEGER UNSIGNED NOT NULL
);
drop table events;
create table events (
JobWorkerID INTEGER UNSIGNED NOT NULL, index id( JobWorkerID ),
Text TEXT NOT NULL
);
*/
// Stats set by the app.
int g_nWorkersConnected = 0;
int g_nWorkersDisconnected = 0;
DWORD g_StatsStartTime;
CMySqlDatabase *g_pDB = NULL;
IMySQL *g_pSQL = NULL;
CSysModule *g_hMySQLDLL = NULL;
char g_BSPFilename[256];
bool g_bMaster = false;
unsigned long g_JobPrimaryID = 0; // This represents this job, but doesn't link to a particular machine.
unsigned long g_JobWorkerID = 0; // A unique key in the DB that represents this machine in this job.
char g_MachineName[MAX_COMPUTERNAME_LENGTH + 1] = { 0 };
unsigned long g_CurrentMessageIndex = 0;
HANDLE g_hPerfThread = NULL;
DWORD g_PerfThreadID = 0xFEFEFEFE;
HANDLE g_hPerfThreadExitEvent = NULL;
// These are set by the app and they go into the database.
extern uint64 g_ThreadWUs[4];
extern uint64 VMPI_GetNumWorkUnitsCompleted(int iProc);
// ---------------------------------------------------------------------------------------------------- //
// This is a helper class to build queries like the stream IO.
// ---------------------------------------------------------------------------------------------------- //
class CMySQLQuery
{
friend class CMySQL;
public:
// This is like a sprintf, but it will grow the string as necessary.
void Format(const char *pFormat, ...);
int Execute(IMySQL *pDB);
private:
CUtlVector<char> m_QueryText;
};
void CMySQLQuery::Format(const char *pFormat, ...)
{
#define QUERYTEXT_GROWSIZE 1024
// This keeps growing the buffer and calling _vsnprintf until the buffer is
// large enough to hold all the data.
m_QueryText.SetSize(QUERYTEXT_GROWSIZE);
while (1)
{
va_list marker;
va_start(marker, pFormat);
int ret = _vsnprintf(m_QueryText.Base(), m_QueryText.Count(), pFormat, marker);
va_end(marker);
if (ret < 0)
{
m_QueryText.SetSize(m_QueryText.Count() + QUERYTEXT_GROWSIZE);
}
else
{
m_QueryText[m_QueryText.Count() - 1] = 0;
break;
}
}
}
int CMySQLQuery::Execute(IMySQL *pDB)
{
int ret = pDB->Execute(m_QueryText.Base());
m_QueryText.Purge();
return ret;
}
// ---------------------------------------------------------------------------------------------------- //
// This inserts the necessary backslashes in front of backslashes or quote characters.
// ---------------------------------------------------------------------------------------------------- //
char* FormatStringForSQL(const char *pText)
{
// First, count the quotes in the string. We need to put a backslash in front of each one.
int nChars = 0;
const char *pCur = pText;
while (*pCur != 0)
{
if (*pCur == '\"' || *pCur == '\\')
++nChars;
++pCur;
++nChars;
}
pCur = pText;
char *pRetVal = new char[nChars + 1];
for (int i = 0; i < nChars;)
{
if (*pCur == '\"' || *pCur == '\\')
pRetVal[i++] = '\\';
pRetVal[i++] = *pCur;
++pCur;
}
pRetVal[nChars] = 0;
return pRetVal;
}
// -------------------------------------------------------------------------------- //
// Commands to add data to the database.
// -------------------------------------------------------------------------------- //
class CSQLDBCommandBase : public ISQLDBCommand
{
public:
virtual ~CSQLDBCommandBase()
{
}
virtual void deleteThis()
{
delete this;
}
};
class CSQLDBCommand_WorkerStats : public CSQLDBCommandBase
{
public:
virtual int RunCommand()
{
int nCurConnections = VMPI_GetCurrentNumberOfConnections();
// Update the NumWorkers entry.
char query[2048];
Q_snprintf(query, sizeof(query), "update job_master_start set NumWorkers=%d where JobID=%lu",
nCurConnections,
g_JobPrimaryID);
g_pSQL->Execute(query);
// Update the job_master_worker_stats stuff.
for (int i = 1; i < nCurConnections; i++)
{
unsigned long jobWorkerID = VMPI_GetJobWorkerID(i);
if (jobWorkerID != 0xFFFFFFFF)
{
Q_snprintf(query, sizeof(query), "update "
"job_worker_start set WorkerState=%d, NumWorkUnits=%d where JobWorkerID=%lu",
VMPI_IsProcConnected(i),
(int)VMPI_GetNumWorkUnitsCompleted(i),
VMPI_GetJobWorkerID(i)
);
g_pSQL->Execute(query);
}
}
return 1;
}
};
class CSQLDBCommand_JobMasterEnd : public CSQLDBCommandBase
{
public:
virtual int RunCommand()
{
CMySQLQuery query;
query.Format("insert into job_master_end values ( %lu, %d, %d, \"no errors\" )", g_JobPrimaryID, g_nWorkersConnected, g_nWorkersDisconnected);
query.Execute(g_pSQL);
// Now set RunningTimeMS.
unsigned long runningTimeMS = GetTickCount() - g_StatsStartTime;
query.Format("update job_master_start set RunningTimeMS=%lu where JobID=%lu", runningTimeMS, g_JobPrimaryID);
query.Execute(g_pSQL);
return 1;
}
};
void UpdateJobWorkerRunningTime()
{
unsigned long runningTimeMS = GetTickCount() - g_StatsStartTime;
char curStage[256];
VMPI_GetCurrentStage(curStage, sizeof(curStage));
CMySQLQuery query;
query.Format("update job_worker_start set RunningTimeMS=%lu, CurrentStage=\"%s\", "
"Thread0WU=%d, Thread1WU=%d, Thread2WU=%d, Thread3WU=%d where JobWorkerID=%lu",
runningTimeMS,
curStage,
(int)g_ThreadWUs[0],
(int)g_ThreadWUs[1],
(int)g_ThreadWUs[2],
(int)g_ThreadWUs[3],
g_JobWorkerID);
query.Execute(g_pSQL);
}
class CSQLDBCommand_GraphEntry : public CSQLDBCommandBase
{
public:
CSQLDBCommand_GraphEntry(DWORD msTime, DWORD nBytesSent, DWORD nBytesReceived)
{
m_msTime = msTime;
m_nBytesSent = nBytesSent;
m_nBytesReceived = nBytesReceived;
}
virtual int RunCommand()
{
CMySQLQuery query;
query.Format("insert into graph_entry (JobWorkerID, MSSinceJobStart, BytesSent, BytesReceived) "
"values ( %lu, %lu, %lu, %lu )",
g_JobWorkerID,
m_msTime,
m_nBytesSent,
m_nBytesReceived);
query.Execute(g_pSQL);
UpdateJobWorkerRunningTime();
++g_CurrentMessageIndex;
return 1;
}
DWORD m_nBytesSent;
DWORD m_nBytesReceived;
DWORD m_msTime;
};
class CSQLDBCommand_TextMessage : public CSQLDBCommandBase
{
public:
CSQLDBCommand_TextMessage(const char *pText)
{
m_pText = FormatStringForSQL(pText);
}
virtual ~CSQLDBCommand_TextMessage()
{
delete[] m_pText;
}
virtual int RunCommand()
{
CMySQLQuery query;
query.Format("insert into text_messages (JobWorkerID, MessageIndex, Text) values ( %lu, %lu, \"%s\" )", g_JobWorkerID, g_CurrentMessageIndex, m_pText);
query.Execute(g_pSQL);
++g_CurrentMessageIndex;
return 1;
}
char *m_pText;
};
// -------------------------------------------------------------------------------- //
// Internal helpers.
// -------------------------------------------------------------------------------- //
// This is the spew output before it has connected to the MySQL database.
CCriticalSection g_SpewTextCS;
CUtlVector<char> g_SpewText(1024);
void VMPI_Stats_SpewHook(const char *pMsg)
{
CCriticalSectionLock csLock(&g_SpewTextCS);
csLock.Lock();
// Queue the text up so we can send it to the DB right away when we connect.
g_SpewText.AddMultipleToTail(strlen(pMsg), pMsg);
}
void PerfThread_SendSpewText()
{
// Send the spew text to the database.
CCriticalSectionLock csLock(&g_SpewTextCS);
csLock.Lock();
if (g_SpewText.Count() > 0)
{
g_SpewText.AddToTail(0);
if (g_bMPI_StatsTextOutput)
{
g_pDB->AddCommandToQueue(new CSQLDBCommand_TextMessage(g_SpewText.Base()), NULL);
}
else
{
// Just show one message in the vmpi_job_watch window to let them know that they need
// to use a command line option to get the output.
static bool bFirst = true;
if (bFirst)
{
char msg[512];
V_snprintf(msg, sizeof(msg), "%s not enabled", VMPI_GetParamString(mpi_Stats_TextOutput));
bFirst = false;
g_pDB->AddCommandToQueue(new CSQLDBCommand_TextMessage(msg), NULL);
}
}
g_SpewText.RemoveAll();
}
csLock.Unlock();
}
void PerfThread_AddGraphEntry(DWORD startTicks, DWORD &lastSent, DWORD &lastReceived)
{
// Send the graph entry with data transmission info.
DWORD curSent = g_nBytesSent + g_nMulticastBytesSent;
DWORD curReceived = g_nBytesReceived + g_nMulticastBytesReceived;
g_pDB->AddCommandToQueue(
new CSQLDBCommand_GraphEntry(
GetTickCount() - startTicks,
curSent - lastSent,
curReceived - lastReceived),
NULL);
lastSent = curSent;
lastReceived = curReceived;
}
// This function adds a graph_entry into the database periodically.
DWORD WINAPI PerfThreadFn(LPVOID pParameter)
{
DWORD lastSent = 0;
DWORD lastReceived = 0;
DWORD startTicks = GetTickCount();
while (WaitForSingleObject(g_hPerfThreadExitEvent, 1000) != WAIT_OBJECT_0)
{
PerfThread_AddGraphEntry(startTicks, lastSent, lastReceived);
// Send updates for text output.
PerfThread_SendSpewText();
// If we're the master, update all the worker stats.
if (g_bMaster)
{
g_pDB->AddCommandToQueue(
new CSQLDBCommand_WorkerStats,
NULL);
}
}
// Add the remaining text and one last graph entry (which will include the current stage info).
PerfThread_SendSpewText();
PerfThread_AddGraphEntry(startTicks, lastSent, lastReceived);
SetEvent(g_hPerfThreadExitEvent);
return 0;
}
// -------------------------------------------------------------------------------- //
// VMPI_Stats interface.
// -------------------------------------------------------------------------------- //
void VMPI_Stats_InstallSpewHook()
{
InstallExtraSpewHook(VMPI_Stats_SpewHook);
}
void UnloadMySQLWrapper()
{
if (g_hMySQLDLL)
{
if (g_pSQL)
{
g_pSQL->Release();
g_pSQL = NULL;
}
Sys_UnloadModule(g_hMySQLDLL);
g_hMySQLDLL = NULL;
}
}
bool LoadMySQLWrapper(
const char *pHostName,
const char *pDBName,
const char *pUserName
)
{
UnloadMySQLWrapper();
// Load the DLL and the interface.
if (!Sys_LoadInterface("mysql_wrapper", MYSQL_WRAPPER_VERSION_NAME, &g_hMySQLDLL, (void**)&g_pSQL))
return false;
// Try to init the database.
if (!g_pSQL->InitMySQL(pDBName, pHostName, pUserName))
{
UnloadMySQLWrapper();
return false;
}
return true;
}
bool VMPI_Stats_Init_Master(
const char *pHostName,
const char *pDBName,
const char *pUserName,
const char *pBSPFilename,
unsigned long *pDBJobID)
{
Assert(!g_pDB);
g_bMaster = true;
// Connect the database.
g_pDB = new CMySqlDatabase;
if (!g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper(pHostName, pDBName, pUserName))
{
delete g_pDB;
g_pDB = NULL;
return false;
}
DWORD size = sizeof(g_MachineName);
GetComputerName(g_MachineName, &size);
// Create the job_master_start row.
Q_FileBase(pBSPFilename, g_BSPFilename, sizeof(g_BSPFilename));
g_JobPrimaryID = 0;
CMySQLQuery query;
query.Format("insert into job_master_start ( BSPFilename, StartTime, MachineName, RunningTimeMS ) values ( \"%s\", null, \"%s\", %lu )", g_BSPFilename, g_MachineName, RUNNINGTIME_MS_SENTINEL);
query.Execute(g_pSQL);
g_JobPrimaryID = g_pSQL->InsertID();
if (g_JobPrimaryID == 0)
{
delete g_pDB;
g_pDB = NULL;
return false;
}
// Now init the worker portion.
*pDBJobID = g_JobPrimaryID;
return VMPI_Stats_Init_Worker(NULL, NULL, NULL, g_JobPrimaryID);
}
bool VMPI_Stats_Init_Worker(const char *pHostName, const char *pDBName, const char *pUserName, unsigned long DBJobID)
{
g_StatsStartTime = GetTickCount();
// If pDBServerName is null, then we're the master and we just want to make the job_worker_start entry.
if (pHostName)
{
Assert(!g_pDB);
// Connect the database.
g_pDB = new CMySqlDatabase;
if (!g_pDB || !g_pDB->Initialize() || !LoadMySQLWrapper(pHostName, pDBName, pUserName))
{
delete g_pDB;
g_pDB = NULL;
return false;
}
// Get our machine name to store in the database.
DWORD size = sizeof(g_MachineName);
GetComputerName(g_MachineName, &size);
}
g_JobPrimaryID = DBJobID;
g_JobWorkerID = 0;
CMySQLQuery query;
query.Format("insert into job_worker_start ( JobID, CurrentStage, IsMaster, MachineName ) values ( %lu, \"none\", %d, \"%s\" )",
g_JobPrimaryID, g_bMaster, g_MachineName);
query.Execute(g_pSQL);
g_JobWorkerID = g_pSQL->InsertID();
if (g_JobWorkerID == 0)
{
delete g_pDB;
g_pDB = NULL;
return false;
}
// Now create a thread that samples perf data and stores it in the database.
g_hPerfThreadExitEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
g_hPerfThread = CreateThread(
NULL,
0,
PerfThreadFn,
NULL,
0,
&g_PerfThreadID);
return true;
}
void VMPI_Stats_Term()
{
if (!g_pDB)
return;
// Stop the thread.
SetEvent(g_hPerfThreadExitEvent);
WaitForSingleObject(g_hPerfThread, INFINITE);
CloseHandle(g_hPerfThreadExitEvent);
g_hPerfThreadExitEvent = NULL;
CloseHandle(g_hPerfThread);
g_hPerfThread = NULL;
if (g_bMaster)
{
// (Write a job_master_end entry here).
g_pDB->AddCommandToQueue(new CSQLDBCommand_JobMasterEnd, NULL);
}
// Wait for up to a second for the DB to finish writing its data.
DWORD startTime = GetTickCount();
while (GetTickCount() - startTime < 1000)
{
if (g_pDB->QueriesInOutQueue() == 0)
break;
}
delete g_pDB;
g_pDB = NULL;
UnloadMySQLWrapper();
}
static bool ReadStringFromFile(FILE *fp, char *pStr, int strSize)
{
int i = 0;
for (i; i < strSize - 2; i++)
{
if (fread(&pStr[i], 1, 1, fp) != 1 ||
pStr[i] == '\n')
{
break;
}
}
pStr[i] = 0;
return i != 0;
}
// This looks for pDBInfoFilename in the same path as pBaseExeFilename.
// The file has 3 lines: machine name (with database), database name, username
void GetDBInfo(const char *pDBInfoFilename, CDBInfo *pInfo)
{
char baseExeFilename[512];
if (!GetModuleFileName(GetModuleHandle(NULL), baseExeFilename, sizeof(baseExeFilename)))
Error("GetModuleFileName failed.");
// Look for the info file in the same directory as the exe.
char dbInfoFilename[512];
Q_strncpy(dbInfoFilename, baseExeFilename, sizeof(dbInfoFilename));
Q_StripFilename(dbInfoFilename);
if (dbInfoFilename[0] == 0)
Q_strncpy(dbInfoFilename, ".", sizeof(dbInfoFilename));
Q_strncat(dbInfoFilename, "/", sizeof(dbInfoFilename), COPY_ALL_CHARACTERS);
Q_strncat(dbInfoFilename, pDBInfoFilename, sizeof(dbInfoFilename), COPY_ALL_CHARACTERS);
FILE *fp = fopen(dbInfoFilename, "rt");
if (!fp)
{
Error("Can't open %s for database info.\n", dbInfoFilename);
}
if (!ReadStringFromFile(fp, pInfo->m_HostName, sizeof(pInfo->m_HostName)) ||
!ReadStringFromFile(fp, pInfo->m_DBName, sizeof(pInfo->m_DBName)) ||
!ReadStringFromFile(fp, pInfo->m_UserName, sizeof(pInfo->m_UserName))
)
{
Error("%s is not a valid database info file.\n", dbInfoFilename);
}
fclose(fp);
}
void RunJobWatchApp(char *pCmdLine)
{
STARTUPINFO si;
memset(&si, 0, sizeof(si));
si.cb = sizeof(si);
PROCESS_INFORMATION pi;
memset(&pi, 0, sizeof(pi));
// Working directory should be the same as our exe's directory.
char dirName[512];
if (GetModuleFileName(NULL, dirName, sizeof(dirName)) != 0)
{
char *s1 = V_strrchr(dirName, '\\');
char *s2 = V_strrchr(dirName, '/');
if (s1 || s2)
{
// Get rid of the last slash.
s1 = max(s1, s2);
s1[0] = 0;
if (!CreateProcess(
NULL,
pCmdLine,
NULL, // security
NULL,
TRUE,
0, // flags
NULL, // environment
dirName, // current directory
&si,
&pi))
{
Warning("%s - error launching '%s'\n", VMPI_GetParamString(mpi_Job_Watch), pCmdLine);
}
}
}
}
void StatsDB_InitStatsDatabase(
int argc,
char **argv,
const char *pDBInfoFilename)
{
// Did they disable the stats database?
if (!g_bMPI_Stats && !VMPI_IsParamUsed(mpi_Job_Watch))
return;
unsigned long jobPrimaryID;
// Now open the DB.
if (g_bMPIMaster)
{
CDBInfo dbInfo;
GetDBInfo(pDBInfoFilename, &dbInfo);
if (!VMPI_Stats_Init_Master(dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, argv[argc - 1], &jobPrimaryID))
{
Warning("VMPI_Stats_Init_Master( %s, %s, %s ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName);
// Tell the workers not to use stats.
dbInfo.m_HostName[0] = 0;
}
char cmdLine[2048];
Q_snprintf(cmdLine, sizeof(cmdLine), "vmpi_job_watch -JobID %d", jobPrimaryID);
Msg("\nTo watch this job, run this command line:\n%s\n\n", cmdLine);
if (VMPI_IsParamUsed(mpi_Job_Watch))
{
// Convenience thing to automatically launch the job watch for this job.
RunJobWatchApp(cmdLine);
}
// Send the database info to all the workers.
SendDBInfo(&dbInfo, jobPrimaryID);
}
else
{
// Wait to get DB info so we can connect to the MySQL database.
CDBInfo dbInfo;
unsigned long jobPrimaryID;
RecvDBInfo(&dbInfo, &jobPrimaryID);
if (dbInfo.m_HostName[0] != 0)
{
if (!VMPI_Stats_Init_Worker(dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID))
Error("VMPI_Stats_Init_Worker( %s, %s, %s, %d ) failed.\n", dbInfo.m_HostName, dbInfo.m_DBName, dbInfo.m_UserName, jobPrimaryID);
}
}
}
unsigned long StatsDB_GetUniqueJobID()
{
return g_JobPrimaryID;
}
unsigned long VMPI_Stats_GetJobWorkerID()
{
return g_JobWorkerID;
}