diff options
author | Lorenzo Colitti <lorenzo@google.com> | 2015-12-14 15:41:30 +0900 |
---|---|---|
committer | Lorenzo Colitti <lorenzo@google.com> | 2015-12-14 16:45:25 +0900 |
commit | 34e3b9fe8c25979f1a4aff76a5261568c2341019 (patch) | |
tree | cba0e3f41b9b8bd2e403fff4f0a6fc1920e1c705 | |
parent | 68e30d38f69f0a94df886a57b334e836cc48050e (diff) | |
download | extras-34e3b9fe8c25979f1a4aff76a5261568c2341019.tar.gz |
Add utility functions to neighbour_test.
1. Clear the ND cache entries for the IPv6 routers before each
test, so we can add tests and always start from a clean slate.
2. Support expecting multicast NS.
3. Support generating NAs with different source and destination
addresses.
Change-Id: I9f8bb37540efa2d237eb236059e859f42aacdadc
-rwxr-xr-x | tests/net_test/neighbour_test.py | 50 |
1 files changed, 39 insertions, 11 deletions
diff --git a/tests/net_test/neighbour_test.py b/tests/net_test/neighbour_test.py index ca39d4a3..c80a0d45 100755 --- a/tests/net_test/neighbour_test.py +++ b/tests/net_test/neighbour_test.py @@ -66,15 +66,22 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): def setUp(self): super(NeighbourTest, self).setUp() - self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE) - self.sock.bind((0, RTMGRP_NEIGH)) - net_test.SetNonBlocking(self.sock) - for netid in self.tuns: + # Clear the ND cache entries for all routers, so each test starts with + # the IPv6 default router in state STALE. + addr = self._RouterAddress(netid, 6) + ifindex = self.ifindices[netid] + self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED) + + # Configure IPv6 by sending an RA. self.SendRA(netid, retranstimer=self.RETRANS_TIME_MS, reachabletime=self.REACHABLE_TIME_MS) + self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE) + self.sock.bind((0, RTMGRP_NEIGH)) + net_test.SetNonBlocking(self.sock) + self.netid = random.choice(self.tuns.keys()) self.ifindex = self.ifindices[self.netid] @@ -105,25 +112,46 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest): for name in attrs: self.assertEquals(attrs[name], actual_attrs[name]) - def ExpectUnicastProbe(self, addr): + def ExpectProbe(self, is_unicast, addr): version = 6 if ":" in addr else 4 if version == 6: + llsrc = self.MyMacAddress(self.netid) + if is_unicast: + src = self.MyLinkLocalAddress(self.netid) + dst = addr + else: + solicited = inet_pton(AF_INET6, addr) + last3bytes = tuple([ord(b) for b in solicited[-3:]]) + dst = "ff02::1:ff%02x:%02x%02x" % last3bytes + src = self.MyAddress(6, self.netid) expected = ( - scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) / + scapy.IPv6(src=src, dst=dst) / scapy.ICMPv6ND_NS(tgt=addr) / - scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid)) + scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc) ) - self.ExpectPacketOn(self.netid, "Unicast probe", expected) + msg = "%s probe" % ("Unicast" if is_unicast else "Multicast") + self.ExpectPacketOn(self.netid, msg, expected) else: raise NotImplementedError - def ReceiveUnicastAdvertisement(self, addr, mac): + def ExpectUnicastProbe(self, addr): + self.ExpectProbe(True, addr) + + def ExpectMulticastNS(self, addr): + self.ExpectProbe(False, addr) + + def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None, + S=1, O=0, R=1): version = 6 if ":" in addr else 4 + if srcaddr is None: + srcaddr = addr + if dstaddr is None: + dstaddr = self.MyLinkLocalAddress(self.netid) if version == 6: packet = ( scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) / - scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) / - scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) / + scapy.IPv6(src=srcaddr, dst=dstaddr) / + scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) / scapy.ICMPv6NDOptDstLLAddr(lladdr=mac) ) self.ReceiveEtherPacketOn(self.netid, packet) |