Brief description of class still missing. More...
#include <PeerTracker.h>
Public Member Functions | |
| void | addFromMasterRoute (const Address &destination, const Address &gateway) |
| void | checkDeadPeers (double validityDelay) |
| virtual void | event (short type) |
| virtual short | eventTypes () |
| bool | listen (uint16_t udpPort) |
| const Address & | masterAddress () const |
| MasterTime | masterTime (const Address &peer) const |
| PeerTracker (const char *interface, uint16_t tcpPort) | |
| void | removeFromMasterRoute (const Address &destination) |
| void | resetFromMasterGateway () |
| void | resetToMasterGateway () |
| void | setAsMaster () |
| void | setLink (const Address &peer, const LinkInfo &link, const MasterTime &peerMasterTime) |
| void | setMasterAddress (const Address &masterAddress) |
| void | start (const Address &peer, LinkStream *link) |
| void | stop (const Address &peer) |
| ~PeerTracker () | |
Static Public Member Functions | |
| static PeerTracker * | instance () |
Brief description of class still missing.
Full description of class still missing
| PeerTracker::PeerTracker | ( | const char * | interface, |
| uint16_t | tcpPort | ||
| ) |
Description of constructor still missing
References GpCoreTools::Stream::fileDescriptor(), and GpCoreTools::Stream::setFileDescriptor().
: _routes(interface)
{
  // Only one tracker may exist: it registers itself as the instance returned by instance().
  assert(!_self);
  _self=this;

  // Used for rate testing
  _tcpPort=tcpPort;

  // Datagram socket on which peer beacons are received (see event()).
  setFileDescriptor(socket(AF_INET, SOCK_DGRAM, 0));
  if(fileDescriptor()<=0) {
    const int socketError=errno;
    Log::write(0, "peer tracker socket: [%i] %s\n", socketError, strerror(socketError));
    CoreApplication::exit(2);
  }

  int reuseAddress=1;
  const int optionStatus=setsockopt(fileDescriptor(), SOL_SOCKET, SO_REUSEADDR,
                                    &reuseAddress, sizeof(reuseAddress));
  if(optionStatus<0) {
    const int optionError=errno;
    Log::write(0, "peer tracker reuseaddr: [%i] %s\n", optionError, strerror(optionError));
    CoreApplication::exit(2);
  }

  Log::write(0, "seeking for peers in %s/%i\n",
             Address::me().subnet().toString().data(),
             Address::maskSize());

  // One slot per host address of the subnet; every slot starts out null
  // (value-initialization of the pointer array).
  const int slotCount=1 << (32-Address::maskSize());
  _peers=new PeerInfo *[slotCount]();
}
{
  Log::write(8, "PeerTracker::~PeerTracker::begin\n");
  // The PeerInfo objects are released through the active list;
  // for _peers only the pointer array itself is freed.
  std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
  while(peerIt!=_activePeers.end()) {
    delete *peerIt;
    ++peerIt;
  }
  delete [] _peers;
  Log::write(8, "PeerTracker::~PeerTracker::end\n");
}
| void PeerTracker::addFromMasterRoute | ( | const Address & | destination, |
| const Address & | gateway | ||
| ) |
References KernelRoute::add(), RouteTcpHeader::ForwardAddRouteFromMaster, GpCoreTools::Address::index(), GpCoreTools::Address::isValid(), PeerInfo::stream(), GpCoreTools::Address::toString(), GpCoreTools::TcpClientStream::writeNoGateway(), and GpCoreTools::TcpClientStream::writePartialNoGateway().
Referenced by LinkBuffer::bytesAvailable().
{
  Log::write(2, "add 'From Master' route with destination %s (gw %s)\n",
             destination.toString().data(), gateway.toString().data());

  // No route is added when the destination is this node itself.
  if(Address::me()!=destination) {
    _routes.add(destination, gateway);
  }

  // Unless this node is the master, forward the new route to the peer
  // currently used as 'from master' gateway.
  if(Address::me()!=_masterAddress) {
    PeerInfo * upstream=0;
    if(_fromMasterGateway.isValid()) {
      upstream=_peers[_fromMasterGateway.index()];
    }
    if(!upstream) {
      // Either no valid gateway is set or the gateway peer is not tracked.
      Log::write(0, "broken route 'from master' to [%s] (add)\n", destination.toString().data());
      return;
    }
    const LinkStream * out=upstream->stream();
    RouteTcpHeader header(RouteTcpHeader::ForwardAddRouteFromMaster);
    // Header first (partial write), then the destination address as payload.
    out->writePartialNoGateway(reinterpret_cast<const char *>(&header), sizeof(RouteTcpHeader));
    out->writeNoGateway(reinterpret_cast<const char *>(&destination), sizeof(Address));
  }
}
| void PeerTracker::checkDeadPeers | ( | double | validityDelay | ) |
References PeerInfo::address(), PeerInfo::lastBeaconDelay(), and stop().
Referenced by DeadPeersTimer::exec().
{
  // Stops every active peer whose last beacon is older than validityDelay.
  //
  // stop() erases entries from _activePeers, so the expired addresses are
  // collected first and the peers are stopped in a second pass, once the
  // iteration over _activePeers is finished.
  // A std::list is used instead of a variable-length array: VLAs are not
  // standard C++ and would have zero length when there is no active peer.
  std::list<Address> expired;
  for(std::list<PeerInfo *>::iterator it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
    PeerInfo * p=*it;
    if(p->lastBeaconDelay()>validityDelay) {
      expired.push_back(p->address());
    }
  }
  for(std::list<Address>::iterator it=expired.begin(); it!=expired.end(); ++it) {
    Log::write(1, "[%s]: timeout\n", it->toString().data());
    stop(*it);
  }
}
| void PeerTracker::event | ( | short | type | ) | [virtual] |
Implements GpCoreTools::Stream.
References PeerInfo::beaconReceived(), GpCoreTools::Stream::fileDescriptor(), GpCoreTools::Address::index(), and GpCoreTools::Address::toString().
{
  // Handles readiness of the beacon socket: reads one datagram and, when it
  // is a "WAUB" beacon from another node, either refreshes the known peer or
  // starts a connection to a newly discovered one.
  if(type & POLLIN) {
    sockaddr_in peerAddr;
    socklen_t peerAddrLen=sizeof(sockaddr_in);
    char buffer[4];
    const ssize_t received=recvfrom(fileDescriptor(), buffer, sizeof(buffer), 0,
                                    (struct sockaddr *) &peerAddr, &peerAddrLen);
    if(received<0) {
      Log::write(1, "peer tracker recvfrom error: %s\n", strerror(errno));
    } else if(received==static_cast<ssize_t>(sizeof(buffer)) &&
              strncmp(buffer, "WAUB", sizeof(buffer))==0) {
      // The length check matters: a shorter datagram leaves part of buffer
      // uninitialized, and comparing it would read indeterminate bytes.
      Address peer(peerAddr.sin_addr.s_addr);
      if(peer!=Address::me()) {
        Leds::flash(2, 100, true);
        // NOTE(review): index is used unchecked on _peers; presumably senders
        // are always inside our subnet - confirm against Address::index().
        int index=peer.index();
        PeerInfo * p=_peers[index];
        if(p) {
          p->beaconReceived();
        } else if(peer<Address::me()) {
          // All indexes above mine will initiate the link themselves, they will discover my own beacon.
          // This is simpler to avoid double connections (from both side).
          Log::write(1, "discovered %s\n",peer.toString().data());
          p=new PeerInfo(peer);
          _peers[index]=p;
          _activePeers.push_back(p);
          p->connect(peer, _tcpPort);
        }
      }
    }
  }
}
| virtual short PeerTracker::eventTypes | ( | ) | [inline, virtual] |
Implements GpCoreTools::Stream.
{
  // The tracker only asks to be notified when data can be read (see event()).
  return POLLIN;
}
| static PeerTracker* PeerTracker::instance | ( | ) | [inline, static] |
Referenced by LinkBuffer::bytesAvailable(), LinkTimer::exec(), DeadPeersTimer::exec(), LinkStream::hungUp(), and LinkStream::LinkStream().
{
  // _self is set by the constructor to the tracker being built.
  return _self;
}
| bool PeerTracker::listen | ( | uint16_t | udpPort | ) |
References GpCoreTools::Stream::fileDescriptor().
Referenced by main().
{
  // Binds the tracker socket to udpPort on all local interfaces.
  // Returns false (after logging errno) when bind() fails, true otherwise.
  //
  // The structure is value-initialized so that the fields not set below
  // (sin_zero, padding) are zero instead of indeterminate when handed to bind().
  sockaddr_in addr=sockaddr_in();
  addr.sin_family=AF_INET;
  addr.sin_port=htons(udpPort);
  addr.sin_addr.s_addr=htonl(INADDR_ANY);
  if(bind(fileDescriptor(), (sockaddr *)&addr, sizeof(sockaddr_in))<0) {
    Log::write(0, "peer tracker bind: [%i] %s\n", errno, strerror(errno));
    return false;
  }
  return true;
}
| const Address& PeerTracker::masterAddress | ( | ) | const [inline] |
Referenced by setMasterAddress().
{
  // Address set by setAsMaster() or setMasterAddress().
  return _masterAddress;
}
| MasterTime PeerTracker::masterTime | ( | const Address & | peer | ) | const |
Returns the current master time. If peer is one of the master gateways, the corresponding time (to master and/or from master) is set to an invalid value (1e99) to avoid cyclic time assignments.
References MasterTime::setTimeFromMaster(), and MasterTime::setTimeToMaster().
Referenced by LinkBuffer::bytesAvailable().
{
  // Value used to mark a time as invalid for the requesting peer.
  const double invalidTime=1e99;

  // Work on a copy: the tracker's own master time is left untouched.
  MasterTime advertised(_masterTime);
  if(peer==_toMasterGateway) {
    // The peer is our gateway towards the master: hide our 'to master' time from it.
    advertised.setTimeToMaster(invalidTime);
  }
  if(peer==_fromMasterGateway) {
    // The peer is our gateway from the master: hide our 'from master' time from it.
    advertised.setTimeFromMaster(invalidTime);
  }
  return advertised;
}
| void PeerTracker::removeFromMasterRoute | ( | const Address & | destination | ) |
References RouteTcpHeader::ForwardRemoveRouteFromMaster, GpCoreTools::Address::index(), GpCoreTools::Address::isValid(), KernelRoute::removeDestination(), PeerInfo::stream(), GpCoreTools::Address::toString(), GpCoreTools::TcpClientStream::writeNoGateway(), and GpCoreTools::TcpClientStream::writePartialNoGateway().
Referenced by LinkBuffer::bytesAvailable(), and stop().
{
  Log::write(2, "remove 'From Master' route with destination %s\n",
             destination.toString().data());
  _routes.removeDestination(destination);

  // Unless this node is the master, forward the removal to the peer
  // currently used as 'from master' gateway.
  if(Address::me()!=_masterAddress) {
    PeerInfo * upstream=0;
    if(_fromMasterGateway.isValid()) {
      upstream=_peers[_fromMasterGateway.index()];
    }
    if(!upstream) {
      // Either no valid gateway is set or the gateway peer is not tracked.
      Log::write(0, "broken route 'from master' to [%s] (remove up)\n", destination.toString().data());
      return;
    }
    const LinkStream * out=upstream->stream();
    RouteTcpHeader header(RouteTcpHeader::ForwardRemoveRouteFromMaster);
    // Header first (partial write), then the destination address as payload.
    out->writePartialNoGateway(reinterpret_cast<const char *>(&header), sizeof(RouteTcpHeader));
    out->writeNoGateway(reinterpret_cast<const char *>(&destination), sizeof(Address));
  }
}
| void PeerTracker::resetFromMasterGateway | ( | ) |
References PeerInfo::address(), PeerInfo::masterTime(), and MasterTime::timeFromMaster().
Referenced by setLink(), setMasterAddress(), and stop().
{
  Log::write(1, "reset gateway from master\n");

  // Scan all active peers for the smallest 'from master' time. When no peer
  // beats the initial 1e99, the gateway stays a default-constructed Address.
  Address bestGateway;
  double bestTime=1e99;
  for(std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
      peerIt!=_activePeers.end(); ++peerIt) {
    PeerInfo * candidate=*peerIt;
    const double candidateTime=candidate->masterTime().timeFromMaster();
    if(candidateTime<bestTime) {
      bestTime=candidateTime;
      bestGateway=candidate->address();
    }
  }
  setFromMasterGateway(bestTime, bestGateway);
}
| void PeerTracker::resetToMasterGateway | ( | ) |
References PeerInfo::address(), PeerInfo::masterTime(), and MasterTime::timeToMaster().
Referenced by setLink(), setMasterAddress(), and stop().
{
  Log::write(1, "reset gateway to master\n");

  // Scan all active peers for the smallest 'to master' time. When no peer
  // beats the initial 1e99, the gateway stays a default-constructed Address.
  Address bestGateway;
  double bestTime=1e99;
  for(std::list<PeerInfo *>::iterator peerIt=_activePeers.begin();
      peerIt!=_activePeers.end(); ++peerIt) {
    PeerInfo * candidate=*peerIt;
    const double candidateTime=candidate->masterTime().timeToMaster();
    if(candidateTime<bestTime) {
      bestTime=candidateTime;
      bestGateway=candidate->address();
    }
  }
  setToMasterGateway(bestTime, bestGateway);
}
| void PeerTracker::setAsMaster | ( | ) |
References GpCoreTools::Address::isValid(), MasterTime::setTimeFromMaster(), and MasterTime::setTimeToMaster().
Referenced by main().
{
  // Must be called before any master address has been set.
  assert(!_masterAddress.isValid());

  // The master is at zero distance from itself in both directions.
  _masterTime.setTimeFromMaster(0.0);
  _masterTime.setTimeToMaster(0.0);

  _masterAddress=Address::me();
  Log::write(0, "I'm the Master\n");
}
| void PeerTracker::setLink | ( | const Address & | peer, |
| const LinkInfo & | link, | ||
| const MasterTime & | peerMasterTime | ||
| ) |
References PeerInfo::calculate(), GpCoreTools::Address::index(), PeerInfo::masterTime(), resetFromMasterGateway(), resetToMasterGateway(), PeerInfo::setLink(), PeerInfo::setPeerMasterTime(), MasterTime::setTimeFromMaster(), MasterTime::setTimeToMaster(), MasterTime::timeFromMaster(), and MasterTime::timeToMaster().
Referenced by LinkBuffer::bytesAvailable().
{
  // Store the new link measurements for the peer and recompute its master times.
  PeerInfo * p=_peers[peer.index()];
  assert(p);
  p->setLink(link);
  p->setPeerMasterTime(peerMasterTime);
  p->calculate();

  // --- Direction 'to master' ---
  const double peerTimeTo=p->masterTime().timeToMaster();
  if(_toMasterGateway==peer) {
    Log::write(5, "'to master': current=%.3lg new=%.3lg\n", _masterTime.timeToMaster(), peerTimeTo);
    if(peerTimeTo>_masterTime.timeToMaster()) {
      // Our best link gave bad results, maybe another link is suitable for gateway
      resetToMasterGateway();
    } else {
      // Just a better flow but no change on routing
      _masterTime.setTimeToMaster(peerTimeTo);
    }
  } else if(peerTimeTo<_masterTime.timeToMaster()) {
    // It was not the best link but it will be...
    setToMasterGateway(peerTimeTo, peer);
  }

  // --- Direction 'from master' ---
  const double peerTimeFrom=p->masterTime().timeFromMaster();
  if(_fromMasterGateway==peer) {
    Log::write(5, "'from master': current=%.3lg new=%.3lg\n", _masterTime.timeFromMaster(), peerTimeFrom);
    if(peerTimeFrom>_masterTime.timeFromMaster()) {
      // Our best link gave bad results, maybe another link is suitable for gateway
      resetFromMasterGateway();
    } else {
      // Just a better flow but no change on routing
      _masterTime.setTimeFromMaster(peerTimeFrom);
    }
  } else if(peerTimeFrom<_masterTime.timeFromMaster()) {
    // It was not the best link but it will be...
    setFromMasterGateway(peerTimeFrom, peer);
  }
}
| void PeerTracker::setMasterAddress | ( | const Address & | masterAddress | ) |
References GpCoreTools::Address::isValid(), masterAddress(), resetFromMasterGateway(), resetToMasterGateway(), and GpCoreTools::Address::toString().
Referenced by LinkBuffer::bytesAvailable().
{
if(masterAddress.isValid() && masterAddress!=_masterAddress) {
_masterAddress=masterAddress;
resetToMasterGateway();
resetFromMasterGateway();
Log::write(0, "master address changed to %s\n", _masterAddress.toString().data());
updatePeers();
}
}
| void PeerTracker::start | ( | const Address & | peer, |
| LinkStream * | link | ||
| ) |
References GpCoreTools::Address::index(), and PeerInfo::setStream().
Referenced by LinkStream::LinkStream().
| void PeerTracker::stop | ( | const Address & | peer | ) |
References PeerInfo::address(), KernelRoute::destination(), GpCoreTools::Address::index(), removeFromMasterRoute(), resetFromMasterGateway(), resetToMasterGateway(), and GpCoreTools::Address::toString().
Referenced by checkDeadPeers(), LinkTimer::exec(), and LinkStream::hungUp().
{
  // Stops tracking a peer: removes the routes going through it, drops it from
  // the active list and the index table, deletes it and, when it was one of
  // the master gateways, selects a new gateway. Does nothing when the peer is
  // not tracked.
  //
  // Work on a copy of the address: the callers are not visible here and peer
  // may refer to storage owned by the PeerInfo that is deleted below, while
  // the address is still needed afterwards.
  const Address target(peer);
  const int index=target.index();
  Log::write(7, "stop peer with index: %i\n", index);
  PeerInfo * p=_peers[index];
  if(p) {
    // Locate the entry of the active list carrying this address.
    std::list<PeerInfo *>::iterator it;
    for(it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
      if((*it)->address()==target) {
        break;
      }
    }
    assert(it!=_activePeers.end());
    // Remove all routes related to the closing peer
    Address destination;
    while((destination=_routes.destination(target)).isValid()) {
      removeFromMasterRoute(destination);
    }
    // Guarded so that a build without assertions never erases end().
    if(it!=_activePeers.end()) {
      _activePeers.erase(it);
    }
    // size() returns an unsigned size type: convert it to match the %i specifier.
    Log::write(7, "number of active peers: %i\n", static_cast<int>(_activePeers.size()));
    for(it=_activePeers.begin(); it!=_activePeers.end(); ++it) {
      Log::write(7, " [%s]\n",(*it)->address().toString().data());
    }
    _peers[index]=0;
    delete p;
    Log::write(7, "peer %s removed\n", target.toString().data());
    // Peer was my connection to master... check for another route
    if(target==_toMasterGateway) {
      resetToMasterGateway();
    }
    if(target==_fromMasterGateway) {
      resetFromMasterGateway();
    }
    Log::write(7, "route to/from master updated\n");
  }
}