VLAN Manager tool
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

694 lines
18 KiB

  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from pysnmp.hlapi import *
  4. from pysnmp.smi.builder import MibBuilder
  5. from pysnmp.smi.view import MibViewController
  6. import importlib
  7. import itertools
  8. import mock
  9. import random
  10. import unittest
  11. _mbuilder = MibBuilder()
  12. _mvc = MibViewController(_mbuilder)
  13. #import data
  14. # received packets
  15. # pvid: dot1qPvid
  16. #
  17. # tx packets:
  18. # dot1qVlanStaticEgressPorts
  19. # dot1qVlanStaticUntaggedPorts
  20. #
  21. # vlans:
  22. # dot1qVlanCurrentTable
  23. # lists ALL vlans, including baked in ones
  24. #
  25. # note that even though an snmpwalk of dot1qVlanStaticEgressPorts
  26. # skips over other vlans (only shows statics), the other vlans (1,2,3)
  27. # are still accessible via that oid
  28. #
  29. # LLDP:
  30. # 1.0.8802.1.1.2.1.4.1.1 aka LLDP-MIB, lldpRemTable
  31. class SwitchConfig(object):
  32. def __init__(self, host, community, vlanconf, ignports):
  33. self._host = host
  34. self._community = community
  35. self._vlanconf = vlanconf
  36. self._ignports = ignports
  37. @property
  38. def host(self):
  39. return self._host
  40. @property
  41. def community(self):
  42. return self._community
  43. @property
  44. def vlanconf(self):
  45. return self._vlanconf
  46. @property
  47. def ignports(self):
  48. return self._ignports
  49. def getportlist(self, lookupfun):
  50. '''Return a set of all the ports indexes in data.'''
  51. res = set()
  52. for id in self._vlanconf:
  53. res.update(self._vlanconf[id].get('u', []))
  54. res.update(self._vlanconf[id].get('t', []))
  55. # add in the ignore ports
  56. res.update(self.ignports)
  57. # filter out the strings
  58. strports = set(x for x in res if isinstance(x, str))
  59. res.update(lookupfun(x) for x in strports)
  60. res.difference_update(strports)
  61. return res
  62. def _octstrtobits(os):
  63. num = 1
  64. for i in str(os):
  65. num = (num << 8) | ord(i)
  66. return bin(num)[3:]
  67. def _intstobits(*ints):
  68. v = 0
  69. for i in ints:
  70. v |= 1 << i
  71. r = list(bin(v)[2:-1])
  72. r.reverse()
  73. return ''.join(r)
  74. def _cmpbits(a, b):
  75. try:
  76. last1a = a.rindex('1')
  77. except ValueError:
  78. last1a = -1
  79. a = ''
  80. try:
  81. last1b = b.rindex('1')
  82. except ValueError:
  83. last1b = -1
  84. b = ''
  85. if last1a != -1:
  86. a = a[:last1a + 1]
  87. if last1b != -1:
  88. b = b[:last1b + 1]
  89. return a == b
  90. import vlanmang
  91. def checkchanges(module):
  92. mod = importlib.import_module(module)
  93. mods = [ i for i in mod.__dict__.itervalues() if isinstance(i, vlanmang.SwitchConfig) ]
  94. res = []
  95. for i in mods:
  96. vlans = i.vlanconf.keys()
  97. switch = SNMPSwitch(i.host, i.community)
  98. portmapping = switch.getportmapping()
  99. invportmap = { y: x for x, y in portmapping.iteritems() }
  100. lufun = invportmap.__getitem__
  101. # get complete set of ports
  102. portlist = i.getportlist(lufun)
  103. ports = set(portmapping.iterkeys())
  104. # make sure switch agrees w/ them all
  105. if ports != portlist:
  106. raise ValueError('missing or extra ports found: %s' %
  107. `ports.symmetric_difference(portlist)`)
  108. # compare pvid
  109. pvidmap = getpvidmapping(i.vlanconf, lufun)
  110. switchpvid = switch.getpvid()
  111. res.extend(('setpvid', idx, vlan, switchpvid[idx]) for idx, vlan in
  112. pvidmap.iteritems() if switchpvid[idx] != vlan)
  113. # compare egress & untagged
  114. switchegress = switch.getegress(*vlans)
  115. egress = getegress(i.vlanconf, lufun)
  116. switchuntagged = switch.getuntagged(*vlans)
  117. untagged = getuntagged(i.vlanconf, lufun)
  118. for i in vlans:
  119. if not _cmpbits(switchegress[i], egress[i]):
  120. res.append(('setegress', i, egress[i], switchegress[i]))
  121. if not _cmpbits(switchuntagged[i], untagged[i]):
  122. res.append(('setuntagged', i, untagged[i], switchuntagged[i]))
  123. return res, switch
  124. def getidxs(lst, lookupfun):
  125. return [ lookupfun(i) if isinstance(i, str) else i for i in lst ]
  126. def getpvidmapping(data, lookupfun):
  127. '''Return a mapping from vlan based table to a port: vlan
  128. dictionary.'''
  129. res = []
  130. for id in data:
  131. for i in data[id].get('u', []):
  132. if isinstance(i, str):
  133. i = lookupfun(i)
  134. res.append((i, id))
  135. return dict(res)
  136. def getegress(data, lookupfun):
  137. r = {}
  138. for id in data:
  139. r[id] = _intstobits(*(getidxs(data[id].get('u', []),
  140. lookupfun) + getidxs(data[id].get('t', []), lookupfun)))
  141. return r
  142. def getuntagged(data, lookupfun):
  143. r = {}
  144. for id in data:
  145. r[id] = _intstobits(*getidxs(data[id].get('u', []), lookupfun))
  146. return r
  147. class SNMPSwitch(object):
  148. '''A class for manipulating switches via standard SNMP MIBs.'''
  149. def __init__(self, host, auth):
  150. self._eng = SnmpEngine()
  151. if isinstance(auth, str):
  152. self._cd = CommunityData(auth, mpModel=0)
  153. else:
  154. self._cd = auth
  155. self._targ = UdpTransportTarget((host, 161))
  156. def _getmany(self, *oids):
  157. woids = [ ObjectIdentity(*oid) for oid in oids ]
  158. [ oid.resolveWithMib(_mvc) for oid in woids ]
  159. errorInd, errorStatus, errorIndex, varBinds = \
  160. next(getCmd(self._eng, self._cd, self._targ, ContextData(), *(ObjectType(oid) for oid in woids)))
  161. if errorInd: # pragma: no cover
  162. raise ValueError(errorIndication)
  163. elif errorStatus:
  164. if str(errorStatus) == 'tooBig' and len(oids) > 1:
  165. # split the request in two
  166. pivot = len(oids) / 2
  167. a = self._getmany(*oids[:pivot])
  168. b = self._getmany(*oids[pivot:])
  169. return a + b
  170. raise ValueError('%s at %s' %
  171. (errorStatus.prettyPrint(), errorIndex and
  172. varBinds[int(errorIndex)-1][0] or '?'))
  173. else:
  174. if len(varBinds) != len(oids): # pragma: no cover
  175. raise ValueError('too many return values')
  176. return varBinds
  177. def _get(self, oid):
  178. varBinds = self._getmany(oid)
  179. varBind = varBinds[0]
  180. return varBind[1]
  181. def _set(self, oid, value):
  182. oid = ObjectIdentity(*oid)
  183. oid.resolveWithMib(_mvc)
  184. if isinstance(value, (int, long)):
  185. value = Integer(value)
  186. elif isinstance(value, str):
  187. value = OctetString(value)
  188. errorInd, errorStatus, errorIndex, varBinds = \
  189. next(setCmd(self._eng, self._cd, self._targ, ContextData(), ObjectType(oid, value)))
  190. if errorInd: # pragma: no cover
  191. raise ValueError(errorIndication)
  192. elif errorStatus: # pragma: no cover
  193. raise ValueError('%s at %s' %
  194. (errorStatus.prettyPrint(), errorIndex and
  195. varBinds[int(errorIndex)-1][0] or '?'))
  196. else:
  197. for varBind in varBinds:
  198. if varBind[1] != value: # pragma: no cover
  199. raise RuntimeError('failed to set: %s' % ' = '.join([x.prettyPrint() for x in varBind]))
  200. def _walk(self, *oid):
  201. oid = ObjectIdentity(*oid)
  202. # XXX - keep these, this might stop working, no clue what managed to magically make things work
  203. # ref: http://snmplabs.com/pysnmp/examples/smi/manager/browsing-mib-tree.html#mib-objects-to-pdu-var-binds
  204. # mibdump.py --mib-source '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs' --mib-source /usr/share/snmp/mibs --rebuild rfc1212 pbridge vlan
  205. #oid.addAsn1MibSource('/usr/share/snmp/mibs', '/Users/jmg/Nextcloud/Documents/user manuals/netgear/gs7xxt-v6.3.1.19-mibs')
  206. oid.resolveWithMib(_mvc)
  207. for (errorInd, errorStatus, errorIndex, varBinds) in nextCmd(
  208. self._eng, self._cd, self._targ, ContextData(),
  209. ObjectType(oid),
  210. lexicographicMode=False):
  211. if errorInd: # pragma: no cover
  212. raise ValueError(errorInd)
  213. elif errorStatus: # pragma: no cover
  214. raise ValueError('%s at %s' % (errorStatus.prettyPrint(), errorIndex and varBinds[int(errorIndex)-1][0] or '?'))
  215. else:
  216. for varBind in varBinds:
  217. yield varBind
  218. def getportmapping(self):
  219. '''Return a port name mapping. Keys are the port index
  220. and the value is the name from the ifName entry.'''
  221. return { x[0][-1]: str(x[1]) for x in self._walk('IF-MIB', 'ifName') }
  222. def findport(self, name):
  223. '''Look up a port name and return it's port index. This
  224. looks up via the ifName table in IF-MIB.'''
  225. return [ x[0][-1] for x in self._walk('IF-MIB', 'ifName') if str(x[1]) == name ][0]
  226. def getvlanname(self, vlan):
  227. '''Return the name for the vlan.'''
  228. v = self._get(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', vlan))
  229. return str(v).decode('utf-8')
  230. def createvlan(self, vlan, name):
  231. # createAndGo(4)
  232. self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus',
  233. int(vlan)), 4)
  234. self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', int(vlan)),
  235. name)
  236. def deletevlan(self, vlan):
  237. self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus',
  238. int(vlan)), 6) # destroy(6)
  239. def getvlans(self):
  240. '''Return an iterator with all the vlan ids.'''
  241. return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStatus'))
  242. def staticvlans(self):
  243. '''Return an iterator of the staticly defined/configured
  244. vlans. This sometimes excludes special built in vlans,
  245. like vlan 1.'''
  246. return (x[0][-1] for x in self._walk('Q-BRIDGE-MIB', 'dot1qVlanStaticName'))
  247. def getpvid(self):
  248. '''Returns a dictionary w/ the interface index as the key,
  249. and the pvid of the interface.'''
  250. return { x[0][-1]: int(x[1]) for x in self._walk('Q-BRIDGE-MIB', 'dot1qPvid') }
  251. def setpvid(self, port, vlan):
  252. self._set(('Q-BRIDGE-MIB', 'dot1qPvid', int(port)), Gauge32(vlan))
  253. def getegress(self, *vlans):
  254. r = { x[-1]: _octstrtobits(y) for x, y in
  255. self._getmany(*(('Q-BRIDGE-MIB',
  256. 'dot1qVlanStaticEgressPorts', x) for x in vlans)) }
  257. return r
  258. def setegress(self, vlan, ports):
  259. value = OctetString.fromBinaryString(ports)
  260. self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticEgressPorts',
  261. int(vlan)), value)
  262. def getuntagged(self, *vlans):
  263. r = { x[-1]: _octstrtobits(y) for x, y in
  264. self._getmany(*(('Q-BRIDGE-MIB',
  265. 'dot1qVlanStaticUntaggedPorts', x) for x in vlans)) }
  266. return r
  267. def setuntagged(self, vlan, ports):
  268. value = OctetString.fromBinaryString(ports)
  269. self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticUntaggedPorts',
  270. int(vlan)), value)
  271. if __name__ == '__main__': # pragma: no cover
  272. import pprint
  273. import sys
  274. changes, switch = checkchanges('data')
  275. if not changes:
  276. print 'No changes to apply.'
  277. sys.exit(0)
  278. pprint.pprint(changes)
  279. res = raw_input('Apply the changes? (type yes to apply): ')
  280. if res != 'yes':
  281. print 'not applying changes.'
  282. sys.exit(1)
  283. print 'applying...'
  284. failed = []
  285. for verb, arg1, arg2, oldarg in changes:
  286. print '%s: %s %s' % (verb, arg1, `arg2`)
  287. try:
  288. fun = getattr(switch, verb)
  289. fun(arg1, arg2)
  290. pass
  291. except Exception as e:
  292. print 'failed'
  293. failed.append((verb, arg1, arg2, e))
  294. if failed:
  295. print '%d failed to apply, they are:' % len(failed)
  296. for verb, arg1, arg2, e in failed:
  297. print '%s: %s %s: %s' % (verb, arg1, arg2, `e`)
  298. class _TestMisc(unittest.TestCase):
  299. def setUp(self):
  300. import test_data
  301. self._test_data = test_data
  302. def test_intstobits(self):
  303. self.assertEqual(_intstobits(1, 5, 10), '1000100001')
  304. self.assertEqual(_intstobits(3, 4, 9), '001100001')
  305. def test_octstrtobits(self):
  306. self.assertEqual(_octstrtobits('\x00'), '0' * 8)
  307. self.assertEqual(_octstrtobits('\xff'), '1' * 8)
  308. self.assertEqual(_octstrtobits('\xf0'), '1' * 4 + '0' * 4)
  309. self.assertEqual(_octstrtobits('\x0f'), '0' * 4 + '1' * 4)
  310. def test_cmpbits(self):
  311. self.assertTrue(_cmpbits('111000', '111'))
  312. self.assertTrue(_cmpbits('000111000', '000111'))
  313. self.assertTrue(_cmpbits('11', '11'))
  314. self.assertTrue(_cmpbits('0', '000'))
  315. self.assertFalse(_cmpbits('0011', '11'))
  316. self.assertFalse(_cmpbits('11', '0011'))
  317. self.assertFalse(_cmpbits('10', '000'))
  318. self.assertFalse(_cmpbits('0', '1000'))
  319. self.assertFalse(_cmpbits('00010', '000'))
  320. self.assertFalse(_cmpbits('0', '001000'))
  321. def test_pvidegressuntagged(self):
  322. data = {
  323. 1: {
  324. 'u': [ 1, 5, 10 ] + range(13, 20),
  325. 't': [ 'lag2', 6, 7 ],
  326. },
  327. 10: {
  328. 'u': [ 2, 3, 6, 7, 8, 'lag2' ],
  329. },
  330. 13: {
  331. 'u': [ 4, 9 ],
  332. 't': [ 'lag2', 6, 7 ],
  333. },
  334. 14: {
  335. 't': [ 'lag2' ],
  336. },
  337. }
  338. swconf = SwitchConfig('', '', data, [ 'lag3' ])
  339. lookup = {
  340. 'lag2': 30,
  341. 'lag3': 31,
  342. }
  343. lufun = lookup.__getitem__
  344. check = dict(itertools.chain(enumerate([ 1, 10, 10, 13, 1, 10,
  345. 10, 10, 13, 1 ], 1), enumerate([ 1 ] * 7, 13),
  346. [ (30, 10) ]))
  347. # That a pvid mapping
  348. res = getpvidmapping(data, lufun)
  349. # is correct
  350. self.assertEqual(res, check)
  351. self.assertEqual(swconf.getportlist(lufun),
  352. set(xrange(1, 11)) | set(xrange(13, 20)) | set(lookup.values()))
  353. checkegress = {
  354. 1: '1000111001001111111' + '0' * (30 - 20) + '1',
  355. 10: '01100111' + '0' * (30 - 9) + '1',
  356. 13: '000101101' + '0' * (30 - 10) + '1',
  357. 14: '0' * (30 - 1) + '1',
  358. }
  359. self.assertEqual(getegress(data, lufun), checkegress)
  360. checkuntagged = {
  361. 1: '1000100001001111111',
  362. 10: '01100111' + '0' * (30 - 9) + '1',
  363. 13: '000100001',
  364. 14: '',
  365. }
  366. self.assertEqual(getuntagged(data, lufun), checkuntagged)
  367. #@unittest.skip('foo')
  368. @mock.patch('vlanmang.SNMPSwitch.getuntagged')
  369. @mock.patch('vlanmang.SNMPSwitch.getegress')
  370. @mock.patch('vlanmang.SNMPSwitch.getpvid')
  371. @mock.patch('vlanmang.SNMPSwitch.getportmapping')
  372. @mock.patch('importlib.import_module')
  373. def test_checkchanges(self, imprt, portmapping, gpvid, gegress, guntagged):
  374. # that import returns the test data
  375. imprt.side_effect = itertools.repeat(self._test_data)
  376. # that getportmapping returns the following dict
  377. ports = { x: 'g%d' % x for x in xrange(1, 24) }
  378. ports[30] = 'lag1'
  379. ports[31] = 'lag2'
  380. ports[32] = 'lag3'
  381. portmapping.side_effect = itertools.repeat(ports)
  382. # that the switch's pvid returns
  383. spvid = { x: 283 for x in xrange(1, 24) }
  384. spvid[30] = 5
  385. gpvid.side_effect = itertools.repeat(spvid)
  386. # the the extra port is caught
  387. self.assertRaises(ValueError, checkchanges, 'data')
  388. # that the functions were called
  389. imprt.assert_called_with('data')
  390. portmapping.assert_called()
  391. # XXX - check that an ignore statement is honored
  392. # delete the extra port
  393. del ports[32]
  394. # that the egress data provided
  395. gegress.side_effect = [ {
  396. 1: '1' * 10,
  397. 5: '1' * 10,
  398. 283: '00000000111111111110011000000100000',
  399. } ]
  400. # that the untagged data provided
  401. guntagged.side_effect = [ {
  402. 1: '1' * 10,
  403. 5: '1' * 8 + '0' * 10,
  404. 283: '00000000111111111110011',
  405. } ]
  406. res, switch = checkchanges('data')
  407. self.assertIsInstance(switch, SNMPSwitch)
  408. validres = [ ('setpvid', x, 5, 283) for x in xrange(1, 9) ] + \
  409. [ ('setpvid', 20, 1, 283),
  410. ('setpvid', 21, 1, 283),
  411. ('setpvid', 30, 1, 5),
  412. ('setegress', 1, '0' * 19 + '11' + '0' * 8 + '1', '1' * 10),
  413. ('setuntagged', 1, '0' * 19 + '11' + '0' * 8 + '1', '1' * 10),
  414. ('setegress', 5, '1' * 8 + '0' * 11 + '11' + '0' * 8 + '1', '1' * 10),
  415. ]
  416. self.assertEqual(set(res), set(validres))
  417. class _TestSNMPSwitch(unittest.TestCase):
  418. def test_splitmany(self):
  419. # make sure that if we get a tooBig error that we split the
  420. # _getmany request
  421. switch = SNMPSwitch(None, None)
  422. @mock.patch('vlanmang.SNMPSwitch._getmany')
  423. def test_get(self, gm):
  424. # that a switch
  425. switch = SNMPSwitch(None, None)
  426. # when _getmany returns this structure
  427. retval = object()
  428. gm.side_effect = [[[ None, retval ]]]
  429. arg = object()
  430. # will return the correct value
  431. self.assertIs(switch._get(arg), retval)
  432. # and call _getmany w/ the correct arg
  433. gm.assert_called_with(arg)
  434. @mock.patch('pysnmp.hlapi.ContextData')
  435. @mock.patch('vlanmang.getCmd')
  436. def test_getmany(self, gc, cd):
  437. # that a switch
  438. switch = SNMPSwitch(None, None)
  439. # when getCmd returns tooBig when too many oids are asked for
  440. def custgetcmd(eng, cd, targ, contextdata, *oids):
  441. # induce a too big error
  442. if len(oids) > 3:
  443. res = ( None, 'tooBig', None, None )
  444. else:
  445. #import pdb; pdb.set_trace()
  446. [ oid.resolveWithMib(_mvc) for oid in oids ]
  447. res = ( None, None, None, oids )
  448. return iter([res])
  449. gc.side_effect = custgetcmd
  450. #import pdb; pdb.set_trace()
  451. res = switch.getegress(*xrange(1, 10))
  452. # will still return the complete set of results
  453. self.assertEqual(res, { x: '' for x in xrange(1, 10) })
  454. _skipSwitchTests = True
  455. class _TestSwitch(unittest.TestCase):
  456. def setUp(self):
  457. # If we don't have it, pretend it's true for now and
  458. # we'll recheck it later
  459. model = 'GS108T smartSwitch'
  460. if getattr(self, 'switchmodel', model) != model or \
  461. _skipSwitchTests: # pragma: no cover
  462. self.skipTest('Need a GS108T switch to run these tests')
  463. args = open('test.creds').read().split()
  464. self.switch = SNMPSwitch(*args)
  465. self.switchmodel = self.switch._get(('ENTITY-MIB',
  466. 'entPhysicalModelName', 1))
  467. if self.switchmodel != model: # pragma: no cover
  468. self.skipTest('Need a GS108T switch to run these tests')
  469. def test_misc(self):
  470. switch = self.switch
  471. self.assertEqual(switch.findport('g1'), 1)
  472. self.assertEqual(switch.findport('l1'), 14)
  473. def test_portnames(self):
  474. switch = self.switch
  475. resp = dict((x, 'g%d' % x) for x in xrange(1, 9))
  476. resp.update({ 13: 'cpu' })
  477. resp.update((x, 'l%d' % (x - 13)) for x in xrange(14, 18))
  478. self.assertEqual(switch.getportmapping(), resp)
  479. def test_egress(self):
  480. switch = self.switch
  481. egress = switch.getegress(1, 2, 3)
  482. checkegress = {
  483. 1: '1' * 8 + '0' * 5 + '1' * 4 + '0' * 23,
  484. 2: '0' * 8 * 5,
  485. 3: '0' * 8 * 5,
  486. }
  487. self.assertEqual(egress, checkegress)
  488. def test_untagged(self):
  489. switch = self.switch
  490. untagged = switch.getuntagged(1, 2, 3)
  491. checkuntagged = {
  492. 1: '1' * 8 * 5,
  493. 2: '1' * 8 * 5,
  494. 3: '1' * 8 * 5,
  495. }
  496. self.assertEqual(untagged, checkuntagged)
  497. def test_vlan(self):
  498. switch = self.switch
  499. existingvlans = set(switch.getvlans())
  500. while True:
  501. testvlan = random.randint(1,4095)
  502. if testvlan not in existingvlans:
  503. break
  504. # Test that getting a non-existant vlans raises an exception
  505. self.assertRaises(ValueError, switch.getvlanname, testvlan)
  506. self.assertTrue(set(switch.staticvlans()).issubset(existingvlans))
  507. pvidres = { x: 1 for x in xrange(1, 9) }
  508. pvidres.update({ x: 1 for x in xrange(14, 18) })
  509. self.assertEqual(switch.getpvid(), pvidres)
  510. testname = 'Sometestname'
  511. # Create test vlan
  512. switch.createvlan(testvlan, testname)
  513. testport = None
  514. try:
  515. # make sure the test vlan was created
  516. self.assertIn(testvlan, set(switch.staticvlans()))
  517. self.assertEqual(testname, switch.getvlanname(testvlan))
  518. switch.setegress(testvlan, '00100')
  519. pvidmap = switch.getpvid()
  520. testport = 3
  521. egressports = switch.getegress(testvlan)
  522. self.assertEqual(egressports[testvlan], '00100000' + '0' * 8 * 4)
  523. switch.setuntagged(testvlan, '00100')
  524. untaggedports = switch.getuntagged(testvlan)
  525. self.assertEqual(untaggedports[testvlan], '00100000' + '0' * 8 * 4)
  526. switch.setpvid(testport, testvlan)
  527. self.assertEqual(switch.getpvid()[testport], testvlan)
  528. finally:
  529. if testport:
  530. switch.setpvid(testport, pvidmap[3])
  531. switch.deletevlan(testvlan)