Files
mcpe/thirdparty/raknet/CloudServer.cpp
iProgramInCpp 9642818a88 * Initial commit.
:)
2023-07-30 22:22:02 +03:00

1686 lines
57 KiB
C++

/*
* Copyright (c) 2014, Oculus VR, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*
*/
#include "NativeFeatureIncludes.h"
#if _RAKNET_SUPPORT_CloudServer==1
#include "CloudServer.h"
#include "GetTime.h"
#include "MessageIdentifiers.h"
#include "BitStream.h"
#include "RakPeerInterface.h"
/// Sub-command identifiers for ID_CLOUD_SERVER_TO_SERVER_COMMAND messages.
/// The value is written as a MessageID directly after the message identifier
/// and dispatched on in CloudServer::OnReceive, so the numeric values are part
/// of the server-to-server protocol.
enum ServerToServerCommands
{
	/// Another server asks us to run a get query locally
	STSC_PROCESS_GET_REQUEST = 0,
	/// Reply to STSC_PROCESS_GET_REQUEST
	STSC_PROCESS_GET_RESPONSE = 1,
	STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS = 2,
	STSC_ADD_UPLOADED_KEY = 3,
	STSC_ADD_SUBSCRIBED_KEY = 4,
	STSC_REMOVE_UPLOADED_KEY = 5,
	STSC_REMOVE_SUBSCRIBED_KEY = 6,
	STSC_DATA_CHANGED = 7,
};
using namespace RakNet;
/// Three-way comparison of a GUID key against a RemoteServer entry's serverAddress.
/// \param[in] key GUID being looked up
/// \param[in] data Entry to compare against
/// \return -1 if \a key is less than the entry, 1 if greater, 0 if equal
int CloudServer::RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data )
{
	if (key < data->serverAddress)
		return -1;
	return (key > data->serverAddress) ? 1 : 0;
}
int CloudServer::KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data )
{
if (key.primaryKey < data->key.primaryKey)
return -1;
if (key.primaryKey > data->key.primaryKey)
return 1;
if (key.secondaryKey < data->key.secondaryKey)
return -1;
if (key.secondaryKey > data->key.secondaryKey)
return 1;
return 0;
}
/// Three-way comparison of a GUID key against a CloudData entry's clientGUID.
/// \param[in] key GUID being looked up
/// \param[in] data Entry to compare against
/// \return -1 if \a key is less than the entry, 1 if greater, 0 if equal
int CloudServer::KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data )
{
	if (key < data->clientGUID)
		return -1;
	return (key > data->clientGUID) ? 1 : 0;
}
int CloudServer::KeyDataListComp( const CloudKey &key, CloudDataList * const &data )
{
if (key.primaryKey < data->key.primaryKey)
return -1;
if (key.primaryKey > data->key.primaryKey)
return 1;
if (key.secondaryKey < data->key.secondaryKey)
return -1;
if (key.secondaryKey > data->key.secondaryKey)
return 1;
return 0;
}
/// Three-way comparison of a GUID key against a BufferedGetResponseFromServer entry's serverAddress.
/// \param[in] key GUID being looked up
/// \param[in] data Entry to compare against
/// \return -1 if \a key is less than the entry, 1 if greater, 0 if equal
int CloudServer::BufferedGetResponseFromServerComp(const RakNetGUID &key, CloudServer::BufferedGetResponseFromServer* const &data )
{
	if (key < data->serverAddress)
		return -1;
	return (key > data->serverAddress) ? 1 : 0;
}
/// Three-way comparison of a request id against a GetRequest entry's requestId.
/// \param[in] key Request id being looked up
/// \param[in] data Entry to compare against
/// \return -1 if \a key is less than the entry's id, 1 if greater, 0 if equal
int CloudServer::GetRequestComp(const uint32_t &key, CloudServer::GetRequest* const &data )
{
	if (key < data->requestId)
		return -1;
	// Previously returned -1 here as well, which made the comparator
	// inconsistent: any key not equal to the entry compared as "less".
	if (key > data->requestId)
		return 1;
	return 0;
}
/// Writes or reads the query followed by the list of specific systems.
/// Wire layout: cloudQuery, a uint16_t count, then that many RakNetGUID values.
/// \param[in] writeToBitstream true to write this object to \a bitStream, false to read from it
/// \param[in] bitStream Stream to write to or read from
/// \note When reading, entries are appended to specificSystems; reading stops at
/// the first GUID that cannot be read from the stream.
void CloudServer::CloudQueryWithAddresses::Serialize(bool writeToBitstream, BitStream *bitStream)
{
	cloudQuery.Serialize(writeToBitstream, bitStream);

	if (writeToBitstream)
	{
		RakAssert(specificSystems.Size() < (uint16_t)-1 );
		// Write exactly as many entries as the 16 bit count announces, so the
		// stream stays self-consistent even if the list is oversized
		const uint16_t specificSystemsCount = (uint16_t) specificSystems.Size();
		bitStream->Write(specificSystemsCount);
		for (uint16_t i=0; i < specificSystemsCount; i++)
		{
			bitStream->Write(specificSystems[i]);
		}
	}
	else
	{
		// Stays 0 if the count cannot be read (truncated stream)
		uint16_t specificSystemsCount=0;
		RakNetGUID addressOrGuid;
		bitStream->Read(specificSystemsCount);
		for (uint16_t i=0; i < specificSystemsCount; i++)
		{
			// Do not push stale values if the stream ends early
			if (bitStream->Read(addressOrGuid)==false)
				break;
			specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
		}
	}
}
/// \return true if every entry in remoteServerResponses has gotResult set
/// (trivially true when the list is empty), false otherwise
bool CloudServer::GetRequest::AllRemoteServersHaveResponded(void) const
{
	for (unsigned int responseIndex=0; responseIndex < remoteServerResponses.Size(); ++responseIndex)
	{
		if (!remoteServerResponses[responseIndex]->gotResult)
			return false;
	}
	return true;
}
/// Releases every buffered remote server response held by this request.
/// \param[in] allocator Passed to each response's Clear() to release its rows
void CloudServer::GetRequest::Clear(CloudAllocator *allocator)
{
	for (unsigned int responseIndex=0; responseIndex < remoteServerResponses.Size(); ++responseIndex)
	{
		BufferedGetResponseFromServer *response = remoteServerResponses[responseIndex];
		// Free the rows first, then the response object itself
		response->Clear(allocator);
		RakNet::OP_DELETE(response, _FILE_AND_LINE_);
	}
	remoteServerResponses.Clear(false, _FILE_AND_LINE_);
}
/// Returns every row in queryResult.rowsReturned to the allocator and empties the list.
/// \param[in] allocator Allocator used to deallocate each row's data and the row itself
void CloudServer::BufferedGetResponseFromServer::Clear(CloudAllocator *allocator)
{
	for (unsigned int rowIndex=0; rowIndex < queryResult.rowsReturned.Size(); ++rowIndex)
	{
		CloudQueryRow *row = queryResult.rowsReturned[rowIndex];
		// Row payload is released before the row that points to it
		allocator->DeallocateRowData(row->data);
		allocator->DeallocateCloudQueryRow(row);
	}
	queryResult.rowsReturned.Clear(false, _FILE_AND_LINE_);
}
/// Constructs a server with both byte limits set to 0 and the get-request
/// bookkeeping (next request id, next timeout check) reset to 0.
CloudServer::CloudServer()
{
	// Request bookkeeping
	nextGetRequestId=0;
	nextGetRequestsCheck=0;

	// Byte limits
	maxUploadBytesPerClient=0;
	maxBytesPerDowload=0;
}
/// Destructor. Releases everything the server still owns via Clear().
CloudServer::~CloudServer()
{
	this->Clear();
}
void CloudServer::SetMaxUploadBytesPerClient(uint64_t bytes)
{
maxUploadBytesPerClient=bytes;
}
void CloudServer::SetMaxBytesPerDownload(uint64_t bytes)
{
maxBytesPerDowload=bytes;
}
void CloudServer::Update(void)
{
// Timeout getRequests
RakNet::Time time = RakNet::Time();
if (time > nextGetRequestsCheck)
{
nextGetRequestsCheck=time+1000;
unsigned int i=0;
while (i < getRequests.Size())
{
if (time - getRequests[i]->requestStartTime > 3000)
{
// Remote server is not responding, just send back data with whoever did respond
ProcessAndTransmitGetRequest(getRequests[i]);
getRequests[i]->Clear(this);
RakNet::OP_DELETE(getRequests[i],_FILE_AND_LINE_);
getRequests.RemoveAtIndex(i);
}
else
{
i++;
}
}
}
}
/// Plugin receive hook: routes cloud messages to their handlers.
/// \param[in] packet Incoming packet; packet->data[0] is the message identifier and,
/// for ID_CLOUD_SERVER_TO_SERVER_COMMAND, packet->data[1] is a ServerToServerCommands value.
/// \return RR_STOP_PROCESSING_AND_DEALLOCATE for every cloud message (including
/// server-to-server commands that are too short or carry an unknown sub-command),
/// RR_CONTINUE_PROCESSING for any other message.
PluginReceiveResult CloudServer::OnReceive(Packet *packet)
{
	const unsigned char messageId = packet->data[0];

	if (messageId==ID_CLOUD_POST_REQUEST)
	{
		OnPostRequest(packet);
	}
	else if (messageId==ID_CLOUD_RELEASE_REQUEST)
	{
		OnReleaseRequest(packet);
	}
	else if (messageId==ID_CLOUD_GET_REQUEST)
	{
		OnGetRequest(packet);
	}
	else if (messageId==ID_CLOUD_UNSUBSCRIBE_REQUEST)
	{
		OnUnsubscribeRequest(packet);
	}
	else if (messageId==ID_CLOUD_SERVER_TO_SERVER_COMMAND)
	{
		// The sub-command byte is only present if the packet is long enough
		if (packet->length>1)
		{
			switch (packet->data[1])
			{
			case STSC_PROCESS_GET_REQUEST:
				OnServerToServerGetRequest(packet);
				break;
			case STSC_PROCESS_GET_RESPONSE:
				OnServerToServerGetResponse(packet);
				break;
			case STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS:
				OnSendUploadedAndSubscribedKeysToServer(packet);
				break;
			case STSC_ADD_UPLOADED_KEY:
				OnSendUploadedKeyToServers(packet);
				break;
			case STSC_ADD_SUBSCRIBED_KEY:
				OnSendSubscribedKeyToServers(packet);
				break;
			case STSC_REMOVE_UPLOADED_KEY:
				OnRemoveUploadedKeyFromServers(packet);
				break;
			case STSC_REMOVE_SUBSCRIBED_KEY:
				OnRemoveSubscribedKeyFromServers(packet);
				break;
			case STSC_DATA_CHANGED:
				OnServerDataChanged(packet);
				break;
			default:
				// Unknown sub-command: still consumed
				break;
			}
		}
	}
	else
	{
		// Not a cloud message
		return RR_CONTINUE_PROCESSING;
	}

	return RR_STOP_PROCESSING_AND_DEALLOCATE;
}
/// Handles ID_CLOUD_POST_REQUEST: stores (or overwrites) the data a client uploads for a key.
///
/// Packet layout after the message id: CloudKey, uint32_t data length, then the
/// byte-aligned data itself.
///
/// The request is dropped without any reply when the packet is truncated, when
/// the per-client upload limit would be exceeded, when a query filter rejects
/// it, or when the payload buffer cannot be allocated. On success, subscribers
/// and other servers are notified of the new or changed data.
/// \param[in] packet The received packet
void CloudServer::OnPostRequest(Packet *packet)
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID));
	CloudKey key;
	key.Serialize(false,&bsIn);
	uint32_t dataLengthBytes=0;
	if (bsIn.Read(dataLengthBytes)==false)
		return; // Truncated packet, length field missing
	if (maxUploadBytesPerClient>0 && dataLengthBytes>maxUploadBytesPerClient)
		return; // Exceeded max upload bytes
	bsIn.AlignReadToByteBoundary();
	// The length comes from the remote system. Reject it if the packet does not
	// actually carry that many bytes, otherwise the filters below would be handed
	// a pointer past the end of the packet and the stored data would be
	// uninitialized memory.
	if (dataLengthBytes > bsIn.GetNumberOfUnreadBits()/8)
		return;
	for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
	{
		if (queryFilters[filterIndex]->OnPostRequest(packet->guid, packet->systemAddress, key, dataLengthBytes, (const char*) bsIn.GetData()+BITS_TO_BYTES(bsIn.GetReadOffset()))==false)
			return;
	}
	// Payloads that do not fit into CloudData::stackData are read into a heap
	// buffer now; smaller ones are read straight into stackData further down
	unsigned char *data;
	if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
	{
		data = (unsigned char *) rakMalloc_Ex(dataLengthBytes,_FILE_AND_LINE_);
		if (data==0)
		{
			notifyOutOfMemory(_FILE_AND_LINE_);
			return;
		}
		bsIn.ReadAlignedBytes(data,dataLengthBytes);
	}
	else
		data=0;
	// Add this system to remoteSystems if they aren't there already
	DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
	RemoteCloudClient *remoteCloudClient;
	if (remoteSystemsHashIndex.IsInvalid())
	{
		remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
		remoteCloudClient->uploadedKeys.Insert(key,key,true,_FILE_AND_LINE_);
		remoteCloudClient->uploadedBytes=0;
		remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
	}
	else
	{
		remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
		bool objectExists;
		// Add to RemoteCloudClient::uploadedKeys if it isn't there already
		unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
		if (objectExists==false)
		{
			remoteCloudClient->uploadedKeys.InsertAtIndex(key, uploadedKeysIndex, _FILE_AND_LINE_);
		}
	}
	bool cloudDataAlreadyUploaded;
	unsigned int dataRepositoryIndex;
	bool dataRepositoryExists;
	CloudDataList* cloudDataList = GetOrAllocateCloudDataList(key, &dataRepositoryExists, dataRepositoryIndex);
	if (dataRepositoryExists==false)
	{
		cloudDataList->uploaderCount=1;
		cloudDataAlreadyUploaded=false;
	}
	else
	{
		cloudDataAlreadyUploaded=cloudDataList->uploaderCount>0;
		cloudDataList->uploaderCount++;
	}
	CloudData *cloudData;
	bool keyDataListExists;
	unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
	if (keyDataListExists==false)
	{
		if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes+dataLengthBytes>maxUploadBytesPerClient)
		{
			// Undo prior insertion of cloudDataList into dataRepository if needed.
			// This must test dataRepositoryExists: keyDataListExists is always
			// false in this branch, so testing it destroyed lists that already
			// existed and still held other clients' data.
			if (dataRepositoryExists==false)
			{
				RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
				dataRepository.RemoveAtIndex(dataRepositoryIndex);
			}
			else
			{
				// The list stays, so take back the uploader counted above
				cloudDataList->uploaderCount--;
			}
			if (remoteCloudClient->IsUnused())
			{
				RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
				remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
			}
			if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
				rakFree_Ex(data, _FILE_AND_LINE_);
			return;
		}
		cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
		cloudData->dataLengthBytes=dataLengthBytes;
		cloudData->isUploaded=true;
		if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
		{
			cloudData->serverSystemAddress=forceAddress;
			cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
		}
		else
		{
			cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
			if (cloudData->serverSystemAddress.IsLoopback())
				cloudData->serverSystemAddress.FromString(rakPeerInterface->GetLocalIP(0));
		}
		if (cloudData->serverSystemAddress.GetPort()==0)
		{
			// Fix localhost port
			cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
		}
		cloudData->clientSystemAddress=packet->systemAddress;
		cloudData->serverGUID=rakPeerInterface->GetMyGUID();
		cloudData->clientGUID=packet->guid;
		cloudDataList->keyData.Insert(packet->guid,cloudData,true,_FILE_AND_LINE_);
	}
	else
	{
		cloudData = cloudDataList->keyData[keyDataListIndex];
		if (cloudDataAlreadyUploaded==false)
		{
			if (forceAddress!=UNASSIGNED_SYSTEM_ADDRESS)
			{
				cloudData->serverSystemAddress=forceAddress;
				cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
			}
			else
			{
				cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
			}
			if (cloudData->serverSystemAddress.GetPort()==0)
			{
				// Fix localhost port
				cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
			}
			cloudData->clientSystemAddress=packet->systemAddress;
		}
		if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes-cloudData->dataLengthBytes+dataLengthBytes>maxUploadBytesPerClient)
		{
			// Undo prior insertion of cloudDataList into dataRepository if needed
			if (dataRepositoryExists==false)
			{
				RakNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
				dataRepository.RemoveAtIndex(dataRepositoryIndex);
			}
			else
			{
				// The list stays, so take back the uploader counted above
				cloudDataList->uploaderCount--;
			}
			// The heap payload was never attached to cloudData, release it here
			if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
				rakFree_Ex(data, _FILE_AND_LINE_);
			return;
		}
		else
		{
			// Subtract already used bytes we are overwriting
			remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
		}
		if (cloudData->allocatedData!=0)
			rakFree_Ex(cloudData->allocatedData,_FILE_AND_LINE_);
	}
	if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
	{
		// Data already allocated
		cloudData->allocatedData=data;
		cloudData->dataPtr=data;
	}
	else
	{
		// Read to stack
		if (dataLengthBytes>0)
			bsIn.ReadAlignedBytes(cloudData->stackData,dataLengthBytes);
		cloudData->allocatedData=0;
		cloudData->dataPtr=cloudData->stackData;
	}
	// Update how many bytes were written for this data
	cloudData->dataLengthBytes=dataLengthBytes;
	remoteCloudClient->uploadedBytes+=dataLengthBytes;
	if (cloudDataAlreadyUploaded==false)
	{
		// New data field
		SendUploadedKeyToServers(cloudDataList->key);
	}
	// Existing data field changed
	NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, true );
	NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, true );
	// Send update to all remote servers that subscribed to this key
	NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, true);
	// I could have also subscribed to a key not yet updated locally
	// This means I have to go through every RemoteClient that wants this key
	// Seems like cloudData->specificSubscribers is unnecessary in that case
}
/// Handles ID_CLOUD_RELEASE_REQUEST: removes data the sending client previously uploaded.
///
/// Packet layout after the message id: uint16_t key count, then that many CloudKey values.
/// Keys the client never uploaded are skipped. For each released key, subscribers
/// and other servers are notified, and bookkeeping objects that become unused are deleted.
/// \param[in] packet The received packet
void CloudServer::OnReleaseRequest(Packet *packet)
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID));
	// Stays 0 if the count cannot be read (truncated packet)
	uint16_t keyCount=0;
	bsIn.Read(keyCount);
	if (keyCount==0)
		return;
	DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
	if (remoteSystemIndex.IsInvalid()==true)
		return;
	RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
	CloudKey key;
	// Read all in a list first so I can run filter on it
	DataStructures::List<CloudKey> cloudKeys;
	for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
	{
		key.Serialize(false, &bsIn);
		cloudKeys.Push(key, _FILE_AND_LINE_);
	}
	for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
	{
		if (queryFilters[filterIndex]->OnReleaseRequest(packet->guid, packet->systemAddress, cloudKeys)==false)
			return;
	}
	for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
	{
		// Serialize in list above so I can run the filter on it
		// key.Serialize(false, &bsIn);
		key=cloudKeys[keyCountIndex];
		// Remove remote systems uploaded keys
		bool objectExists;
		unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
		if (objectExists==false)
			continue;

		// Look up the stored data, but only index into the containers when the
		// lookups actually succeeded. uploadedKeys can name a key with no stored
		// data (e.g. after a post that was rejected for size), and indexing with
		// the returned insertion position would then read out of bounds.
		bool dataRepositoryExists;
		unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
		CloudDataList* cloudDataList = 0;
		bool keyDataListExists=false;
		unsigned int keyDataListIndex=0;
		if (dataRepositoryExists)
		{
			cloudDataList = dataRepository[dataRepositoryIndex];
			keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
		}

		remoteCloudClient->uploadedKeys.RemoveAtIndex(uploadedKeysIndex);

		if (keyDataListExists)
		{
			CloudData *cloudData = cloudDataList->keyData[keyDataListIndex];
			remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
			cloudDataList->uploaderCount--;
			// Broadcast destruction of this key to subscribers
			NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
			NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
			NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
			cloudData->Clear();
			if (cloudData->IsUnused())
			{
				RakNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
				cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
				if (cloudDataList->IsNotUploaded())
				{
					// Tell other servers that this key is no longer uploaded, so they do not request it from us
					RemoveUploadedKeyFromServers(cloudDataList->key);
				}
				if (cloudDataList->IsUnused())
				{
					RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
					dataRepository.RemoveAtIndex(dataRepositoryIndex);
				}
			}
		}

		if (remoteCloudClient->IsUnused())
		{
			RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
			remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
			// remoteCloudClient is gone, nothing further can be released
			break;
		}
	}
}
/// Handles ID_CLOUD_GET_REQUEST: answers a client query and optionally subscribes the client.
///
/// Packet layout after the message id: CloudQuery, uint16_t count of specific
/// systems, then that many RakNetGUID values.
///
/// If no remote server has any of the requested keys the query is answered
/// immediately; otherwise it is forwarded to those servers and kept in
/// getRequests until they respond. When the query asks to subscribe, the
/// client's subscription records are (re)created for every requested key.
/// Requests with no keys, truncated requests, and requests rejected by a
/// query filter are dropped.
/// \param[in] packet The received packet
void CloudServer::OnGetRequest(Packet *packet)
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID));
	// Stays 0 if the count cannot be read (truncated packet)
	uint16_t specificSystemsCount=0;
	CloudKey cloudKey;
	// Create a new GetRequest
	GetRequest *getRequest;
	getRequest = RakNet::OP_NEW<GetRequest>(_FILE_AND_LINE_);
	getRequest->cloudQueryWithAddresses.cloudQuery.Serialize(false, &bsIn);
	getRequest->requestingClient=packet->guid;
	RakNetGUID addressOrGuid;
	bsIn.Read(specificSystemsCount);
	for (uint16_t i=0; i < specificSystemsCount; i++)
	{
		if (bsIn.Read(addressOrGuid)==false)
		{
			// Packet announces more systems than it carries; drop it rather
			// than run the query against a partial or stale list
			RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
			return;
		}
		getRequest->cloudQueryWithAddresses.specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
	}
	if (getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size()==0)
	{
		RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
		return;
	}
	for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
	{
		if (queryFilters[filterIndex]->OnGetRequest(packet->guid, packet->systemAddress, getRequest->cloudQueryWithAddresses.cloudQuery, getRequest->cloudQueryWithAddresses.specificSystems )==false)
		{
			// getRequest is not stored anywhere yet, so it has to be freed here
			RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
			return;
		}
	}
	getRequest->requestStartTime=RakNet::GetTime();
	getRequest->requestId=nextGetRequestId++;
	// Send request to servers that have this data
	DataStructures::List<RemoteServer*> remoteServersWithData;
	GetServersWithUploadedKeys(getRequest->cloudQueryWithAddresses.cloudQuery.keys, remoteServersWithData);
	if (remoteServersWithData.Size()==0)
	{
		ProcessAndTransmitGetRequest(getRequest);
	}
	else
	{
		RakNet::BitStream bsOut;
		bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
		bsOut.Write((MessageID)STSC_PROCESS_GET_REQUEST);
		getRequest->cloudQueryWithAddresses.Serialize(true, &bsOut);
		bsOut.Write(getRequest->requestId);
		for (unsigned int remoteServerIndex=0; remoteServerIndex < remoteServersWithData.Size(); remoteServerIndex++)
		{
			BufferedGetResponseFromServer* bufferedGetResponseFromServer = RakNet::OP_NEW<BufferedGetResponseFromServer>(_FILE_AND_LINE_);
			bufferedGetResponseFromServer->serverAddress=remoteServersWithData[remoteServerIndex]->serverAddress;
			bufferedGetResponseFromServer->gotResult=false;
			getRequest->remoteServerResponses.Insert(remoteServersWithData[remoteServerIndex]->serverAddress, bufferedGetResponseFromServer, true, _FILE_AND_LINE_);
			SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServersWithData[remoteServerIndex]->serverAddress, false);
		}
		// Record that this system made this request
		getRequests.Insert(getRequest->requestId, getRequest, true, _FILE_AND_LINE_);
	}
	if (getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults)
	{
		// Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
		DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
		RemoteCloudClient *remoteCloudClient;
		if (remoteSystemsHashIndex.IsInvalid())
		{
			remoteCloudClient = RakNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
			remoteCloudClient->uploadedBytes=0;
			remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
		}
		else
		{
			remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
		}
		unsigned int keyIndex;
		for (keyIndex=0; keyIndex < getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size(); keyIndex++)
		{
			cloudKey = getRequest->cloudQueryWithAddresses.cloudQuery.keys[keyIndex];
			unsigned int keySubscriberIndex;
			bool hasKeySubscriber;
			keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
			KeySubscriberID* keySubscriberId;
			if (hasKeySubscriber)
			{
				// Replace any existing subscription to this key with the new one
				DataStructures::List<RakNetGUID> specificSystems;
				UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
			}
			keySubscriberId = RakNet::OP_NEW<KeySubscriberID>(_FILE_AND_LINE_);
			keySubscriberId->key=cloudKey;
			unsigned int specificSystemIndex;
			for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
			{
				keySubscriberId->specificSystemsSubscribedTo.Insert(getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], true, _FILE_AND_LINE_);
			}
			remoteCloudClient->subscribedKeys.InsertAtIndex(keySubscriberId, keySubscriberIndex, _FILE_AND_LINE_);
			// Add CloudData in a similar way
			unsigned int dataRepositoryIndex;
			bool dataRepositoryExists;
			CloudDataList* cloudDataList = GetOrAllocateCloudDataList(cloudKey, &dataRepositoryExists, dataRepositoryIndex);
			// If this is the first local client to subscribe to this key, call SendSubscribedKeyToServers
			if (cloudDataList->subscriberCount==0)
				SendSubscribedKeyToServers(cloudKey);
			// If the subscription is specific, may have to also allocate CloudData
			if (getRequest->cloudQueryWithAddresses.specificSystems.Size())
			{
				CloudData *cloudData;
				bool keyDataListExists;
				unsigned int specificSystemIndex;
				for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
				{
					RakNetGUID specificSystem = getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex];
					unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSystem, &keyDataListExists);
					if (keyDataListExists==false)
					{
						// Placeholder entry for a system that has not uploaded this key yet
						cloudData = RakNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
						cloudData->dataLengthBytes=0;
						cloudData->allocatedData=0;
						cloudData->isUploaded=false;
						cloudData->dataPtr=0;
						cloudData->serverSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
						cloudData->clientSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
						cloudData->serverGUID=rakPeerInterface->GetMyGUID();
						cloudData->clientGUID=specificSystem;
						cloudDataList->keyData.Insert(specificSystem,cloudData,true,_FILE_AND_LINE_);
					}
					else
					{
						cloudData = cloudDataList->keyData[keyDataListIndex];
					}
					++cloudDataList->subscriberCount;
					cloudData->specificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
				}
			}
			else
			{
				++cloudDataList->subscriberCount;
				cloudDataList->nonSpecificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
				// Remove packet->guid from CloudData::specificSubscribers among all instances of cloudDataList->keyData
				unsigned int subscribedKeysIndex;
				bool subscribedKeysIndexExists;
				subscribedKeysIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudDataList->key, &subscribedKeysIndexExists);
				if (subscribedKeysIndexExists)
				{
					KeySubscriberID* keySubscriberId;
					keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
					unsigned int specificSystemIndex;
					for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
					{
						bool keyDataExists;
						unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
						if (keyDataExists)
						{
							CloudData *keyData = cloudDataList->keyData[keyDataIndex];
							keyData->specificSubscribers.Remove(packet->guid);
							--cloudDataList->subscriberCount;
						}
					}
				}
			}
		}
		if (remoteCloudClient->subscribedKeys.Size()==0)
		{
			// Didn't do anything
			remoteSystems.Remove(packet->guid, _FILE_AND_LINE_);
			RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
		}
	}
	// A request answered immediately was never stored in getRequests, so it is freed here
	if (remoteServersWithData.Size()==0)
		RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
}
/// Handles ID_CLOUD_UNSUBSCRIBE_REQUEST: removes subscriptions held by the sending client.
///
/// Packet layout after the message id: uint16_t key count, that many CloudKey
/// values, uint16_t specific system count, then that many RakNetGUID values.
/// Requests from unknown clients, truncated requests, and requests rejected by
/// a query filter are ignored.
/// \param[in] packet The received packet
void CloudServer::OnUnsubscribeRequest(Packet *packet)
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID));
	DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
	if (remoteSystemIndex.IsInvalid()==true)
		return;
	RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
	// Both counts stay 0 if they cannot be read (truncated packet)
	uint16_t keyCount=0, specificSystemCount=0;
	DataStructures::List<CloudKey> cloudKeys;
	DataStructures::List<RakNetGUID> specificSystems;
	uint16_t index;
	CloudKey cloudKey;
	bsIn.Read(keyCount);
	for (index=0; index < keyCount; index++)
	{
		cloudKey.Serialize(false, &bsIn);
		cloudKeys.Push(cloudKey, _FILE_AND_LINE_);
	}
	RakNetGUID specificSystem;
	bsIn.Read(specificSystemCount);
	for (index=0; index < specificSystemCount; index++)
	{
		// Packet announces more systems than it carries; ignore the request
		// rather than act on a partial or stale list
		if (bsIn.Read(specificSystem)==false)
			return;
		specificSystems.Push(specificSystem, _FILE_AND_LINE_);
	}
	for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
	{
		if (queryFilters[filterIndex]->OnUnsubscribeRequest(packet->guid, packet->systemAddress, cloudKeys, specificSystems )==false)
			return;
	}
	bool dataRepositoryExists;
	for (index=0; index < keyCount; index++)
	{
		cloudKey = cloudKeys[index];
		// Only the existence of the key matters here, not its index
		dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
		if (dataRepositoryExists==false)
			continue;
		unsigned int keySubscriberIndex;
		bool hasKeySubscriber;
		keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
		if (hasKeySubscriber==false)
			continue;
		UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
	}
	if (remoteCloudClient->IsUnused())
	{
		RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
		remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
	}
}
/// Handles STSC_PROCESS_GET_REQUEST: runs a query forwarded by another server
/// against local data and sends the rows back as STSC_PROCESS_GET_RESPONSE.
/// Requests from systems not listed in remoteServers are ignored.
/// \param[in] packet The received packet. After the two id bytes it holds a
/// CloudQueryWithAddresses followed by the uint32_t request id to echo back.
void CloudServer::OnServerToServerGetRequest(Packet *packet)
{
	// Only the existence of the sender matters here, not its index
	bool senderIsKnownServer;
	remoteServers.GetIndexFromKey(packet->guid, &senderIsKnownServer);
	if (!senderIsKnownServer)
		return;

	// Parse the request
	RakNet::BitStream requestStream(packet->data, packet->length, false);
	requestStream.IgnoreBytes(sizeof(MessageID)*2);
	CloudQueryWithAddresses query;
	query.Serialize(false, &requestStream);
	uint32_t requestId;
	requestStream.Read(requestId);

	// Run the query locally
	DataStructures::List<CloudData*> matchedData;
	DataStructures::List<CloudKey> matchedKeys;
	ProcessCloudQueryWithAddresses(query, matchedData, matchedKeys);

	// Reply to the requesting server
	RakNet::BitStream responseStream;
	responseStream.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	responseStream.Write((MessageID)STSC_PROCESS_GET_RESPONSE);
	responseStream.Write(requestId);
	WriteCloudQueryRowFromResultList(matchedData, matchedKeys, &responseStream);
	SendUnified(&responseStream, HIGH_PRIORITY, RELIABLE_ORDERED, 0, packet->guid, false);
}
/// Handles STSC_PROCESS_GET_RESPONSE: buffers the rows another server returned
/// for a pending get request, and answers the client once every queried server
/// has responded.
/// The response is ignored if the sender is not a known server, the request id
/// is not pending, the request was not waiting on this sender, or this sender
/// already responded.
/// \param[in] packet The received packet. After the two id bytes it holds the
/// uint32_t request id followed by the serialized rows.
void CloudServer::OnServerToServerGetResponse(Packet *packet)
{
	// Sender must be a known remote server
	bool senderIsKnownServer;
	const unsigned int senderIndex = remoteServers.GetIndexFromKey(packet->guid, &senderIsKnownServer);
	if (!senderIsKnownServer)
		return;
	if (remoteServers[senderIndex]==0)
		return;

	RakNet::BitStream responseStream(packet->data, packet->length, false);
	responseStream.IgnoreBytes(sizeof(MessageID)*2);
	uint32_t requestId;
	responseStream.Read(requestId);

	// Lookup request id
	bool requestIsPending;
	const unsigned int pendingIndex = getRequests.GetIndexFromKey(requestId, &requestIsPending);
	if (!requestIsPending)
		return;
	GetRequest *pendingRequest = getRequests[pendingIndex];

	// The request must be waiting on this particular server
	bool waitingOnSender;
	const unsigned int responseIndex = pendingRequest->remoteServerResponses.GetIndexFromKey(packet->guid, &waitingOnSender);
	if (!waitingOnSender)
		return;
	BufferedGetResponseFromServer *bufferedResponse = pendingRequest->remoteServerResponses[responseIndex];
	if (bufferedResponse->gotResult)
		return;

	// Buffer the returned rows
	bufferedResponse->gotResult=true;
	uint32_t numRows;
	bufferedResponse->queryResult.SerializeNumRows(false, numRows, &responseStream);
	bufferedResponse->queryResult.SerializeCloudQueryRows(false, numRows, &responseStream, this);

	// If all results returned, then also process locally, and return to user
	if (!pendingRequest->AllRemoteServersHaveResponded())
		return;
	ProcessAndTransmitGetRequest(pendingRequest);
	pendingRequest->Clear(this);
	RakNet::OP_DELETE(pendingRequest, _FILE_AND_LINE_);
	getRequests.RemoveAtIndex(pendingIndex);
}
/// Plugin hook called when a connection is lost. Cleans up everything that refers
/// to the disconnected system, whether it was a remote server, a client, or both.
///
/// As a remote server: pending get requests stop waiting for it (and are answered
/// if it was the last outstanding server), then its RemoteServer record is deleted.
/// As a client: its uploaded data is released with notifications to subscribers and
/// servers, its subscriptions are removed, and its RemoteCloudClient record is deleted.
/// \param[in] systemAddress Unused
/// \param[in] rakNetGUID GUID of the system that disconnected
/// \param[in] lostConnectionReason Unused
void CloudServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
{
	(void) lostConnectionReason;
	(void) systemAddress;
	unsigned int remoteServerIndex;
	bool objectExists;
	remoteServerIndex = remoteServers.GetIndexFromKey(rakNetGUID, &objectExists);
	if (objectExists)
	{
		// Update remoteServerResponses by removing this server and sending the response if it is now complete
		unsigned int getRequestIndex=0;
		while (getRequestIndex < getRequests.Size())
		{
			GetRequest *getRequest = getRequests[getRequestIndex];
			bool waitingForThisServer;
			unsigned int remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(rakNetGUID, &waitingForThisServer);
			if (waitingForThisServer)
			{
				getRequest->remoteServerResponses[remoteServerResponsesIndex]->Clear(this);
				RakNet::OP_DELETE(getRequest->remoteServerResponses[remoteServerResponsesIndex], _FILE_AND_LINE_);
				getRequest->remoteServerResponses.RemoveAtIndex(remoteServerResponsesIndex);
				if (getRequest->AllRemoteServersHaveResponded())
				{
					ProcessAndTransmitGetRequest(getRequest);
					getRequest->Clear(this);
					RakNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
					// Removal shifts the next request into this slot, so do not advance
					getRequests.RemoveAtIndex(getRequestIndex);
				}
				else
					getRequestIndex++;
			}
			else
				getRequestIndex++;
		}
		RakNet::OP_DELETE(remoteServers[remoteServerIndex],_FILE_AND_LINE_);
		remoteServers.RemoveAtIndex(remoteServerIndex);
	}
	DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(rakNetGUID);
	if (remoteSystemIndex.IsInvalid()==false)
	{
		RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
		unsigned int uploadedKeysIndex;
		for (uploadedKeysIndex=0; uploadedKeysIndex < remoteCloudClient->uploadedKeys.Size(); uploadedKeysIndex++)
		{
			// Delete keys this system has uploaded
			bool keyDataRepositoryExists;
			unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->uploadedKeys[uploadedKeysIndex], &keyDataRepositoryExists);
			if (keyDataRepositoryExists)
			{
				CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
				bool keyDataExists;
				unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(rakNetGUID, &keyDataExists);
				if (keyDataExists)
				{
					CloudData *cloudData = cloudDataList->keyData[keyDataIndex];
					cloudDataList->uploaderCount--;
					NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
					NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
					NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
					cloudData->Clear();
					if (cloudData->IsUnused())
					{
						RakNet::OP_DELETE(cloudData,_FILE_AND_LINE_);
						cloudDataList->keyData.RemoveAtIndex(keyDataIndex);
						// Tell other servers that this key is no longer uploaded, so they do not request it from us.
						// Sent at most once per key: previously a list that was both not uploaded and unused
						// triggered the same notification twice in a row.
						bool removalSent=false;
						if (cloudDataList->IsNotUploaded())
						{
							RemoveUploadedKeyFromServers(cloudDataList->key);
							removalSent=true;
						}
						if (cloudDataList->IsUnused())
						{
							if (removalSent==false)
								RemoveUploadedKeyFromServers(cloudDataList->key);
							RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
							dataRepository.RemoveAtIndex(dataRepositoryIndex);
						}
					}
				}
			}
		}
		unsigned int subscribedKeysIndex;
		for (subscribedKeysIndex=0; subscribedKeysIndex < remoteCloudClient->subscribedKeys.Size(); subscribedKeysIndex++)
		{
			KeySubscriberID* keySubscriberId;
			keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
			bool keyDataRepositoryExists;
			unsigned int keyDataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->subscribedKeys[subscribedKeysIndex]->key, &keyDataRepositoryExists);
			if (keyDataRepositoryExists)
			{
				CloudDataList* cloudDataList = dataRepository[keyDataRepositoryIndex];
				if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
				{
					// Subscription was to the key as a whole
					cloudDataList->nonSpecificSubscribers.Remove(rakNetGUID);
					--cloudDataList->subscriberCount;
				}
				else
				{
					// Subscription was to specific uploaders of the key
					unsigned int specificSystemIndex;
					for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
					{
						bool keyDataExists;
						unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
						if (keyDataExists)
						{
							CloudData *keyData = cloudDataList->keyData[keyDataIndex];
							keyData->specificSubscribers.Remove(rakNetGUID);
							--cloudDataList->subscriberCount;
						}
					}
				}
			}
			RakNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
		}
		// Delete and remove from remoteSystems
		RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
		remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
	}
}
void CloudServer::OnRakPeerShutdown(void)
{
Clear();
}
void CloudServer::Clear(void)
{
unsigned int i,j;
for (i=0; i < dataRepository.Size(); i++)
{
CloudDataList *cloudDataList = dataRepository[i];
for (j=0; j < cloudDataList->keyData.Size(); j++)
{
cloudDataList->keyData[j]->Clear();
RakNet::OP_DELETE(cloudDataList->keyData[j], _FILE_AND_LINE_);
}
RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
}
dataRepository.Clear(false, _FILE_AND_LINE_);
for (i=0; i < remoteServers.Size(); i++)
{
RakNet::OP_DELETE(remoteServers[i], _FILE_AND_LINE_);
}
remoteServers.Clear(false, _FILE_AND_LINE_);
for (i=0; i < getRequests.Size(); i++)
{
GetRequest *getRequest = getRequests[i];
getRequest->Clear(this);
RakNet::OP_DELETE(getRequests[i], _FILE_AND_LINE_);
}
getRequests.Clear(false, _FILE_AND_LINE_);
DataStructures::List<RakNetGUID> keyList;
DataStructures::List<RemoteCloudClient*> itemList;
remoteSystems.GetAsList(itemList, keyList, _FILE_AND_LINE_);
for (i=0; i < itemList.Size(); i++)
{
RemoteCloudClient* remoteCloudClient = itemList[i];
for (j=0; j < remoteCloudClient->subscribedKeys.Size(); j++)
{
RakNet::OP_DELETE(remoteCloudClient->subscribedKeys[j], _FILE_AND_LINE_);
}
RakNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
}
remoteSystems.Clear(_FILE_AND_LINE_);
}
// Serializes a complete result set to bsOut: a 32-bit row count (taken from
// cloudKeyResultList) followed by every row in index order. cloudDataResultList is
// indexed in parallel with cloudKeyResultList.
void CloudServer::WriteCloudQueryRowFromResultList(DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
{
	bsOut->WriteCasted<uint32_t>(cloudKeyResultList.Size());

	unsigned int rowIndex=0;
	while (rowIndex < cloudKeyResultList.Size())
	{
		WriteCloudQueryRowFromResultList(rowIndex, cloudDataResultList, cloudKeyResultList, bsOut);
		++rowIndex;
	}
}
void CloudServer::WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
{
CloudQueryRow cloudQueryRow;
CloudData *cloudData = cloudDataResultList[i];
cloudQueryRow.key=cloudKeyResultList[i];
cloudQueryRow.data=cloudData->dataPtr;
cloudQueryRow.length=cloudData->dataLengthBytes;
cloudQueryRow.serverSystemAddress=cloudData->serverSystemAddress;
cloudQueryRow.clientSystemAddress=cloudData->clientSystemAddress;
cloudQueryRow.serverGUID=cloudData->serverGUID;
cloudQueryRow.clientGUID=cloudData->clientGUID;
cloudQueryRow.Serialize(true, bsOut, 0);
}
// Sends an ID_CLOUD_SUBSCRIPTION_NOTIFICATION for the entry cloudData (stored under key)
// to every GUID in subscribers. wasUpdated is forwarded verbatim in the message.
// The row is assembled here; the row-based overload writes the message and sends it.
void CloudServer::NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
{
	CloudQueryRow changedRow;
	changedRow.key = key;
	changedRow.data = cloudData->dataPtr;
	changedRow.length = cloudData->dataLengthBytes;
	changedRow.serverGUID = cloudData->serverGUID;
	changedRow.serverSystemAddress = cloudData->serverSystemAddress;
	changedRow.clientGUID = cloudData->clientGUID;
	changedRow.clientSystemAddress = cloudData->clientSystemAddress;

	NotifyClientSubscribersOfDataChange(&changedRow, subscribers, wasUpdated);
}
// Sends an ID_CLOUD_SUBSCRIPTION_NOTIFICATION carrying wasUpdated and the serialized row
// to every GUID in subscribers. The message is built once and reused for each send.
void CloudServer::NotifyClientSubscribersOfDataChange( CloudQueryRow *row, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
{
	RakNet::BitStream notification;
	notification.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
	notification.Write(wasUpdated);
	row->Serialize(true,&notification,0);

	for (unsigned int subscriberIndex=0; subscriberIndex < subscribers.Size(); ++subscriberIndex)
		SendUnified(&notification, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[subscriberIndex], false);
}
// Relays a change of cloudData (stored under key) to other servers as an
// STSC_DATA_CHANGED server-to-server command.
// A server receives the message if it subscribed to key, or if it has not yet reported
// its key lists (gotSubscribedAndUploadedKeys==false), in which case it is notified
// unconditionally.
void CloudServer::NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated )
{
	CloudQueryRow changedRow;
	changedRow.key = key;
	changedRow.data = cloudData->dataPtr;
	changedRow.length = cloudData->dataLengthBytes;
	changedRow.serverSystemAddress = cloudData->serverSystemAddress;
	changedRow.clientSystemAddress = cloudData->clientSystemAddress;
	changedRow.serverGUID = cloudData->serverGUID;
	changedRow.clientGUID = cloudData->clientGUID;

	RakNet::BitStream command;
	command.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	command.Write((MessageID)STSC_DATA_CHANGED);
	command.Write(wasUpdated);
	changedRow.Serialize(true,&command,0);

	for (unsigned int serverIndex=0; serverIndex < remoteServers.Size(); ++serverIndex)
	{
		// Skip servers whose key lists are known and do not include this key
		if (remoteServers[serverIndex]->gotSubscribedAndUploadedKeys!=false &&
			remoteServers[serverIndex]->subscribedKeys.HasData(key)==false)
			continue;

		SendUnified(&command, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[serverIndex]->serverAddress, false);
	}
}
// Registers systemIdentifier as a peer cloud server and sends it this server's uploaded
// and subscribed keys. Does nothing if the system is disconnected / not connected, or
// if it is already registered.
void CloudServer::AddServer(RakNetGUID systemIdentifier)
{
	switch (rakPeerInterface->GetConnectionState(systemIdentifier))
	{
	case IS_DISCONNECTED:
	case IS_NOT_CONNECTED:
		return;
	default:
		break;
	}

	// When the key is absent the returned index is used as the insertion position
	bool alreadyRegistered;
	unsigned int insertAt = remoteServers.GetIndexFromKey(systemIdentifier,&alreadyRegistered);
	if (alreadyRegistered)
		return;

	RemoteServer *added = RakNet::OP_NEW<RemoteServer>(_FILE_AND_LINE_);
	added->serverAddress=systemIdentifier;
	added->gotSubscribedAndUploadedKeys=false;
	remoteServers.InsertAtIndex(added, insertAt, _FILE_AND_LINE_);

	SendUploadedAndSubscribedKeysToServer(systemIdentifier);
}
// Unregisters the peer server identified by systemAddress and frees its record.
// Unknown servers are ignored.
void CloudServer::RemoveServer(RakNetGUID systemAddress)
{
	bool isRegistered;
	unsigned int serverIndex = remoteServers.GetIndexFromKey(systemAddress,&isRegistered);
	if (isRegistered!=true)
		return;

	RakNet::OP_DELETE(remoteServers[serverIndex],_FILE_AND_LINE_);
	remoteServers.RemoveAtIndex(serverIndex);
}
// Replaces the contents of remoteServersOut with the GUID of every registered peer
// server, in the order they are stored in remoteServers.
void CloudServer::GetRemoteServers(DataStructures::List<RakNetGUID> &remoteServersOut)
{
	remoteServersOut.Clear(true, _FILE_AND_LINE_);

	unsigned int serverIndex=0;
	while (serverIndex < remoteServers.Size())
	{
		remoteServersOut.Push(remoteServers[serverIndex]->serverAddress, _FILE_AND_LINE_);
		++serverIndex;
	}
}
// Builds the ID_CLOUD_GET_RESPONSE for getRequest and sends it to the requesting client.
// Rows come first from this server's own repository, then from the buffered responses
// of remote servers. The query's startingRowIndex / maxRowsToReturn window is applied
// across that combined sequence; maxRowsToReturn==0 means no limit.
// The row count is written once after the local rows are counted and rewritten in place
// if any remote rows are appended.
void CloudServer::ProcessAndTransmitGetRequest(GetRequest *getRequest)
{
	RakNet::BitStream bsOut;
	bsOut.Write((MessageID) ID_CLOUD_GET_RESPONSE);

	// Header echoes the query the client sent
	CloudQueryResult cloudQueryResult;
	cloudQueryResult.cloudQuery=getRequest->cloudQueryWithAddresses.cloudQuery;
	cloudQueryResult.subscribeToResults=getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults;
	cloudQueryResult.SerializeHeader(true, &bsOut);

	// Rows available locally
	DataStructures::List<CloudData*> cloudDataResultList;
	DataStructures::List<CloudKey> cloudKeyResultList;
	ProcessCloudQueryWithAddresses(getRequest->cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);

	const uint32_t firstRow = getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex;
	const uint32_t rowLimit = getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn;
	const bool noRowLimit = (rowLimit==0);

	// One past the last local row to send; clipped so that at most rowLimit rows follow firstRow
	uint32_t localEnd = (uint32_t) cloudDataResultList.Size();
	if (noRowLimit==false && localEnd > firstRow && localEnd - firstRow > rowLimit)
		localEnd = firstRow + rowLimit;

	// Remember where the row count sits so it can be patched later
	const BitSize_t rowCountOffset = bsOut.GetWriteOffset();

	uint32_t rowsWritten=0;
	unsigned int rowsToSkip=0;
	if (localEnd > firstRow)
		rowsWritten = localEnd - firstRow;
	else
		rowsToSkip = firstRow - localEnd; // The window starts inside the remote rows

	cloudQueryResult.SerializeNumRows(true, rowsWritten, &bsOut);
	for (unsigned int localRow=firstRow; localRow < localEnd; localRow++)
		WriteCloudQueryRowFromResultList(localRow, cloudDataResultList, cloudKeyResultList, &bsOut);

	// Fill the rest of the window from the remote servers' buffered rows
	if (noRowLimit || rowLimit > rowsWritten)
	{
		uint32_t quota = 0; // Only meaningful when a limit applies
		if (noRowLimit==false)
			quota = rowLimit - rowsWritten;

		uint32_t appended = 0;
		bool quotaReached = false;
		for (unsigned int responseIndex=0; quotaReached==false && responseIndex < getRequest->remoteServerResponses.Size(); responseIndex++)
		{
			BufferedGetResponseFromServer *response = getRequest->remoteServerResponses[responseIndex];
			for (unsigned int remoteRow=0; remoteRow < response->queryResult.rowsReturned.Size(); remoteRow++)
			{
				if (rowsToSkip>0)
				{
					--rowsToSkip;
					continue;
				}

				response->queryResult.rowsReturned[remoteRow]->Serialize(true, &bsOut, this);
				++appended;

				if (noRowLimit==false && --quota==0)
				{
					quotaReached = true;
					break;
				}
			}
		}

		if (appended>0)
		{
			// Patch the row count written earlier, then resume at the end of the stream
			const BitSize_t endOffset = bsOut.GetWriteOffset();
			bsOut.SetWriteOffset(rowCountOffset);
			rowsWritten += appended;
			cloudQueryResult.SerializeNumRows(true, rowsWritten, &bsOut);
			bsOut.SetWriteOffset(endOffset);
		}
	}

	SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, getRequest->requestingClient, false);
}
// Collects the locally stored data matching a query.
// For every key in cloudQueryWithAddresses.cloudQuery.keys that exists in dataRepository
// and has at least one uploader, appends the matching CloudData pointers to
// cloudDataResultList and the key to cloudKeyResultList (the two lists stay parallel).
// If cloudQueryWithAddresses.specificSystems is non-empty, only entries uploaded by those
// systems are returned; otherwise every entry under the key is returned.
// Neither output list is cleared first, and no row limit is applied here.
void CloudServer::ProcessCloudQueryWithAddresses( CloudServer::CloudQueryWithAddresses &cloudQueryWithAddresses, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList )
{
	for (unsigned int queryIndex=0; queryIndex < cloudQueryWithAddresses.cloudQuery.keys.Size(); queryIndex++)
	{
		const CloudKey &key = cloudQueryWithAddresses.cloudQuery.keys[queryIndex];

		bool dataRepositoryExists;
		unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
		if (dataRepositoryExists==false)
			continue;

		CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
		// A list with no uploaders has nothing to return
		if ((cloudDataList->uploaderCount>0)==false)
			continue;

		if (cloudQueryWithAddresses.specificSystems.Size()>0)
		{
			// Return data only for the requested uploaders
			for (unsigned int specificSystemIndex=0; specificSystemIndex < cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
			{
				bool uploaderExists;
				unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(cloudQueryWithAddresses.specificSystems[specificSystemIndex], &uploaderExists);
				if (uploaderExists)
				{
					cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
					cloudKeyResultList.Push(key, _FILE_AND_LINE_);
				}
			}
		}
		else
		{
			// Return data for all uploaders
			for (unsigned int keyDataIndex=0; keyDataIndex < cloudDataList->keyData.Size(); keyDataIndex++)
			{
				cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
				cloudKeyResultList.Push(key, _FILE_AND_LINE_);
			}
		}
	}
}
// Sends an STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS command to systemAddress.
// Layout: uint16 key count, that many keys from dataRepository, uint16 subscribed-key
// count, then the keys whose subscriberCount is greater than zero.
// Nothing is sent when dataRepository is empty.
// Both counts are 16 bits wide on the wire, so at most 65535 repository entries are
// described; this keeps each written count equal to the number of keys that follow it.
void CloudServer::SendUploadedAndSubscribedKeysToServer( RakNetGUID systemAddress )
{
	const unsigned int maxKeysPerMessage = 0xFFFF;
	unsigned int keyCount = dataRepository.Size();
	if (keyCount > maxKeysPerMessage)
		keyCount = maxKeysPerMessage;

	RakNet::BitStream bsOut;
	bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	bsOut.Write((MessageID)STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS);
	bsOut.WriteCasted<uint16_t>(keyCount);
	for (unsigned int i=0; i < keyCount; i++)
		dataRepository[i]->key.Serialize(true, &bsOut);

	// The subscribed-key count is not known yet: write a placeholder and patch it afterwards
	BitSize_t startOffset, endOffset;
	uint16_t subscribedKeyCount=0;
	startOffset=bsOut.GetWriteOffset();
	bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
	for (unsigned int i=0; i < keyCount; i++)
	{
		if (dataRepository[i]->subscriberCount>0)
		{
			dataRepository[i]->key.Serialize(true, &bsOut);
			subscribedKeyCount++;
		}
	}
	endOffset=bsOut.GetWriteOffset();
	bsOut.SetWriteOffset(startOffset);
	bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
	bsOut.SetWriteOffset(endOffset);

	if (keyCount>0 || subscribedKeyCount>0)
		SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, systemAddress, false);
}
// Broadcasts an STSC_ADD_UPLOADED_KEY command carrying cloudKey to every registered
// peer server.
void CloudServer::SendUploadedKeyToServers( CloudKey &cloudKey )
{
	RakNet::BitStream command;
	command.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	command.Write((MessageID)STSC_ADD_UPLOADED_KEY);
	cloudKey.Serialize(true, &command);

	unsigned int serverIndex=0;
	while (serverIndex < remoteServers.Size())
	{
		SendUnified(&command, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[serverIndex]->serverAddress, false);
		++serverIndex;
	}
}
// Broadcasts an STSC_ADD_SUBSCRIBED_KEY command carrying cloudKey to every registered
// peer server.
void CloudServer::SendSubscribedKeyToServers( CloudKey &cloudKey )
{
	RakNet::BitStream command;
	command.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	command.Write((MessageID)STSC_ADD_SUBSCRIBED_KEY);
	cloudKey.Serialize(true, &command);

	unsigned int serverIndex=0;
	while (serverIndex < remoteServers.Size())
	{
		SendUnified(&command, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[serverIndex]->serverAddress, false);
		++serverIndex;
	}
}
// Broadcasts an STSC_REMOVE_UPLOADED_KEY command carrying cloudKey to every registered
// peer server.
void CloudServer::RemoveUploadedKeyFromServers( CloudKey &cloudKey )
{
	RakNet::BitStream command;
	command.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	command.Write((MessageID)STSC_REMOVE_UPLOADED_KEY);
	cloudKey.Serialize(true, &command);

	unsigned int serverIndex=0;
	while (serverIndex < remoteServers.Size())
	{
		SendUnified(&command, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[serverIndex]->serverAddress, false);
		++serverIndex;
	}
}
// Broadcasts an STSC_REMOVE_SUBSCRIBED_KEY command carrying cloudKey to every registered
// peer server.
void CloudServer::RemoveSubscribedKeyFromServers( CloudKey &cloudKey )
{
	RakNet::BitStream command;
	command.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
	command.Write((MessageID)STSC_REMOVE_SUBSCRIBED_KEY);
	cloudKey.Serialize(true, &command);

	unsigned int serverIndex=0;
	while (serverIndex < remoteServers.Size())
	{
		SendUnified(&command, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[serverIndex]->serverAddress, false);
		++serverIndex;
	}
}
// Handles STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS from a peer server.
// Packets from systems that are not registered in remoteServers are ignored.
// Otherwise the sender is marked as having reported its keys, and the uploaded keys
// followed by the subscribed keys in the packet are merged into the sender's record
// (keys already present are not inserted again).
// The packet comes from the network, so both key counts start at zero and a failed
// read aborts processing instead of looping on an indeterminate count.
void CloudServer::OnSendUploadedAndSubscribedKeysToServer( Packet *packet )
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);

	bool objectExists;
	unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
	if (objectExists==false)
		return;
	RemoteServer *remoteServer = remoteServers[index];
	remoteServer->gotSubscribedAndUploadedKeys=true;

	bool alreadyHasKey;
	uint16_t numUploadedKeys=0, numSubscribedKeys=0;

	if (bsIn.Read(numUploadedKeys)==false)
		return;
	for (uint16_t i=0; i < numUploadedKeys; i++)
	{
		CloudKey cloudKey;
		cloudKey.Serialize(false, &bsIn);
		remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
		if (alreadyHasKey==false)
			remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
	}

	if (bsIn.Read(numSubscribedKeys)==false)
		return;
	for (uint16_t i=0; i < numSubscribedKeys; i++)
	{
		CloudKey cloudKey;
		cloudKey.Serialize(false, &bsIn);
		remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
		if (alreadyHasKey==false)
			remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
	}

	// Potential todo - join servers
	// For each uploaded key that we subscribe to, query it
	// For each subscribed key that we have, send it
}
// Handles STSC_ADD_UPLOADED_KEY from a peer server: records the key in the sender's
// uploadedKeys unless it is already there. Packets from unregistered systems are ignored.
void CloudServer::OnSendUploadedKeyToServers( Packet *packet )
{
	bool senderKnown;
	unsigned int senderIndex = remoteServers.GetIndexFromKey(packet->guid,&senderKnown);
	if (senderKnown==false)
		return;
	RemoteServer *sender = remoteServers[senderIndex];

	// Payload follows the two message-ID bytes
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);
	CloudKey announcedKey;
	announcedKey.Serialize(false, &bsIn);

	if (sender->uploadedKeys.HasData(announcedKey)==false)
		sender->uploadedKeys.Insert(announcedKey,announcedKey,true,_FILE_AND_LINE_);
}
// Handles STSC_ADD_SUBSCRIBED_KEY from a peer server: records the key in the sender's
// subscribedKeys unless it is already there. Packets from unregistered systems are ignored.
// Current values are not sent back here; the Get request sent at the same time does that.
void CloudServer::OnSendSubscribedKeyToServers( Packet *packet )
{
	bool senderKnown;
	unsigned int senderIndex = remoteServers.GetIndexFromKey(packet->guid,&senderKnown);
	if (senderKnown==false)
		return;
	RemoteServer *sender = remoteServers[senderIndex];

	// Payload follows the two message-ID bytes
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);
	CloudKey announcedKey;
	announcedKey.Serialize(false, &bsIn);

	if (sender->subscribedKeys.HasData(announcedKey)==false)
		sender->subscribedKeys.Insert(announcedKey,announcedKey,true,_FILE_AND_LINE_);
}
// Handles STSC_REMOVE_UPLOADED_KEY from a peer server: drops the key from the sender's
// uploadedKeys if present. Packets from unregistered systems are ignored.
void CloudServer::OnRemoveUploadedKeyFromServers( Packet *packet )
{
	bool senderKnown;
	unsigned int senderIndex = remoteServers.GetIndexFromKey(packet->guid,&senderKnown);
	if (senderKnown==false)
		return;
	RemoteServer *sender = remoteServers[senderIndex];

	// Payload follows the two message-ID bytes
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);
	CloudKey withdrawnKey;
	withdrawnKey.Serialize(false, &bsIn);

	bool hasKey;
	unsigned int keyIndex = sender->uploadedKeys.GetIndexFromKey(withdrawnKey, &hasKey);
	if (hasKey==true)
		sender->uploadedKeys.RemoveAtIndex(keyIndex);
}
// Handles STSC_REMOVE_SUBSCRIBED_KEY from a peer server: drops the key from the sender's
// subscribedKeys if present. Packets from unregistered systems are ignored.
void CloudServer::OnRemoveSubscribedKeyFromServers( Packet *packet )
{
	bool senderKnown;
	unsigned int senderIndex = remoteServers.GetIndexFromKey(packet->guid,&senderKnown);
	if (senderKnown==false)
		return;
	RemoteServer *sender = remoteServers[senderIndex];

	// Payload follows the two message-ID bytes
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);
	CloudKey withdrawnKey;
	withdrawnKey.Serialize(false, &bsIn);

	bool hasKey;
	unsigned int keyIndex = sender->subscribedKeys.GetIndexFromKey(withdrawnKey, &hasKey);
	if (hasKey==true)
		sender->subscribedKeys.RemoveAtIndex(keyIndex);
}
// Handles STSC_DATA_CHANGED from a peer server by relaying the changed row to local
// client subscribers. Packets from unregistered systems are ignored.
// If the row's key exists in dataRepository, the row goes to the specific subscribers of
// the uploading client's entry (when that entry exists) and then to the key's
// non-specific subscribers. The row data read from the packet is released on every path
// that deserialized it.
void CloudServer::OnServerDataChanged( Packet *packet )
{
	RakNet::BitStream bsIn(packet->data, packet->length, false);
	bsIn.IgnoreBytes(sizeof(MessageID)*2);

	bool senderKnown;
	remoteServers.GetIndexFromKey(packet->guid,&senderKnown);
	if (senderKnown==false)
		return;

	bool wasUpdated=false;
	bsIn.Read(wasUpdated);
	CloudQueryRow row;
	row.Serialize(false, &bsIn, this);

	bool keyIsStored;
	unsigned int repositoryIndex = dataRepository.GetIndexFromKey(row.key, &keyIsStored);
	if (keyIsStored)
	{
		CloudDataList *cloudDataList = dataRepository[repositoryIndex];

		// Clients subscribed to this particular uploader
		bool uploaderHasEntry;
		unsigned int uploaderIndex = cloudDataList->keyData.GetIndexFromKey(row.clientGUID, &uploaderHasEntry);
		if (uploaderHasEntry==true)
			NotifyClientSubscribersOfDataChange(&row, cloudDataList->keyData[uploaderIndex]->specificSubscribers, wasUpdated );

		// Clients subscribed to the key regardless of uploader
		NotifyClientSubscribersOfDataChange(&row, cloudDataList->nonSpecificSubscribers, wasUpdated );
	}

	DeallocateRowData(row.data);
}
// Fills remoteServersWithData (cleared first) with the peer servers that may hold data
// for any of keys: servers that have not yet reported their key lists are always
// included; the others are included when uploadedKeys contains at least one of keys.
// On return each server's workingFlag is true if it was added to the list, else false.
void CloudServer::GetServersWithUploadedKeys(
DataStructures::List<CloudKey> &keys,
DataStructures::List<CloudServer::RemoteServer*> &remoteServersWithData
)
{
	remoteServersWithData.Clear(true, _FILE_AND_LINE_);

	// Reset the per-server marker
	for (unsigned int resetIndex=0; resetIndex < remoteServers.Size(); resetIndex++)
		remoteServers[resetIndex]->workingFlag=false;

	for (unsigned int serverIndex=0; serverIndex < remoteServers.Size(); serverIndex++)
	{
		// Already selected
		if (remoteServers[serverIndex]->workingFlag!=false)
			continue;

		// Key lists unknown: assume the server may have the data
		bool include = (remoteServers[serverIndex]->gotSubscribedAndUploadedKeys==false);

		// Otherwise stop at the first requested key the server has uploaded
		for (unsigned int keyIndex=0; include==false && keyIndex < keys.Size(); keyIndex++)
			include = remoteServers[serverIndex]->uploadedKeys.HasData(keys[keyIndex]);

		if (include)
		{
			remoteServers[serverIndex]->workingFlag=true;
			remoteServersWithData.Push(remoteServers[serverIndex], _FILE_AND_LINE_);
		}
	}
}
// Returns the CloudDataList stored under key, creating and inserting an empty one
// (uploaderCount and subscriberCount both zero) if none exists.
// *dataRepositoryExists reports whether the list was already present before the call;
// dataRepositoryIndex receives the list's index in dataRepository.
CloudServer::CloudDataList *CloudServer::GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex)
{
	dataRepositoryIndex = dataRepository.GetIndexFromKey(key, dataRepositoryExists);
	if (*dataRepositoryExists!=false)
		return dataRepository[dataRepositoryIndex];

	CloudDataList *created = RakNet::OP_NEW<CloudDataList>(_FILE_AND_LINE_);
	created->key=key;
	created->uploaderCount=0;
	created->subscriberCount=0;
	dataRepository.InsertAtIndex(created,dataRepositoryIndex,_FILE_AND_LINE_);
	return created;
}
// Removes a client's subscription to cloudKey, fully or for selected uploaders.
//  remoteCloudClient / remoteCloudClientGuid - the subscribing client and its GUID
//  keySubscriberIndex - index of the subscription record in remoteCloudClient->subscribedKeys
//  specificSystems - uploaders to unsubscribe from; empty means drop the whole subscription
// Returns without changes if the subscription is global but specific systems were named,
// or if cloudKey is not in dataRepository.
// Afterwards: the subscription record is deleted once it lists no specific systems, peer
// servers are told when the key has no subscribers left, and the key's data list is
// deleted when it reports itself unused.
void CloudServer::UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems)
{
	KeySubscriberID* subscription = remoteCloudClient->subscribedKeys[keySubscriberIndex];
	const bool dropEverything = (specificSystems.Size()==0);

	// A global subscription cannot be narrowed by removing individual systems
	if (subscription->specificSystemsSubscribedTo.Size()==0 && dropEverything==false)
		return;

	bool keyIsStored;
	unsigned int repositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &keyIsStored);
	if (keyIsStored==false)
		return;
	CloudDataList *cloudDataList = dataRepository[repositoryIndex];

	if (dropEverything)
	{
		// Try the global subscriber list first; if the client was not in it,
		// its subscription is per-uploader and each one has to be removed
		if (cloudDataList->RemoveSubscriber(remoteCloudClientGuid)==false)
		{
			for (unsigned int subscribedIndex=0; subscribedIndex < subscription->specificSystemsSubscribedTo.Size(); subscribedIndex++)
				RemoveSpecificSubscriber(subscription->specificSystemsSubscribedTo[subscribedIndex], cloudDataList, remoteCloudClientGuid);
		}
		subscription->specificSystemsSubscribedTo.Clear(true, _FILE_AND_LINE_);
	}
	else
	{
		for (unsigned int requestedIndex=0; requestedIndex < specificSystems.Size(); requestedIndex++)
		{
			bool isSubscribedToSystem;
			unsigned int subscribedIndex = subscription->specificSystemsSubscribedTo.GetIndexFromKey(specificSystems[requestedIndex], &isSubscribedToSystem);
			if (isSubscribedToSystem==false)
				continue;

			RemoveSpecificSubscriber(specificSystems[requestedIndex], cloudDataList, remoteCloudClientGuid);
			subscription->specificSystemsSubscribedTo.RemoveAtIndex(subscribedIndex);
		}
	}

	// Nothing left in the record: drop it from the client
	if (subscription->specificSystemsSubscribedTo.Size()==0)
	{
		RakNet::OP_DELETE(subscription, _FILE_AND_LINE_);
		remoteCloudClient->subscribedKeys.RemoveAtIndex(keySubscriberIndex);
	}

	// No subscribers remain on this server for the key
	if (cloudDataList->subscriberCount==0)
		RemoveSubscribedKeyFromServers(cloudKey);

	if (cloudDataList->IsUnused())
	{
		RakNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
		dataRepository.RemoveAtIndex(repositoryIndex);
	}
}
// Removes remoteCloudClientGuid from the specific subscribers of the entry that
// specificSubscriber has in cloudDataList, decrementing the list's subscriberCount.
// The entry itself is deleted if it reports itself unused afterwards.
// Does nothing if the entry does not exist or the client was not subscribed to it.
void CloudServer::RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid)
{
	bool entryExists;
	unsigned int entryIndex = cloudDataList->keyData.GetIndexFromKey(specificSubscriber, &entryExists);
	if (entryExists==false)
		return;
	CloudData *entry = cloudDataList->keyData[entryIndex];

	bool isSubscribed;
	unsigned int subscriberIndex = entry->specificSubscribers.GetIndexFromKey(remoteCloudClientGuid, &isSubscribed);
	if (isSubscribed==false)
		return;

	entry->specificSubscribers.RemoveAtIndex(subscriberIndex);
	cloudDataList->subscriberCount--;

	if (entry->IsUnused())
	{
		RakNet::OP_DELETE(entry, _FILE_AND_LINE_);
		cloudDataList->keyData.RemoveAtIndex(entryIndex);
	}
}
// Stores forcedAddress in forceAddress. How the stored address is used is decided
// elsewhere in the class.
void CloudServer::ForceExternalSystemAddress(SystemAddress forcedAddress)
{
	this->forceAddress = forcedAddress;
}
// Appends filter to queryFilters unless the same pointer is already registered.
void CloudServer::AddQueryFilter(CloudServerQueryFilter* filter)
{
	// GetIndexOf yields (unsigned int) -1 when the filter is not in the list
	const bool notYetRegistered = (queryFilters.GetIndexOf(filter)==(unsigned int) -1);
	if (notYetRegistered)
		queryFilters.Push(filter, _FILE_AND_LINE_);
}
// Removes filter from queryFilters. Filters that were never added are ignored.
void CloudServer::RemoveQueryFilter(CloudServerQueryFilter* filter)
{
	const unsigned int filterIndex = queryFilters.GetIndexOf(filter);
	// (unsigned int) -1 means the filter is not in the list
	if (filterIndex == (unsigned int) -1)
		return;
	queryFilters.RemoveAtIndex(filterIndex);
}
void CloudServer::RemoveAllQueryFilters(void)
{
queryFilters.Clear(true, _FILE_AND_LINE_);
}
#endif