summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLorenzo Colitti <lorenzo@google.com>2015-10-16 13:37:33 +0900
committerLorenzo Colitti <lorenzo@google.com>2015-10-27 15:19:29 +0900
commitd2344168138814630764df3da6b23dc839b4890d (patch)
tree4762f2fda0184b2bdcae5e7f2d1f242c83a9c325
parentc6fb2e938cfa03c5cccaad6ff87de61a8548e853 (diff)
downloadextras-d2344168138814630764df3da6b23dc839b4890d.tar.gz
Support reading /proc/net/tcp[6].
Support the two different TCP socket formats, and move the parsing code to net_test.py so it can be used for other things apart from ping. Change-Id: Iaa10cc323d11834b6ac9429d42708f6cdf89f404
-rwxr-xr-xtests/net_test/net_test.py49
-rwxr-xr-xtests/net_test/ping6_test.py37
2 files changed, 49 insertions, 37 deletions
diff --git a/tests/net_test/net_test.py b/tests/net_test/net_test.py
index 6628d354..09a6a555 100755
--- a/tests/net_test/net_test.py
+++ b/tests/net_test/net_test.py
@@ -17,6 +17,7 @@
import fcntl
import os
import random
+import re
from socket import * # pylint: disable=wildcard-import
import struct
import unittest
@@ -317,6 +318,54 @@ class NetworkTest(unittest.TestCase):
msg = os.strerror(err_num)
self.assertRaisesRegexp(EnvironmentError, msg, f, *args)
+ def ReadProcNetSocket(self, protocol):
+ # Read file.
+ filename = "/proc/net/%s" % protocol
+ lines = open(filename).readlines()
+
+ # Possibly check, and strip, header.
+ if protocol in ["icmp6", "raw6", "udp6"]:
+ self.assertEqual(IPV6_SEQ_DGRAM_HEADER, lines[0])
+ lines = lines[1:]
+
+ # Check contents.
+ if protocol.endswith("6"):
+ addrlen = 32
+ else:
+ addrlen = 8
+
+ if protocol.startswith("tcp"):
+ # Real sockets have 5 extra numbers, timewait sockets have none.
+ end_regexp = "(| +[0-9]+ [0-9]+ [0-9]+ [0-9]+ -?[0-9]+|)$"
+ elif re.match("icmp|udp|raw", protocol):
+ # Drops.
+ end_regexp = " +([0-9]+) *$"
+ else:
+ raise ValueError("Don't know how to parse %s" % filename)
+
+ regexp = re.compile(r" *(\d+): " # bucket
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
+ "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
+ "([0-9A-F][0-9A-F]) " # state
+ "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
+ "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
+ "([0-9A-F]{8}) +" # ?
+ "([0-9]+) +" # uid
+ "([0-9]+) +" # timeout
+ "([0-9]+) +" # inode
+ "([0-9]+) +" # refcnt
+ "([0-9a-f]+)" # sp
+ "%s" # icmp has spaces
+ % (addrlen, addrlen, end_regexp))
+ # Return a list of lists with only source / dest addresses for now.
+ # TODO: consider returning a dict or namedtuple instead.
+ out = []
+ for line in lines:
+ (_, src, dst, state, mem,
+ _, _, uid, _, _, refcnt, _, extra) = regexp.match(line).groups()
+ out.append([src, dst, state, mem, uid, refcnt, extra])
+ return out
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/net_test/ping6_test.py b/tests/net_test/ping6_test.py
index d6efa4ef..bf51cfa1 100755
--- a/tests/net_test/ping6_test.py
+++ b/tests/net_test/ping6_test.py
@@ -20,7 +20,6 @@ import errno
import os
import posix
import random
-import re
from socket import * # pylint: disable=wildcard-import
import threading
import time
@@ -254,42 +253,6 @@ class Ping6Test(multinetwork_base.MultiNetworkBaseTest):
self.assertEqual(len(data), len(rcvd))
self.assertEqual(data[6:].encode("hex"), rcvd[6:].encode("hex"))
- def ReadProcNetSocket(self, protocol):
- # Read file.
- lines = open("/proc/net/%s" % protocol).readlines()
-
- # Possibly check, and strip, header.
- if protocol in ["icmp6", "raw6", "udp6"]:
- self.assertEqual(net_test.IPV6_SEQ_DGRAM_HEADER, lines[0])
- lines = lines[1:]
-
- # Check contents.
- if protocol.endswith("6"):
- addrlen = 32
- else:
- addrlen = 8
- regexp = re.compile(r" *(\d+): " # bucket
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # srcaddr, port
- "([0-9A-F]{%d}:[0-9A-F]{4}) " # dstaddr, port
- "([0-9A-F][0-9A-F]) " # state
- "([0-9A-F]{8}:[0-9A-F]{8}) " # mem
- "([0-9A-F]{2}:[0-9A-F]{8}) " # ?
- "([0-9A-F]{8}) +" # ?
- "([0-9]+) +" # uid
- "([0-9]+) +" # ?
- "([0-9]+) +" # inode
- "([0-9]+) +" # refcnt
- "([0-9a-f]+) +" # sp
- "([0-9]+) *$" # drops, icmp has spaces
- % (addrlen, addrlen))
- # Return a list of lists with only source / dest addresses for now.
- out = []
- for line in lines:
- (_, src, dst, state, mem,
- _, _, uid, _, _, refcnt, _, drops) = regexp.match(line).groups()
- out.append([src, dst, state, mem, uid, refcnt, drops])
- return out
-
def CheckSockStatFile(self, name, srcaddr, srcport, dstaddr, dstport, state,
txmem=0, rxmem=0):
expected = ["%s:%04X" % (net_test.FormatSockStatAddress(srcaddr), srcport),