summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2015-12-14 15:41:30 +0900
committerLorenzo Colitti <lorenzo@google.com>2015-12-14 16:45:25 +0900
commit34e3b9fe8c25979f1a4aff76a5261568c2341019 (patch)
treecba0e3f41b9b8bd2e403fff4f0a6fc1920e1c705
parent68e30d38f69f0a94df886a57b334e836cc48050e (diff)
downloadextras-34e3b9fe8c25979f1a4aff76a5261568c2341019.tar.gz
Add utility functions to neighbour_test.
1. Clear the ND cache entries for the IPv6 routers before each test, so we can add tests and always start from a clean slate. 2. Support expecting multicast NS. 3. Support generating NAs with different source and destination addresses. Change-Id: I9f8bb37540efa2d237eb236059e859f42aacdadc
-rwxr-xr-xtests/net_test/neighbour_test.py50
1 files changed, 39 insertions, 11 deletions
diff --git a/tests/net_test/neighbour_test.py b/tests/net_test/neighbour_test.py
index ca39d4a3..c80a0d45 100755
--- a/tests/net_test/neighbour_test.py
+++ b/tests/net_test/neighbour_test.py
@@ -66,15 +66,22 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
def setUp(self):
super(NeighbourTest, self).setUp()
- self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
- self.sock.bind((0, RTMGRP_NEIGH))
- net_test.SetNonBlocking(self.sock)
-
for netid in self.tuns:
+ # Clear the ND cache entries for all routers, so each test starts with
+ # the IPv6 default router in state STALE.
+ addr = self._RouterAddress(netid, 6)
+ ifindex = self.ifindices[netid]
+ self.iproute.UpdateNeighbour(6, addr, None, ifindex, NUD_FAILED)
+
+ # Configure IPv6 by sending an RA.
self.SendRA(netid,
retranstimer=self.RETRANS_TIME_MS,
reachabletime=self.REACHABLE_TIME_MS)
+ self.sock = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE)
+ self.sock.bind((0, RTMGRP_NEIGH))
+ net_test.SetNonBlocking(self.sock)
+
self.netid = random.choice(self.tuns.keys())
self.ifindex = self.ifindices[self.netid]
@@ -105,25 +112,46 @@ class NeighbourTest(multinetwork_base.MultiNetworkBaseTest):
for name in attrs:
self.assertEquals(attrs[name], actual_attrs[name])
- def ExpectUnicastProbe(self, addr):
+ def ExpectProbe(self, is_unicast, addr):
version = 6 if ":" in addr else 4
if version == 6:
+ llsrc = self.MyMacAddress(self.netid)
+ if is_unicast:
+ src = self.MyLinkLocalAddress(self.netid)
+ dst = addr
+ else:
+ solicited = inet_pton(AF_INET6, addr)
+ last3bytes = tuple([ord(b) for b in solicited[-3:]])
+ dst = "ff02::1:ff%02x:%02x%02x" % last3bytes
+ src = self.MyAddress(6, self.netid)
expected = (
- scapy.IPv6(src=self.MyLinkLocalAddress(self.netid), dst=addr) /
+ scapy.IPv6(src=src, dst=dst) /
scapy.ICMPv6ND_NS(tgt=addr) /
- scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.MyMacAddress(self.netid))
+ scapy.ICMPv6NDOptSrcLLAddr(lladdr=llsrc)
)
- self.ExpectPacketOn(self.netid, "Unicast probe", expected)
+ msg = "%s probe" % ("Unicast" if is_unicast else "Multicast")
+ self.ExpectPacketOn(self.netid, msg, expected)
else:
raise NotImplementedError
- def ReceiveUnicastAdvertisement(self, addr, mac):
+ def ExpectUnicastProbe(self, addr):
+ self.ExpectProbe(True, addr)
+
+ def ExpectMulticastNS(self, addr):
+ self.ExpectProbe(False, addr)
+
+ def ReceiveUnicastAdvertisement(self, addr, mac, srcaddr=None, dstaddr=None,
+ S=1, O=0, R=1):
version = 6 if ":" in addr else 4
+ if srcaddr is None:
+ srcaddr = addr
+ if dstaddr is None:
+ dstaddr = self.MyLinkLocalAddress(self.netid)
if version == 6:
packet = (
scapy.Ether(src=mac, dst=self.MyMacAddress(self.netid)) /
- scapy.IPv6(src=addr, dst=self.MyLinkLocalAddress(self.netid)) /
- scapy.ICMPv6ND_NA(tgt=addr, S=1, O=0) /
+ scapy.IPv6(src=srcaddr, dst=dstaddr) /
+ scapy.ICMPv6ND_NA(tgt=addr, S=S, O=O, R=R) /
scapy.ICMPv6NDOptDstLLAddr(lladdr=mac)
)
self.ReceiveEtherPacketOn(self.netid, packet)