Ignition Transport

API Reference

4.0.0
Discovery.hh
Go to the documentation of this file.
1/*
2 * Copyright (C) 2014 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18#ifndef IGNITION_TRANSPORT_DISCOVERY_HH_
19#define IGNITION_TRANSPORT_DISCOVERY_HH_
20
21#ifdef _WIN32
22 // For socket(), connect(), send(), and recv().
23 #include <Winsock2.h>
24 #include <Ws2def.h>
25 #include <Ws2ipdef.h>
26 #include <Ws2tcpip.h>
27 // Type used for raw data on this platform.
28 using raw_type = char;
29#else
30 // For data types
31 #include <sys/types.h>
32 // For socket(), connect(), send(), and recv()
33 #include <sys/socket.h>
34 // For gethostbyname()
35 #include <netdb.h>
36 // For inet_addr()
37 #include <arpa/inet.h>
38 // For close()
39 #include <unistd.h>
40 // For sockaddr_in
41 #include <netinet/in.h>
42 // Type used for raw data on this platform
43 using raw_type = void;
44#endif
45
46#ifdef _WIN32
47 #pragma warning(push, 0)
48#endif
49#ifdef _WIN32
50 #pragma warning(pop)
51 // Suppress "decorated name length exceeded" warning in STL.
52 #pragma warning(disable: 4503)
53 // Suppress "deprecated API" warnings in WINSOCK.
54 #pragma warning(disable: 4996)
55#endif
56
57#include <algorithm>
58#include <condition_variable>
59#include <map>
60#include <memory>
61#include <mutex>
62#include <string>
63#include <thread>
64#include <vector>
65
66#include "ignition/transport/Export.hh"
73
74namespace ignition
75{
76 namespace transport
77 {
83 bool IGNITION_TRANSPORT_VISIBLE pollSockets(
84 const std::vector<int> &_sockets,
85 const int _timeout);
86
95 template<typename Pub>
97 {
103 public: Discovery(const std::string &_pUuid,
104 const int _port,
105 const bool _verbose = false)
106 : port(_port),
107 hostAddr(determineHost()),
108 pUuid(_pUuid),
109 silenceInterval(kDefSilenceInterval),
110 activityInterval(kDefActivityInterval),
111 heartbeatInterval(kDefHeartbeatInterval),
112 connectionCb(nullptr),
113 disconnectionCb(nullptr),
114 verbose(_verbose),
115 initialized(false),
116 numHeartbeatsUninitialized(0),
117 exit(false),
118 enabled(false)
119 {
120 std::string ignIp;
121 if (env("IGN_IP", ignIp) && !ignIp.empty())
122 this->hostInterfaces = {ignIp};
123 else
124 {
125 // Get the list of network interfaces in this host.
126 this->hostInterfaces = determineInterfaces();
127 }
128
129#ifdef _WIN32
130 WORD wVersionRequested;
131 WSADATA wsaData;
132
133 // Request WinSock v2.2.
134 wVersionRequested = MAKEWORD(2, 2);
135 // Load WinSock DLL.
136 if (WSAStartup(wVersionRequested, &wsaData) != 0)
137 {
138 std::cerr << "Unable to load WinSock DLL" << std::endl;
139 return;
140 }
141#endif
142 for (const auto &netIface : this->hostInterfaces)
143 {
144 auto succeed = this->RegisterNetIface(netIface);
145
146 // If the IP address that we're selecting as the main IP address of
147 // the host is invalid, we change it to 127.0.0.1 .
148 // This is probably because IGN_IP is set to a wrong value.
149 if (netIface == this->hostAddr && !succeed)
150 {
151 this->RegisterNetIface("127.0.0.1");
152 std::cerr << "Did you set the environment variable IGN_IP with a "
153 << "correct IP address? " << std::endl
154 << " [" << netIface << "] seems an invalid local IP "
155 << "address." << std::endl
156 << " Using 127.0.0.1 as hostname." << std::endl;
157 this->hostAddr = "127.0.0.1";
158 }
159 }
160
161 // Socket option: SO_REUSEADDR. This options is used only for receiving
162 // data. We can reuse the same socket for receiving multicast data from
163 // multiple interfaces. We will use the socket at position 0 for
164 // receiving data.
165 int reuseAddr = 1;
166 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEADDR,
167 reinterpret_cast<const char *>(&reuseAddr), sizeof(reuseAddr)) != 0)
168 {
169 std::cerr << "Error setting socket option (SO_REUSEADDR)."
170 << std::endl;
171 return;
172 }
173
174#ifdef SO_REUSEPORT
175 // Socket option: SO_REUSEPORT. This options is used only for receiving
176 // data. We can reuse the same socket for receiving multicast data from
177 // multiple interfaces. We will use the socket at position 0 for
178 // receiving data.
179 int reusePort = 1;
180 if (setsockopt(this->sockets.at(0), SOL_SOCKET, SO_REUSEPORT,
181 reinterpret_cast<const char *>(&reusePort), sizeof(reusePort)) != 0)
182 {
183 std::cerr << "Error setting socket option (SO_REUSEPORT)."
184 << std::endl;
185 return;
186 }
187#endif
188 // Bind the first socket to the discovery port.
189 sockaddr_in localAddr;
190 memset(&localAddr, 0, sizeof(localAddr));
191 localAddr.sin_family = AF_INET;
192 localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
193 localAddr.sin_port = htons(static_cast<u_short>(this->port));
194
195 if (bind(this->sockets.at(0),
196 reinterpret_cast<sockaddr *>(&localAddr), sizeof(sockaddr_in)) < 0)
197 {
198 std::cerr << "Binding to a local port failed." << std::endl;
199 return;
200 }
201
202 // Set 'mcastAddr' to the multicast discovery group.
203 memset(&this->mcastAddr, 0, sizeof(this->mcastAddr));
204 this->mcastAddr.sin_family = AF_INET;
205 this->mcastAddr.sin_addr.s_addr =
206 inet_addr(this->kMulticastGroup.c_str());
207 this->mcastAddr.sin_port = htons(static_cast<u_short>(this->port));
208
209 if (this->verbose)
210 this->PrintCurrentState();
211 }
212
214 public: virtual ~Discovery()
215 {
216 // Tell the service thread to terminate.
217 this->exitMutex.lock();
218 this->exit = true;
219 this->exitMutex.unlock();
220
221 // Wait for the service threads to finish before exit.
222 if (this->threadReception.joinable())
223 this->threadReception.join();
224
225 // Broadcast a BYE message to trigger the remote cancellation of
226 // all our advertised topics.
227 this->SendMsg(ByeType,
228 Publisher("", "", this->pUuid, "", AdvertiseOptions()));
229
230 // Close sockets.
231 for (const auto &sock : this->sockets)
232 {
233#ifdef _WIN32
234 closesocket(sock);
235 WSACleanup();
236#else
237 close(sock);
238#endif
239 }
240 }
241
245 public: void Start()
246 {
247 {
248 std::lock_guard<std::mutex> lock(this->mutex);
249
250 // The service is already running.
251 if (this->enabled)
252 return;
253
254 this->enabled = true;
255 }
256
258 this->timeNextHeartbeat = now;
259 this->timeNextActivity = now;
260
261 // Start the thread that receives discovery information.
262 this->threadReception = std::thread(&Discovery::RecvMessages, this);
263 }
264
269 public: bool Advertise(const Pub &_publisher)
270 {
271 {
272 std::lock_guard<std::mutex> lock(this->mutex);
273
274 if (!this->enabled)
275 return false;
276
277 // Add the addressing information (local publisher).
278 if (!this->info.AddPublisher(_publisher))
279 return false;
280 }
281
282 // Only advertise a message outside this process if the scope
283 // is not 'Process'
284 if (_publisher.Options().Scope() != Scope_t::PROCESS)
285 this->SendMsg(AdvType, _publisher);
286
287 return true;
288 }
289
300 public: bool Discover(const std::string &_topic) const
301 {
303 bool found;
305
306 {
307 std::lock_guard<std::mutex> lock(this->mutex);
308
309 if (!this->enabled)
310 return false;
311
312 cb = this->connectionCb;
313 }
314
315 Pub pub;
316 pub.SetTopic(_topic);
317 pub.SetPUuid(this->pUuid);
318
319 // Send a discovery request.
320 this->SendMsg(SubType, pub);
321
322 {
323 std::lock_guard<std::mutex> lock(this->mutex);
324 found = this->info.Publishers(_topic, addresses);
325 }
326
327 if (found)
328 {
329 // I already have information about this topic.
330 for (const auto &proc : addresses)
331 {
332 for (const auto &node : proc.second)
333 {
334 if (cb)
335 {
336 // Execute the user's callback for a service request. Notice
337 // that we only execute one callback for preventing receive
338 // multiple service responses for a single request.
339 cb(node);
340 }
341 }
342 }
343 }
344
345 return true;
346 }
347
350 public: const TopicStorage<Pub> &Info() const
351 {
352 std::lock_guard<std::mutex> lock(this->mutex);
353 return this->info;
354 }
355
360 public: bool Publishers(const std::string &_topic,
361 Addresses_M<Pub> &_publishers) const
362 {
363 std::lock_guard<std::mutex> lock(this->mutex);
364 return this->info.Publishers(_topic, _publishers);
365 }
366
374 public: bool Unadvertise(const std::string &_topic,
375 const std::string &_nUuid)
376 {
377 Pub inf;
378 {
379 std::lock_guard<std::mutex> lock(this->mutex);
380
381 if (!this->enabled)
382 return false;
383
384 // Don't do anything if the topic is not advertised by any of my nodes
385 if (!this->info.Publisher(_topic, this->pUuid, _nUuid, inf))
386 return true;
387
388 // Remove the topic information.
389 this->info.DelPublisherByNode(_topic, this->pUuid, _nUuid);
390 }
391
392 // Only unadvertise a message outside this process if the scope
393 // is not 'Process'.
394 if (inf.Options().Scope() != Scope_t::PROCESS)
395 this->SendMsg(UnadvType, inf);
396
397 return true;
398 }
399
402 public: std::string HostAddr() const
403 {
404 std::lock_guard<std::mutex> lock(this->mutex);
405 return this->hostAddr;
406 }
407
412 public: unsigned int ActivityInterval() const
413 {
414 std::lock_guard<std::mutex> lock(this->mutex);
415 return this->activityInterval;
416 }
417
423 public: unsigned int HeartbeatInterval() const
424 {
425 std::lock_guard<std::mutex> lock(this->mutex);
426 return this->heartbeatInterval;
427 }
428
433 public: unsigned int SilenceInterval() const
434 {
435 std::lock_guard<std::mutex> lock(this->mutex);
436 return this->silenceInterval;
437 }
438
442 public: void SetActivityInterval(const unsigned int _ms)
443 {
444 std::lock_guard<std::mutex> lock(this->mutex);
445 this->activityInterval = _ms;
446 }
447
451 public: void SetHeartbeatInterval(const unsigned int _ms)
452 {
453 std::lock_guard<std::mutex> lock(this->mutex);
454 this->heartbeatInterval = _ms;
455 }
456
460 public: void SetSilenceInterval(const unsigned int _ms)
461 {
462 std::lock_guard<std::mutex> lock(this->mutex);
463 this->silenceInterval = _ms;
464 }
465
470 public: void ConnectionsCb(const DiscoveryCallback<Pub> &_cb)
471 {
472 std::lock_guard<std::mutex> lock(this->mutex);
473 this->connectionCb = _cb;
474 }
475
481 {
482 std::lock_guard<std::mutex> lock(this->mutex);
483 this->disconnectionCb = _cb;
484 }
485
487 public: void PrintCurrentState() const
488 {
489 std::lock_guard<std::mutex> lock(this->mutex);
490
491 std::cout << "---------------" << std::endl;
492 std::cout << std::boolalpha << "Enabled: "
493 << this->enabled << std::endl;
494 std::cout << "Discovery state" << std::endl;
495 std::cout << "\tUUID: " << this->pUuid << std::endl;
496 std::cout << "Settings" << std::endl;
497 std::cout << "\tActivity: " << this->activityInterval
498 << " ms." << std::endl;
499 std::cout << "\tHeartbeat: " << this->heartbeatInterval
500 << "ms." << std::endl;
501 std::cout << "\tSilence: " << this->silenceInterval
502 << " ms." << std::endl;
503 std::cout << "Known information:" << std::endl;
504 this->info.Print();
505
506 // Used to calculate the elapsed time.
508
509 std::cout << "Activity" << std::endl;
510 if (this->activity.empty())
511 std::cout << "\t<empty>" << std::endl;
512 else
513 {
514 for (auto &proc : this->activity)
515 {
516 // Elapsed time since the last update from this publisher.
517 std::chrono::duration<double> elapsed = now - proc.second;
518
519 std::cout << "\t" << proc.first << std::endl;
520 std::cout << "\t\t" << "Since: " << std::chrono::duration_cast<
521 std::chrono::milliseconds>(elapsed).count() << " ms. ago. "
522 << std::endl;
523 }
524 }
525 std::cout << "---------------" << std::endl;
526 }
527
530 public: void TopicList(std::vector<std::string> &_topics) const
531 {
532 this->WaitForInit();
533 std::lock_guard<std::mutex> lock(this->mutex);
534 this->info.TopicList(_topics);
535 }
536
539 public: void WaitForInit() const
540 {
541 std::unique_lock<std::mutex> lk(this->mutex);
542
543 if (!this->initialized)
544 {
545 this->initializedCv.wait(lk, [this]{return this->initialized;});
546 }
547 }
548
552 private: void UpdateActivity()
553 {
555
556 std::lock_guard<std::mutex> lock(this->mutex);
557
558 if (now < this->timeNextActivity)
559 return;
560
561 for (auto it = this->activity.cbegin(); it != this->activity.cend();)
562 {
563 // Elapsed time since the last update from this publisher.
564 auto elapsed = now - it->second;
565
566 // This publisher has expired.
567 if (std::chrono::duration_cast<std::chrono::milliseconds>
568 (elapsed).count() > this->silenceInterval)
569 {
570 // Remove all the info entries for this process UUID.
571 this->info.DelPublishersByProc(it->first);
572
573 // Notify without topic information. This is useful to inform the
574 // client that a remote node is gone, even if we were not
575 // interested in its topics.
576 Pub publisher;
577 publisher.SetPUuid(it->first);
578 this->disconnectionCb(publisher);
579
580 // Remove the activity entry.
581 this->activity.erase(it++);
582 }
583 else
584 ++it;
585 }
586
587 this->timeNextActivity = std::chrono::steady_clock::now() +
588 std::chrono::milliseconds(this->activityInterval);
589 }
590
592 private: void UpdateHeartbeat()
593 {
595
596 {
598
599 if (now < this->timeNextHeartbeat)
600 return;
601 }
602
603 Publisher pub("", "", this->pUuid, "", AdvertiseOptions());
604 this->SendMsg(HeartbeatType, pub);
605
607 {
609
610 // Re-advertise topics that are advertised inside this process.
611 this->info.PublishersByProc(this->pUuid, nodes);
612 }
613
614 for (const auto &topic : nodes)
615 {
616 for (const auto &node : topic.second)
617 this->SendMsg(AdvType, node);
618 }
619
620 {
622 if (!this->initialized)
623 {
624 ++this->numHeartbeatsUninitialized;
625 if (this->numHeartbeatsUninitialized == 2)
626 {
627 // We consider the discovery initialized after two cycles of
628 // heartbeats sent.
629 this->initialized = true;
630
631 // Notify anyone waiting for the initialization phase to finish.
632 this->initializedCv.notify_all();
633 }
634 }
635
636 this->timeNextHeartbeat = std::chrono::steady_clock::now() +
637 std::chrono::milliseconds(this->heartbeatInterval);
638 }
639 }
640
650 private: int NextTimeout() const
651 {
653 auto timeUntilNextHeartbeat = this->timeNextHeartbeat - now;
654 auto timeUntilNextActivity = this->timeNextActivity - now;
655
656 int t = static_cast<int>(
657 std::chrono::duration_cast<std::chrono::milliseconds>
658 (std::min(timeUntilNextHeartbeat, timeUntilNextActivity)).count());
659 int t2 = std::min(t, this->kTimeout);
660 return std::max(t2, 0);
661 }
662
664 private: void RecvMessages()
665 {
666 bool timeToExit = false;
667 while (!timeToExit)
668 {
669 // Calculate the timeout.
670 int timeout = this->NextTimeout();
671
672 if (pollSockets(this->sockets, timeout))
673 {
674 this->RecvDiscoveryUpdate();
675
676 if (this->verbose)
677 this->PrintCurrentState();
678 }
679
680 this->UpdateHeartbeat();
681 this->UpdateActivity();
682
683 // Is it time to exit?
684 {
685 std::lock_guard<std::mutex> lock(this->exitMutex);
686 if (this->exit)
687 timeToExit = true;
688 }
689 }
690 }
691
693 private: void RecvDiscoveryUpdate()
694 {
695 char rcvStr[Discovery::kMaxRcvStr];
696 std::string srcAddr;
697 uint16_t srcPort;
698 sockaddr_in clntAddr;
699 socklen_t addrLen = sizeof(clntAddr);
700
701 if ((recvfrom(this->sockets.at(0),
702 reinterpret_cast<raw_type *>(rcvStr),
703 this->kMaxRcvStr, 0,
704 reinterpret_cast<sockaddr *>(&clntAddr),
705 reinterpret_cast<socklen_t *>(&addrLen))) < 0)
706 {
707 std::cerr << "Discovery::RecvDiscoveryUpdate() recvfrom error"
708 << std::endl;
709 return;
710 }
711 srcAddr = inet_ntoa(clntAddr.sin_addr);
712 srcPort = ntohs(clntAddr.sin_port);
713
714 if (this->verbose)
715 {
716 std::cout << "\nReceived discovery update from " << srcAddr << ": "
717 << srcPort << std::endl;
718 }
719
720 this->DispatchDiscoveryMsg(srcAddr, rcvStr);
721 }
722
723
727 private: void DispatchDiscoveryMsg(const std::string &_fromIp,
728 char *_msg)
729 {
730 Header header;
731 char *pBody = _msg;
732
733 // Create the header from the raw bytes.
734 header.Unpack(_msg);
735 pBody += header.HeaderLength();
736
737 // Discard the message if the wire protocol is different than mine.
738 if (this->kWireVersion != header.Version())
739 return;
740
741 auto recvPUuid = header.PUuid();
742
743 // Discard our own discovery messages.
744 if (recvPUuid == this->pUuid)
745 return;
746
747 // Update timestamp and cache the callbacks.
748 DiscoveryCallback<Pub> connectCb;
749 DiscoveryCallback<Pub> disconnectCb;
750 {
752 this->activity[recvPUuid] = std::chrono::steady_clock::now();
753 connectCb = this->connectionCb;
754 disconnectCb = this->disconnectionCb;
755 }
756
757 switch (header.Type())
758 {
759 case AdvType:
760 {
761 // Read the rest of the fields.
762 transport::AdvertiseMessage<Pub> advMsg;
763 advMsg.Unpack(pBody);
764
765 // Check scope of the topic.
766 if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
767 (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
768 _fromIp != this->hostAddr))
769 {
770 return;
771 }
772
773 // Register an advertised address for the topic.
774 bool added;
775 {
777 added = this->info.AddPublisher(advMsg.Publisher());
778 }
779
780 if (added && connectCb)
781 {
782 // Execute the client's callback.
783 connectCb(advMsg.Publisher());
784 }
785
786 break;
787 }
788 case SubType:
789 {
790 // Read the rest of the fields.
791 SubscriptionMsg subMsg;
792 subMsg.Unpack(pBody);
793 auto recvTopic = subMsg.Topic();
794
795 // Check if at least one of my nodes advertises the topic requested.
796 Addresses_M<Pub> addresses;
797 {
799 if (!this->info.HasAnyPublishers(recvTopic, this->pUuid))
800 {
801 break;
802 }
803
804 if (!this->info.Publishers(recvTopic, addresses))
805 break;
806 }
807
808 for (const auto &nodeInfo : addresses[this->pUuid])
809 {
810 // Check scope of the topic.
811 if ((nodeInfo.Options().Scope() == Scope_t::PROCESS) ||
812 (nodeInfo.Options().Scope() == Scope_t::HOST &&
813 _fromIp != this->hostAddr))
814 {
815 continue;
816 }
817
818 // Answer an ADVERTISE message.
819 this->SendMsg(AdvType, nodeInfo);
820 }
821
822 break;
823 }
824 case HeartbeatType:
825 {
826 // The timestamp has already been updated.
827 break;
828 }
829 case ByeType:
830 {
831 // Remove the activity entry for this publisher.
832 {
834 this->activity.erase(recvPUuid);
835 }
836
837 if (disconnectCb)
838 {
839 Pub pub;
840 pub.SetPUuid(recvPUuid);
841 // Notify the new disconnection.
842 disconnectCb(pub);
843 }
844
845 // Remove the address entry for this topic.
846 {
848 this->info.DelPublishersByProc(recvPUuid);
849 }
850
851 break;
852 }
853 case UnadvType:
854 {
855 // Read the address.
856 transport::AdvertiseMessage<Pub> advMsg;
857 advMsg.Unpack(pBody);
858
859 // Check scope of the topic.
860 if ((advMsg.Publisher().Options().Scope() == Scope_t::PROCESS) ||
861 (advMsg.Publisher().Options().Scope() == Scope_t::HOST &&
862 _fromIp != this->hostAddr))
863 {
864 return;
865 }
866
867 if (disconnectCb)
868 {
869 // Notify the new disconnection.
870 disconnectCb(advMsg.Publisher());
871 }
872
873 // Remove the address entry for this topic.
874 {
876 this->info.DelPublisherByNode(advMsg.Publisher().Topic(),
877 advMsg.Publisher().PUuid(), advMsg.Publisher().NUuid());
878 }
879
880 break;
881 }
882 default:
883 {
884 std::cerr << "Unknown message type [" << header.Type() << "]\n";
885 break;
886 }
887 }
888 }
889
896 private: template<typename T>
897 void SendMsg(const uint8_t _type,
898 const T &_pub,
899 const uint16_t _flags = 0) const
900 {
901 // Create the header.
902 Header header(this->Version(), _pub.PUuid(), _type, _flags);
903 auto msgLength = 0;
904 std::vector<char> buffer;
905
906 std::string topic = _pub.Topic();
907
908 switch (_type)
909 {
910 case AdvType:
911 case UnadvType:
912 {
913 // Create the [UN]ADVERTISE message.
914 transport::AdvertiseMessage<T> advMsg(header, _pub);
915
916 // Allocate a buffer and serialize the message.
917 buffer.resize(advMsg.MsgLength());
918 advMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
919 msgLength = static_cast<int>(advMsg.MsgLength());
920 break;
921 }
922 case SubType:
923 {
924 // Create the [UN]SUBSCRIBE message.
925 SubscriptionMsg subMsg(header, topic);
926
927 // Allocate a buffer and serialize the message.
928 buffer.resize(subMsg.MsgLength());
929 subMsg.Pack(reinterpret_cast<char*>(&buffer[0]));
930 msgLength = static_cast<int>(subMsg.MsgLength());
931 break;
932 }
933 case HeartbeatType:
934 case ByeType:
935 {
936 // Allocate a buffer and serialize the message.
937 buffer.resize(header.HeaderLength());
938 header.Pack(reinterpret_cast<char*>(&buffer[0]));
939 msgLength = header.HeaderLength();
940 break;
941 }
942 default:
943 std::cerr << "Discovery::SendMsg() error: Unrecognized message"
944 << " type [" << _type << "]" << std::endl;
945 return;
946 }
947
948 // Send the discovery message to the multicast group through all the
949 // sockets.
950 for (const auto &sock : this->Sockets())
951 {
952 if (sendto(sock, reinterpret_cast<const raw_type *>(
953 reinterpret_cast<unsigned char*>(&buffer[0])),
954 msgLength, 0,
955 reinterpret_cast<const sockaddr *>(this->MulticastAddr()),
956 sizeof(*(this->MulticastAddr()))) != msgLength)
957 {
958 std::cerr << "Exception sending a message" << std::endl;
959 return;
960 }
961 }
962
963 if (this->Verbose())
964 {
965 std::cout << "\t* Sending " << MsgTypesStr[_type]
966 << " msg [" << topic << "]" << std::endl;
967 }
968 }
969
972 private: const std::vector<int> &Sockets() const
973 {
974 return this->sockets;
975 }
976
979 private: const sockaddr_in *MulticastAddr() const
980 {
981 return &this->mcastAddr;
982 }
983
986 private: bool Verbose() const
987 {
988 return this->verbose;
989 }
990
993 private: uint8_t Version() const
994 {
995 return this->kWireVersion;
996 }
997
1002 private: bool RegisterNetIface(const std::string &_ip)
1003 {
1004 // Make a new socket for sending discovery information.
1005 int sock = static_cast<int>(socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP));
1006 if (sock < 0)
1007 {
1008 std::cerr << "Socket creation failed." << std::endl;
1009 return false;
1010 }
1011
1012 // Socket option: IP_MULTICAST_IF.
1013 // This socket option needs to be applied to each socket used to send
1014 // data. This option selects the source interface for outgoing messages.
1015 struct in_addr ifAddr;
1016 ifAddr.s_addr = inet_addr(_ip.c_str());
1017 if (setsockopt(sock, IPPROTO_IP, IP_MULTICAST_IF,
1018 reinterpret_cast<const char*>(&ifAddr), sizeof(ifAddr)) != 0)
1019 {
1020 std::cerr << "Error setting socket option (IP_MULTICAST_IF)."
1021 << std::endl;
1022 return false;
1023 }
1024
1025 this->sockets.push_back(sock);
1026
1027 // Join the multicast group. We have to do it for each network interface
1028 // but we can do it on the same socket. We will use the socket at
1029 // position 0 for receiving multicast information.
1030 struct ip_mreq group;
1031 group.imr_multiaddr.s_addr =
1032 inet_addr(this->kMulticastGroup.c_str());
1033 group.imr_interface.s_addr = inet_addr(_ip.c_str());
1034 if (setsockopt(this->sockets.at(0), IPPROTO_IP, IP_ADD_MEMBERSHIP,
1035 reinterpret_cast<const char*>(&group), sizeof(group)) != 0)
1036 {
1037 std::cerr << "Error setting socket option (IP_ADD_MEMBERSHIP)."
1038 << std::endl;
1039 return false;
1040 }
1041
1042 return true;
1043 }
1044
1048 private: static const unsigned int kDefActivityInterval = 100;
1049
1053 private: static const unsigned int kDefHeartbeatInterval = 1000;
1054
1058 private: static const unsigned int kDefSilenceInterval = 3000;
1059
1061 private: const std::string kMulticastGroup = "224.0.0.7";
1062
1064 private: const int kTimeout = 250;
1065
1067 private: static const int kMaxRcvStr = 65536;
1068
1071 private: static const uint8_t kWireVersion = 8;
1072
1074 private: int port;
1075
1077 private: std::string hostAddr;
1078
1080 private: std::vector<std::string> hostInterfaces;
1081
1083 private: std::string pUuid;
1084
1088 private: unsigned int silenceInterval;
1089
1093 private: unsigned int activityInterval;
1094
1098 private: unsigned int heartbeatInterval;
1099
1101 private: DiscoveryCallback<Pub> connectionCb;
1102
1104 private: DiscoveryCallback<Pub> disconnectionCb;
1105
1107 private: TopicStorage<Pub> info;
1108
1114
1116 private: bool verbose;
1117
1119 private: std::vector<int> sockets;
1120
1122 private: sockaddr_in mcastAddr;
1123
1125 private: mutable std::mutex mutex;
1126
1128 private: std::thread threadReception;
1129
1131 private: Timestamp timeNextHeartbeat;
1132
1134 private: Timestamp timeNextActivity;
1135
1137 private: std::mutex exitMutex;
1138
1143 private: bool initialized;
1144
1146 private: unsigned int numHeartbeatsUninitialized;
1147
1149 private: mutable std::condition_variable initializedCv;
1150
1152 private: bool exit;
1153
1155 private: bool enabled;
1156 };
1157
1161
1165 }
1166}
1167
1168#endif
void raw_type
Definition Discovery.hh:43
T at(T... args)
T cbegin(T... args)
T boolalpha(T... args)
T c_str(T... args)
A class that is used to store information about an advertised publisher. An instance of this class is...
Definition Node.hh:521
A class for customizing the publication options for a topic or service advertised....
Definition AdvertiseOptions.hh:55
A discovery class that implements a distributed topic discovery protocol. It uses UDP multicast for s...
Definition Discovery.hh:97
void WaitForInit() const
Check if ready/initialized. If not, then wait on the initializedCv condition variable.
Definition Discovery.hh:539
void Start()
Start the discovery service. You probably want to register the callbacks for receiving discovery noti...
Definition Discovery.hh:245
bool Advertise(const Pub &_publisher)
Advertise a new message.
Definition Discovery.hh:269
unsigned int ActivityInterval() const
The discovery checks the validity of the topic information every 'activity interval' milliseconds.
Definition Discovery.hh:412
unsigned int HeartbeatInterval() const
Each node broadcasts periodic heartbeats to keep its topic information alive in other nodes....
Definition Discovery.hh:423
std::string HostAddr() const
Get the IP address of this host.
Definition Discovery.hh:402
std::map< std::string, Timestamp > activity
Activity information. Every time there is a message from a remote node, its activity information is u...
Definition Discovery.hh:1113
bool Unadvertise(const std::string &_topic, const std::string &_nUuid)
Unadvertise a new message. Broadcast a discovery message that will cancel all the discovery informati...
Definition Discovery.hh:374
void SetHeartbeatInterval(const unsigned int _ms)
Set the heartbeat interval.
Definition Discovery.hh:451
unsigned int SilenceInterval() const
Get the maximum time allowed without receiving any discovery information from a node before canceling...
Definition Discovery.hh:433
void PrintCurrentState() const
Print the current discovery state.
Definition Discovery.hh:487
void SetSilenceInterval(const unsigned int _ms)
Set the maximum silence interval.
Definition Discovery.hh:460
const TopicStorage< Pub > & Info() const
Get the discovery information.
Definition Discovery.hh:350
bool Publishers(const std::string &_topic, Addresses_M< Pub > &_publishers) const
Get all the publishers' information known for a given topic.
Definition Discovery.hh:360
void ConnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery connection events. Each time a new topic is connected,...
Definition Discovery.hh:470
bool Discover(const std::string &_topic) const
Request discovery information about a topic. When using this method, the user might want to use SetCo...
Definition Discovery.hh:300
void SetActivityInterval(const unsigned int _ms)
Set the activity interval.
Definition Discovery.hh:442
void DisconnectionsCb(const DiscoveryCallback< Pub > &_cb)
Register a callback to receive discovery disconnection events. Each time a topic is no longer active,...
Definition Discovery.hh:480
virtual ~Discovery()
Destructor.
Definition Discovery.hh:214
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently advertised in the network.
Definition Discovery.hh:530
Discovery(const std::string &_pUuid, const int _port, const bool _verbose=false)
Constructor.
Definition Discovery.hh:103
This class stores all the information about a publisher. It stores the topic name that publishes,...
Definition Publisher.hh:38
Store address information about topics and provide convenient methods for adding new topics,...
Definition TopicStorage.hh:38
bool HasAnyPublishers(const std::string &_topic, const std::string &_pUuid) const
Return if there is any publisher stored for the given topic and process UUID.
Definition TopicStorage.hh:130
bool DelPublisherByNode(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid)
Remove a publisher associated to a given topic and UUID pair.
Definition TopicStorage.hh:218
bool Publisher(const std::string &_topic, const std::string &_pUuid, const std::string &_nUuid, T &_publisher) const
Get the address information for a given topic and node UUID.
Definition TopicStorage.hh:165
bool DelPublishersByProc(const std::string &_pUuid)
Remove all the publishers associated to a given process.
Definition TopicStorage.hh:258
bool AddPublisher(const T &_publisher)
Add a new address associated to a given topic and node UUID.
Definition TopicStorage.hh:49
bool Publishers(const std::string &_topic, std::map< std::string, std::vector< T > > &_info) const
Get the map of publishers stored for a given topic.
Definition TopicStorage.hh:203
void PublishersByProc(const std::string &_pUuid, std::map< std::string, std::vector< T > > &_pubs) const
Given a process UUID, the function returns the list of publishers contained in this process UUID with...
Definition TopicStorage.hh:282
void Print() const
Print all the information for debugging purposes.
Definition TopicStorage.hh:342
void TopicList(std::vector< std::string > &_topics) const
Get the list of topics currently stored.
Definition TopicStorage.hh:335
T duration_cast(T... args)
T empty(T... args)
T cend(T... args)
T endl(T... args)
T erase(T... args)
T join(T... args)
T joinable(T... args)
T lock(T... args)
T max(T... args)
T min(T... args)
SrvAddresses_M addresses
Definition Node.hh:986
static const uint8_t ByeType
Definition Packet.hh:39
std::chrono::steady_clock::time_point Timestamp
Definition TransportTypes.hh:151
std::string determineHost()
Determine IP or hostname. Reference: https://github.com/ros/ros_comm/blob/hydro-devel/clients/ roscpp...
bool pollSockets(const std::vector< int > &_sockets, const int _timeout)
std::unique_lock< std::recursive_mutex > lk(this->Shared() ->mutex)
static const uint8_t UnadvType
Definition Packet.hh:37
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(* _cb)(const RequestT &_req)
Definition Node.hh:527
static const uint8_t SubType
Definition Packet.hh:36
@ HOST
Topic/service only available to subscribers in the same machine as the publisher.
@ PROCESS
Topic/service only available to subscribers in the same process as the publisher.
bool env(const std::string &_name, std::string &_value)
Find the environment variable '_name' and return its value.
static const std::vector< std::string > MsgTypesStr
Used for debugging the message type received/send.
Definition Packet.hh:44
static const uint8_t HeartbeatType
Definition Packet.hh:38
std::vector< std::string > determineInterfaces()
Determine the list of network interfaces for this machine. Reference: https://github....
static const uint8_t AdvType
Definition Packet.hh:35
*brief Advertise a new service without any output parameter *In this version the callback is a free function *param[in] _topic Topic name associated to the service *param[in] _cb Callback to handle the service request with the *following void(*) const AdvertiseServiceOptions ReplyT const std::string _topic)
Definition Node.hh:558
cb(_internalRep, _internalResult)
Definition AdvertiseOptions.hh:28
T push_back(T... args)
T resize(T... args)
T unlock(T... args)