diff options
author | Lorenzo Colitti <lorenzo@google.com> | 2015-10-16 13:37:33 +0900 |
---|---|---|
committer | Lorenzo Colitti <lorenzo@google.com> | 2015-10-27 15:19:29 +0900 |
commit | d2344168138814630764df3da6b23dc839b4890d (patch) | |
tree | 4762f2fda0184b2bdcae5e7f2d1f242c83a9c325 | |
parent | c6fb2e938cfa03c5cccaad6ff87de61a8548e853 (diff) | |
download | extras-d2344168138814630764df3da6b23dc839b4890d.tar.gz |
Support reading /proc/net/tcp[6].
Support the two different TCP socket formats, and move the
parsing code to net_test.py so it can be used for other things
apart from ping.
Change-Id: Iaa10cc323d11834b6ac9429d42708f6cdf89f404
-rwxr-xr-x | tests/net_test/net_test.py | 49 | ||||
-rwxr-xr-x | tests/net_test/ping6_test.py | 37 |
2 files changed, 49 insertions, 37 deletions
diff --git a/tests/net_test/net_test.py b/tests/net_test/net_test.py index 6628d354..09a6a555 100755 --- a/tests/net_test/net_test.py +++ b/tests/net_test/net_test.py @@ -17,6 +17,7 @@ import fcntl import os import random +import re from socket import * # pylint: disable=wildcard-import import struct import unittest @@ -317,6 +318,54 @@ class NetworkTest(unittest.TestCase): msg = os.strerror(err_num) self.assertRaisesRegexp(EnvironmentError, msg, f, *args) + def ReadProcNetSocket(self, protocol): + # Read file. + filename = "/proc/net/%s" % protocol + lines = open(filename).readlines() + + # Possibly check, and strip, header. + if protocol in ["icmp6", "raw6", "udp6"]: + self.assertEqual(IPV6_SEQ_DGRAM_HEADER, lines[0]) + lines = lines[1:] + + # Check contents. + if protocol.endswith("6"): + addrlen = 32 + else: + addrlen = 8 + + if protocol.startswith("tcp"): + # Real sockets have 5 extra numbers, timewait sockets have none. + end_regexp = "(| +[0-9]+ [0-9]+ [0-9]+ [0-9]+ -?[0-9]+|)$" + elif re.match("icmp|udp|raw", protocol): + # Drops. + end_regexp = " +([0-9]+) *$" + else: + raise ValueError("Don't know how to parse %s" % filename) + + regexp = re.compile(r" *(\d+): " # bucket + "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port + "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port + "([0-9A-F][0-9A-F]) " # state + "([0-9A-F]{8}:[0-9A-F]{8}) " # mem + "([0-9A-F]{2}:[0-9A-F]{8}) " # ? + "([0-9A-F]{8}) +" # ? + "([0-9]+) +" # uid + "([0-9]+) +" # timeout + "([0-9]+) +" # inode + "([0-9]+) +" # refcnt + "([0-9a-f]+)" # sp + "%s" # icmp has spaces + % (addrlen, addrlen, end_regexp)) + # Return a list of lists with only source / dest addresses for now. + # TODO: consider returning a dict or namedtuple instead. + out = [] + for line in lines: + (_, src, dst, state, mem, + _, _, uid, _, _, refcnt, _, extra) = regexp.match(line).groups() + out.append([src, dst, state, mem, uid, refcnt, extra]) + return out + if __name__ == "__main__": unittest.main() diff --git a/tests/net_test/ping6_test.py b/tests/net_test/ping6_test.py index d6efa4ef..bf51cfa1 100755 --- a/tests/net_test/ping6_test.py +++ b/tests/net_test/ping6_test.py @@ -20,7 +20,6 @@ import errno import os import posix import random -import re from socket import * # pylint: disable=wildcard-import import threading import time @@ -254,42 +253,6 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest): self.assertEqual(len(data), len(rcvd)) self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex")) - def ReadProcNetSocket(self, protocol): - # Read file. - lines = open("/proc/net/%s" % protocol).readlines() - - # Possibly check, and strip, header. - if protocol in ["icmp6", "raw6", "udp6"]: - self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0]) - lines = lines[1:] - - # Check contents. - if protocol.endswith("6"): - addrlen = 32 - else: - addrlen = 8 - regexp = re.compile(r" *(\d+): " # bucket - "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port - "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port - "([0-9A-F][0-9A-F]) " # state - "([0-9A-F]{8}:[0-9A-F]{8}) " # mem - "([0-9A-F]{2}:[0-9A-F]{8}) " # ? - "([0-9A-F]{8}) +" # ? - "([0-9]+) +" # uid - "([0-9]+) +" # ? - "([0-9]+) +" # inode - "([0-9]+) +" # refcnt - "([0-9a-f]+) +" # sp - "([0-9]+) *$" # drops, icmp has spaces - % (addrlen, addrlen)) - # Return a list of lists with only source / dest addresses for now. - out = [] - for line in lines: - (_, src, dst, state, mem, - _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups() - out.append([src, dst, state, mem, uid, refcnt, drops]) - return out - def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state, txmem=0, rxmem=0): expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport), |