From 55d9a7cda84d2522a99838b61d103ad5c79e90fe Mon Sep 17 00:00:00 2001 From: John-Mark Gurney Date: Mon, 23 Sep 2019 12:41:40 -0700 Subject: [PATCH] add docs.. refactor to use a helper function instead of rolling it's own.. also make sure getegress fully works.. more tests need to be written.. --- vlanmang.py | 92 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 14 deletions(-) diff --git a/vlanmang.py b/vlanmang.py index 9d37474..8e78dac 100644 --- a/vlanmang.py +++ b/vlanmang.py @@ -35,6 +35,29 @@ _mvc = MibViewController(_mbuilder) # 1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable class SwitchConfig(object): + '''This is a simple object to store switch configuration for + the checkchanges() function. + + host -- The host of the switch you are maintaining configuration of. + + community -- Either the SNMPv1 community name or a + pysnmp.hlapi.UsmUserData object, either of which can write to the + necessary MIBs to configure the VLANs of the switch. + + vlanconf -- This is a dictionary w/ vlans as the key. Each value has + a dictionary that contains keys, 'u' or 't', each of which + contains the port that traffic should be sent untagged ('u') or + tagged ('t'). Note that the Pvid (vlan of traffic that is + received when untagged), is set to match the 'u' definition. The + port is either an integer, which maps directly to the switch's + index number, or it can be a string, which will be looked up via + the IF-MIB::ifName table. + + ignports -- Ports that will be ignored and not required to be + configured. List any ports that will not be active here, such as + any unused lag ports. + ''' + def __init__(self, host, community, vlanconf, ignports): self._host = host self._community = community @@ -58,33 +81,40 @@ class SwitchConfig(object): return self._ignports def getportlist(self, lookupfun): - '''Return a set of all the ports indexes in data.''' + '''Return a set of all the ports indexes in data. This + includes, both vlanconf and ignports. Any ports using names + will be resolved by being passed to the provided lookupfun.''' - res = set() + res = [] for id in self._vlanconf: - res.update(self._vlanconf[id].get('u', [])) - res.update(self._vlanconf[id].get('t', [])) + res.extend(self._vlanconf[id].get('u', [])) + res.extend(self._vlanconf[id].get('t', [])) # add in the ignore ports - res.update(self.ignports) - - # filter out the strings - strports = set(x for x in res if isinstance(x, str)) + res.extend(self.ignports) - res.update(lookupfun(x) for x in strports) - res.difference_update(strports) + # eliminate dups so that lookupfun isn't called as often + res = set(res) - return res + return set(getidxs(res, lookupfun)) def _octstrtobits(os): - num = 1 + '''Convert a string into a list of bits. Easier to figure out what + ports are set.''' + + num = 1 # leading 1 to make sure leading zeros are not stripped for i in str(os): num = (num << 8) | ord(i) return bin(num)[3:] def _intstobits(*ints): + '''Convert the int args to a string of bits in the expected format + that SNMP expects for them. The results will be a string of '1's + and '0's where the first one represents 1, and second one + representing 2 and so on.''' + v = 0 for i in ints: v |= 1 << i @@ -95,6 +125,9 @@ def _intstobits(*ints): return ''.join(r) def _cmpbits(a, b): + '''Compare two strings of bits to make sure they are equal. + Trailing 0's are ignored.''' + try: last1a = a.rindex('1') except ValueError: @@ -116,6 +149,20 @@ def _cmpbits(a, b): import vlanmang def checkchanges(module): + '''Function to check for any differences between the switch, and the + configured state. + + The parameter module is a string to the name of a python module. It + will be imported, and any names that reference a vlanmang.SwitchConfig + class will be validate that the configuration matches. If it does not, + the returned list will contain a set of tuples, each one containing + (verb, arg1, arg2, switcharg2). verb is what needs to be changed. + arg1 is either the port (for setting Pvid), or the VLAN that needs to + be configured. arg2 is what it needs to be set to. switcharg2 is + what the switch is currently configured to, so that you can easily + see what the effect of the configuration change is. + ''' + mod = importlib.import_module(module) mods = [ i for i in mod.__dict__.itervalues() if isinstance(i, vlanmang.SwitchConfig) ] @@ -159,11 +206,20 @@ def checkchanges(module): return res, switch def getidxs(lst, lookupfun): + '''Take a list of ports, and if any are a string, replace them w/ + the value returned by lookupfun(s). + + Note that duplicates are not detected or removed, both in the + original list, and the values returned by the lookup function + may duplicate other values in the list.''' + return [ lookupfun(i) if isinstance(i, str) else i for i in lst ] def getpvidmapping(data, lookupfun): '''Return a mapping from vlan based table to a port: vlan - dictionary.''' + dictionary. This only looks at that untagged part of the vlan + configuration, and is used for finding what a port's Pvid should + be.''' res = [] for id in data: @@ -175,6 +231,10 @@ def getpvidmapping(data, lookupfun): return dict(res) def getegress(data, lookupfun): + '''Return a dictionary, keyed by VLAN id with a string of the ports + that need to be enagled for egress. This include both tagged and + untagged traffic.''' + r = {} for id in data: r[id] = _intstobits(*(getidxs(data[id].get('u', []), @@ -561,6 +621,8 @@ class _TestSNMPSwitch(unittest.TestCase): # that a switch switch = SNMPSwitch(None, None) + lookup = { x: chr(x) for x in xrange(1, 10) } + # when getCmd returns tooBig when too many oids are asked for def custgetcmd(eng, cd, targ, contextdata, *oids): # induce a too big error @@ -569,6 +631,8 @@ class _TestSNMPSwitch(unittest.TestCase): else: #import pdb; pdb.set_trace() [ oid.resolveWithMib(_mvc) for oid in oids ] + oids = [ ObjectType(x[0], OctetString(lookup[x[0][-1]])) for x in oids ] + [ oid.resolveWithMib(_mvc) for oid in oids ] res = ( None, None, None, oids ) return iter([res]) @@ -579,7 +643,7 @@ class _TestSNMPSwitch(unittest.TestCase): res = switch.getegress(*xrange(1, 10)) # will still return the complete set of results - self.assertEqual(res, { x: '' for x in xrange(1, 10) }) + self.assertEqual(res, { x: _octstrtobits(lookup[x]) for x in xrange(1, 10) }) _skipSwitchTests = True