From d38c0f52ba2c9350280ee0c38138b001f1f86de4 Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Thu, 19 Sep 2019 12:09:03 -0700 Subject: [PATCH] add function to get all ports for a switch, this will be used to compare the switch's port list against the definition to make sure that there is a complete definition... Add the ability to skip the switch tests that can take a while to complete... --- vlanmang.py | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 80 insertions(+), 1 deletion(-) diff --git a/vlanmang.py b/vlanmang.py index 2ecefbc..9e3898c 100644 --- a/vlanmang.py +++ b/vlanmang.py @@ -5,12 +5,15 @@ from pysnmp.hlapi import * from pysnmp.smi.builder import MibBuilder from pysnmp.smi.view import MibViewController +import itertools import random import unittest _mbuilder = MibBuilder() _mvc = MibViewController(_mbuilder) +#import data + # received packages # pvid: dot1qPvid # @@ -29,7 +32,39 @@ _mvc = MibViewController(_mbuilder) # LLDP: # 1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable +def getpvidmapping(data, lookupfun): + '''Return a mapping from vlan based table to a port: vlan + dictionary.''' + + res = [] + for id in data: + for i in data[id]['u']: + if isinstance(i, str): + i = lookupfun(i) + res.append((i, id)) + + return dict(res) + +def getportlist(data, lookupfun): + '''Return a set of all the ports indexes in data.''' + + res = set() + + for id in data: + res.update(data[id]['u']) + res.update(data[id].get('t', [])) + + # filter out the strings + strports = set(x for x in res if isinstance(x, str)) + + res.update(lookupfun(x) for x in strports) + res.difference_update(strports) + + return res + class SNMPSwitch(object): + '''A class for manipulating switches via standard SNMP MIBs.''' + def __init__(self, host, community): self._eng = SnmpEngine() self._cd = CommunityData(community, mpModel=0) @@ -100,9 +135,14 @@ class SNMPSwitch(object): yield varBind def findport(self, name): + '''Look up a port name and return it's port index. This + looks up via the ifName table in IF-MIB.''' + return [ x[0][-1] for x in self._walk('IF-MIB', 'ifName') if str(x[1]) == name ][0] def getvlanname(self, vlan): + '''Return the name for the vlan.''' + v = self._get(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', vlan)) return str(v).decode('utf-8') @@ -119,12 +159,49 @@ class SNMPSwitch(object): int(vlan)), 6) # destroy(6) def getvlans(self): + '''Return all the vlans.''' + return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStatus')) def staticvlans(self): + '''Return the staticly defined/configured vlans. This + sometimes excludes special built in vlans, like vlan 1.''' + return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStaticName')) -class Test(unittest.TestCase): +class _TestMisc(unittest.TestCase): + def test_pvid(self): + data = { + 1: { + 'u': [ 1, 5, 10 ] + range(13, 20) + }, + 10: { + 'u': [ 2, 3, 6, 7, 8, 'lag2' ], + }, + 13: { + 'u': [ 4, 9 ], + }, + } + lookup = { + 'lag2': 30 + } + + check = dict(itertools.chain(enumerate([ 1, 10, 10, 13, 1, 10, + 10, 10, 13, 1 ], 1), enumerate([ 1 ] * 7, 13), + [ (30, 10) ])) + + # That a pvid mapping + res = getpvidmapping(data, lookup.__getitem__) + + # is correct + self.assertEqual(res, check) + + self.assertEqual(getportlist(data, lookup.__getitem__), + set(xrange(1, 11)) | set(xrange(13, 20)) | set([30])) + +_skipSwitchTests = True + +class _TestSwitch(unittest.TestCase): def setUp(self): args = open('test.creds').read().split() self.switch = SNMPSwitch(*args) @@ -132,12 +209,14 @@ class Test(unittest.TestCase): def test_unpacktable(self): pass + @unittest.skipIf(_skipSwitchTests, 'slow') def test_misc(self): switch = self.switch self.assertEqual(switch.findport('g1'), 1) self.assertEqual(switch.findport('l1'), 14) + @unittest.skipIf(_skipSwitchTests, 'slow') def test_vlan(self): switch = self.switch