Brief description of class still missing. More...
#include <LinkBuffer.h>
Public Member Functions | |
| virtual int | bytesAvailable (char *buffer, int byteCount) |
| LinkBuffer (const Address &peer, int fd) | |
| void | startMeasure (RouteTcpHeader::BlockType type) |
| void | startTest () |
| ~LinkBuffer () | |
Brief description of class still missing.
Full description of class still missing
Block types:
0 Negotiation of which side is going to calculate the test results; 1 Reset ballast factor to default and start test; 2 Keep ballast factor as is and start test; 3 Increase ballast factor and start test; 4 First test block, sent by peer; 5 Second test block, sent to peer; 6 Raw results, sent by peer; 7 Link info, sent to peer; 8 Share pseudo time to master
| LinkBuffer::LinkBuffer | ( | const Address & | peer, |
| int | fd | ||
| ) |
Description of constructor still missing
: DynamicBuffer(fd),
  _testTimer(this),
  _measureTimer(this)
{
  // Keep the remote address: it is used as prefix of the log lines written for this link
  _peer=peer;
}
References GpCoreTools::Address::toString().
{
// Presumably the destructor body (its signature is not visible here): logs, at
// level 1, that the link to the stored peer address is being closed.
Log::write(1, "[%s]: close link\n", _peer.toString().data());
}
| int LinkBuffer::bytesAvailable | ( | char * | buffer, |
| int | byteCount | ||
| ) | [virtual] |
Implements GpCoreTools::DynamicBuffer.
References PeerTracker::addFromMasterRoute(), Flowmeter::addPacket(), Flowmeter::ballastFactor(), RouteTcpHeader::ForwardAddRouteFromMaster, RouteTcpHeader::ForwardRemoveRouteFromMaster, RouteTcpHeader::InfoToLeader, RouteTcpHeader::InfoToSecond, PeerTracker::instance(), WaranCore::TcpHeader::isCompatible(), WaranCore::TcpHeader::isValid(), PeerTracker::masterTime(), RouteTcpHeader::MeasureIncreaseBallast, RouteTcpHeader::MeasureKeepBallast, Flowmeter::measurement(), RouteTcpHeader::MeasureResetBallast, RouteTcpHeader::MeasureResultToLeader, PeerTracker::removeFromMasterRoute(), GpCoreTools::Average::reset(), ROUTE_TCP_VERSION, RouteTcpHeader::SendLoadToLeader, RouteTcpHeader::SendLoadToSecond, GpCoreTools::DynamicBuffer::sendNoGateway(), GpCoreTools::DynamicBuffer::sendPartialNoGateway(), Flowmeter::setBallastFactor(), PeerTracker::setLink(), PeerTracker::setMasterAddress(), Flowmeter::setReceiveByteCount(), GpCoreTools::Timer::start(), Flowmeter::startGlobal(), startMeasure(), Flowmeter::stopGlobal(), LinkInfo::swap(), RouteTcpHeader::TestLeaderNegociation, RouteTcpHeader::TestNow, RouteTcpHeader::time(), LinkInfo::toLog(), GpCoreTools::Address::toString(), and WaranCore::TcpHeader::type().
{
  // Parses one block received from the peer and reacts to it.
  //
  // buffer    points to the bytes received so far, starting with a RouteTcpHeader.
  // byteCount is the number of bytes available in buffer.
  //
  // Returns 0 when the block is still incomplete (and also on an incompatible
  // version or an unknown block type), 1 when the header is invalid so that the
  // search for a good header moves one byte further, or otherwise the size of the
  // processed block (header plus payload) - presumably the number of bytes the
  // caller must drop from its buffer; confirm against DynamicBuffer.
  if(byteCount<(int)sizeof(RouteTcpHeader)) {
    return 0;
  }
  // Check block header
  const RouteTcpHeader * hb=reinterpret_cast<const RouteTcpHeader *>(buffer);
  std::string peerTime=hb->time();
  if(!hb->isValid()) {
    Log::write(2, "[%s%s]: bad header\n", _peer.toString().data(), peerTime.data());
    return 1; // shift by one until matching a good header
  }
  if(!hb->isCompatible(ROUTE_TCP_VERSION)) {
    // Both arguments are C strings: use the same "[%s%s]" prefix as all other log
    // lines (the former "%05hu" conversion was fed a char pointer).
    Log::write(2, "[%s%s]: bad block version\n", _peer.toString().data(), peerTime.data());
    // NOTE(review): 0 is also the "not enough data" answer; confirm that the caller
    // does not keep re-submitting the same incompatible block.
    return 0;
  }
  // From here, buffer and byteCount describe the payload that follows the header
  int readBytes=sizeof(RouteTcpHeader);
  buffer+=sizeof(RouteTcpHeader);
  byteCount-=sizeof(RouteTcpHeader);
  switch(static_cast<RouteTcpHeader::BlockType>(hb->type())) {
  case RouteTcpHeader::TestLeaderNegociation: // task negociations
    // Random choice: either take the lead or send the negotiation back to the peer
    if(rand()%2==1) { // Ok I take it
      Log::write(2, "[%s%s]: ok thanks!\n", _peer.toString().data(), peerTime.data());
      _rateToPeer.reset();
      _rateFromPeer.reset();
      _latency.reset();
      _flowmeter.setBallastFactor(16);
      startMeasure(RouteTcpHeader::MeasureResetBallast);
    } else {
      Log::write(2, "[%s%s]: after you...\n", _peer.toString().data(), peerTime.data());
      RouteTcpHeader reply(RouteTcpHeader::TestLeaderNegociation);
      sendNoGateway((const char *)&reply, sizeof(RouteTcpHeader));
    }
    return readBytes;
  case RouteTcpHeader::MeasureResetBallast: // second
    _flowmeter.setBallastFactor(16);
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  case RouteTcpHeader::MeasureKeepBallast: // leader
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  case RouteTcpHeader::MeasureIncreaseBallast: // second
    _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
    Log::write(2, "[%s%s]: increase ballast factor to %i\n",
               _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
    _flowmeter.startGlobal();
    sendTestBlock(RouteTcpHeader::SendLoadToLeader);
    return readBytes;
  case RouteTcpHeader::SendLoadToLeader: { // leader
      // Payload starts with an int giving the block size
      byteCount-=sizeof(int);
      if(byteCount<0) {
        return 0;
      }
      const int& blockSize=*reinterpret_cast<const int *>(buffer);
      _flowmeter.setReceiveByteCount(blockSize);
      _flowmeter.addPacket(byteCount);
      if(byteCount<blockSize) {
        return 0; // wait for the rest of the test block
      } else {
        sendTestBlock(RouteTcpHeader::SendLoadToSecond);
        // NOTE(review): all available bytes are reported, not only blockSize;
        // confirm that nothing else can follow a test block in the buffer.
        return readBytes+sizeof(int)+byteCount;
      }
    }
    break;
  case RouteTcpHeader::SendLoadToSecond: { // second
      // Payload starts with an int giving the block size
      byteCount-=sizeof(int);
      if(byteCount<0) {
        return 0;
      }
      const int& blockSize=*reinterpret_cast<const int *>(buffer);
      _flowmeter.setReceiveByteCount(blockSize);
      _flowmeter.addPacket(byteCount);
      if(byteCount<blockSize) {
        return 0; // wait for the rest of the test block
      } else {
        // Whole block received: stop the measurement and send the result back
        _flowmeter.stopGlobal();
        RouteTcpHeader reply(RouteTcpHeader::MeasureResultToLeader);
        sendPartialNoGateway((const char *)&reply, sizeof(RouteTcpHeader));
        sendNoGateway((const char *)&_flowmeter.measurement(), sizeof(FlowMeasurement));
        // NOTE(review): all available bytes are reported, not only blockSize;
        // confirm that nothing else can follow a test block in the buffer.
        return readBytes+sizeof(int)+byteCount;
      }
    }
    break;
  case RouteTcpHeader::MeasureResultToLeader: { // leader
      if(byteCount<(int)sizeof(FlowMeasurement)) {
        return 0;
      }
      _flowmeter.stopGlobal();
      // The verdict of addMeasurement() decides the next step of the test
      switch(addMeasurement(*reinterpret_cast<const FlowMeasurement *>(buffer))) {
      case Accepted:
        _measureTimer.start();
        break;
      case Commit: {
          // Send link info, master time and master address to the peer
          _linkInfo.toLog(_peer);
          MasterTime t=PeerTracker::instance()->masterTime(_peer);
          RouteTcpHeader reply(RouteTcpHeader::InfoToSecond);
          sendPartialNoGateway((const char *)&reply, sizeof(RouteTcpHeader));
          sendPartialNoGateway((const char *)&_linkInfo, sizeof(LinkInfo));
          sendPartialNoGateway((const char *)&t, sizeof(MasterTime));
          sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address));
        }
        break;
      case IncreaseBallast1:
        // Double the local ballast factor only, the peer keeps its own
        _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
        Log::write(2, "[%s%s]: increase ballast factor to %i\n",
                   _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
        startMeasure(RouteTcpHeader::MeasureKeepBallast);
        break;
      case IncreaseBallast2:
        // Ask the peer to increase its ballast factor, the local one is unchanged
        startMeasure(RouteTcpHeader::MeasureIncreaseBallast);
        break;
      case IncreaseBothBallast:
        // Double the local ballast factor and ask the peer to increase its own
        _flowmeter.setBallastFactor(_flowmeter.ballastFactor()*2);
        Log::write(2, "[%s%s]: increase ballast factor to %i\n",
                   _peer.toString().data(), peerTime.data(), _flowmeter.ballastFactor());
        startMeasure(RouteTcpHeader::MeasureIncreaseBallast);
        break;
      }
      return readBytes+sizeof(FlowMeasurement);
    }
    break;
  case RouteTcpHeader::InfoToSecond: {// Link test finished, received updated information from leader
      // Payload layout: LinkInfo, then MasterTime, then Address
      if(byteCount<(int)(sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address))) {
        return 0;
      }
      _linkInfo=*reinterpret_cast<const LinkInfo *>(buffer);
      _linkInfo.swap();
      _linkInfo.toLog(_peer);
      const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer+sizeof(LinkInfo));
      const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(LinkInfo)+sizeof(MasterTime));
      PeerTracker::instance()->setMasterAddress(masterAddress);
      PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime);
      // Answer with the local master time and master address
      MasterTime t=PeerTracker::instance()->masterTime(_peer);
      RouteTcpHeader reply(RouteTcpHeader::InfoToLeader);
      sendPartialNoGateway((const char *)&reply, sizeof(RouteTcpHeader));
      sendPartialNoGateway((const char *)&t, sizeof(MasterTime));
      sendNoGateway((const char *)&PeerTracker::instance()->masterAddress(), sizeof(Address));
      return readBytes+sizeof(LinkInfo)+sizeof(MasterTime)+sizeof(Address);
    }
    break;
  case RouteTcpHeader::InfoToLeader: { // master
      // Payload layout: MasterTime, then Address
      if(byteCount<(int)(sizeof(MasterTime)+sizeof(Address))) {
        return 0;
      }
      const MasterTime& peerMasterTime=*reinterpret_cast<const MasterTime *>(buffer);
      const Address& masterAddress=*reinterpret_cast<const Address *>(buffer+sizeof(MasterTime));
      PeerTracker::instance()->setMasterAddress(masterAddress);
      PeerTracker::instance()->setLink(_peer, _linkInfo, peerMasterTime);
      _testTimer.start();
      return readBytes+sizeof(MasterTime)+sizeof(Address);
    }
    break;
  case RouteTcpHeader::TestNow:
    // Header only, nothing done here
    return readBytes;
  case RouteTcpHeader::ForwardAddRouteFromMaster: { // forward add 'FromMaster' route to Master
      if(byteCount<(int)sizeof(Address)) {
        return 0;
      }
      const Address& destination=*reinterpret_cast<const Address *>(buffer);
      Log::write(0, "[%s%s]: received add route 'FromMaster' to [%s]\n",
                 _peer.toString().data(), peerTime.data(), destination.toString().data());
      PeerTracker::instance()->addFromMasterRoute(destination, _peer);
      return readBytes+sizeof(Address);
    }
    break;
  case RouteTcpHeader::ForwardRemoveRouteFromMaster: { // forward remove up 'FromMaster' route to Master
      if(byteCount<(int)sizeof(Address)) {
        return 0;
      }
      const Address& destination=*reinterpret_cast<const Address *>(buffer);
      Log::write(0, "[%s%s]: received remove up route 'FromMaster' to [%s]\n",
                 _peer.toString().data(), peerTime.data(), destination.toString().data());
      PeerTracker::instance()->removeFromMasterRoute(destination);
      return readBytes+sizeof(Address);
    }
    break;
  default:
    break;
  }
  // Only reached for block types not handled above
  Log::write(2, "[%s%s]: bad block type: %hhu\n", _peer.toString().data(), peerTime.data(), hb->type());
  return 0;
}
| void LinkBuffer::startMeasure | ( | RouteTcpHeader::BlockType | type | ) |
References GpCoreTools::DynamicBuffer::sendNoGateway(), and Flowmeter::startGlobal().
Referenced by bytesAvailable(), and LinkMeasureTimer::exec().
{
  // Start the global flow measurement first, then send a header-only block of the
  // requested type to the peer.
  _flowmeter.startGlobal();
  RouteTcpHeader request(type);
  const char * rawRequest=reinterpret_cast<const char *>(&request);
  sendNoGateway(rawRequest, sizeof(request));
}
| void LinkBuffer::startTest | ( | ) |
Called only through test timer
References GpCoreTools::DynamicBuffer::sendNoGateway(), RouteTcpHeader::TestLeaderNegociation, and GpCoreTools::Address::toString().
Referenced by LinkStream::connect(), and LinkTestTimer::exec().
{
  // Log the start of the test, then open the leader negotiation with the peer by
  // sending a header-only TestLeaderNegociation block.
  Log::write(2, "[%s]: start test\n", _peer.toString().data());
  RouteTcpHeader negotiation(RouteTcpHeader::TestLeaderNegociation);
  const char * rawNegotiation=reinterpret_cast<const char *>(&negotiation);
  sendNoGateway(rawNegotiation, sizeof(negotiation));
}