|
|
@@ -215,35 +215,36 @@ def checkchanges(module): |
|
|
|
''' |
|
|
|
|
|
|
|
mod = importlib.import_module(module) |
|
|
|
mods = [ (k, v) for k, v in mod.__dict__.iteritems() if isinstance(v, vlanmang.SwitchConfig) ] |
|
|
|
mods = [ (k, v) for k, v in mod.__dict__.items() if isinstance(v, |
|
|
|
vlanmang.SwitchConfig) ] |
|
|
|
|
|
|
|
res = [] |
|
|
|
|
|
|
|
for name, i in mods: |
|
|
|
#print 'probing %s' % `name` |
|
|
|
#print('probing %s' % repr(name)) |
|
|
|
vlans = i.vlanconf.keys() |
|
|
|
switch = SNMPSwitch(i.host, **i.authargs) |
|
|
|
switchpvid = switch.getpvid() |
|
|
|
portmapping = switch.getportmapping() |
|
|
|
invportmap = { y: x for x, y in portmapping.iteritems() } |
|
|
|
invportmap = { y: x for x, y in portmapping.items() } |
|
|
|
lufun = invportmap.__getitem__ |
|
|
|
|
|
|
|
# get complete set of ports |
|
|
|
portlist = i.getportlist(lufun) |
|
|
|
|
|
|
|
ports = set(portmapping.iterkeys()) |
|
|
|
ports = set(portmapping.keys()) |
|
|
|
|
|
|
|
# make sure switch agrees w/ them all |
|
|
|
if ports != portlist: |
|
|
|
raise ValueError('missing or extra ports found: %s' % |
|
|
|
`ports.symmetric_difference(portlist)`) |
|
|
|
repr(ports.symmetric_difference(portlist))) |
|
|
|
|
|
|
|
# compare pvid |
|
|
|
pvidmap = getpvidmapping(i.vlanconf, lufun) |
|
|
|
switchpvid = switch.getpvid() |
|
|
|
|
|
|
|
res.extend((switch, name, 'setpvid', idx, vlan, switchpvid[idx]) for idx, vlan in |
|
|
|
pvidmap.iteritems() if switchpvid[idx] != vlan) |
|
|
|
pvidmap.items() if switchpvid[idx] != vlan) |
|
|
|
|
|
|
|
# compare egress & untagged |
|
|
|
switchegress = switch.getegress(*vlans) |
|
|
@@ -361,7 +362,7 @@ class SNMPSwitch(object): |
|
|
|
self._targ = UdpTransportTarget((host, 161)) |
|
|
|
|
|
|
|
def __repr__(self): # pragma: no cover |
|
|
|
return '<SNMPSwitch: auth=%s, targ=%s>' % (`self._auth`, `self._targ`) |
|
|
|
return '<SNMPSwitch: auth=%s, targ=%s>' % (repr(self._auth), repr(self._targ)) |
|
|
|
|
|
|
|
def _getmany(self, *oids): |
|
|
|
woids = [ ObjectIdentity(*oid) for oid in oids ] |
|
|
@@ -376,7 +377,7 @@ class SNMPSwitch(object): |
|
|
|
elif errorStatus: |
|
|
|
if str(errorStatus) == 'tooBig' and len(oids) > 1: |
|
|
|
# split the request in two |
|
|
|
pivot = len(oids) / 2 |
|
|
|
pivot = len(oids) // 2 |
|
|
|
a = self._getmany(*oids[:pivot]) |
|
|
|
b = self._getmany(*oids[pivot:]) |
|
|
|
return a + b |
|
|
@@ -404,9 +405,9 @@ class SNMPSwitch(object): |
|
|
|
oid = ObjectIdentity(*oid) |
|
|
|
oid.resolveWithMib(_mvc) |
|
|
|
|
|
|
|
if isinstance(value, (int, long)): |
|
|
|
if isinstance(value, int): |
|
|
|
value = Integer(value) |
|
|
|
elif isinstance(value, str): |
|
|
|
elif isinstance(value, bytes): |
|
|
|
value = OctetString(value) |
|
|
|
|
|
|
|
errorInd, errorStatus, errorIndex, varBinds = \ |
|
|
@@ -467,14 +468,17 @@ class SNMPSwitch(object): |
|
|
|
|
|
|
|
v = self._get(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', vlan)) |
|
|
|
|
|
|
|
return str(v).decode('utf-8') |
|
|
|
if v is None: |
|
|
|
return v |
|
|
|
|
|
|
|
return bytes(v).decode('utf-8') |
|
|
|
|
|
|
|
def createvlan(self, vlan, name): |
|
|
|
# createAndGo(4) |
|
|
|
self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', |
|
|
|
int(vlan)), 4) |
|
|
|
self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticName', int(vlan)), |
|
|
|
name) |
|
|
|
name.encode('utf-8')) |
|
|
|
|
|
|
|
def deletevlan(self, vlan): |
|
|
|
self._set(('Q-BRIDGE-MIB', 'dot1qVlanStaticRowStatus', |
|
|
@@ -556,37 +560,37 @@ def main(): |
|
|
|
changes = checkchanges('data') |
|
|
|
|
|
|
|
if not changes: |
|
|
|
print 'No changes to apply.' |
|
|
|
print('No changes to apply.') |
|
|
|
sys.exit(0) |
|
|
|
|
|
|
|
pprint.pprint([ x[1:] for x in changes ]) |
|
|
|
|
|
|
|
res = raw_input('Apply the changes? (type yes to apply): ') |
|
|
|
if res != 'yes': |
|
|
|
print 'not applying changes.' |
|
|
|
print('not applying changes.') |
|
|
|
sys.exit(1) |
|
|
|
|
|
|
|
print 'applying...' |
|
|
|
print('applying...') |
|
|
|
failed = [] |
|
|
|
prevname = None |
|
|
|
for switch, name, verb, arg1, arg2, oldarg in changes: |
|
|
|
if prevname != name: |
|
|
|
print 'Configuring switch %s...' % `name` |
|
|
|
print('Configuring switch %s...' % repr(name)) |
|
|
|
prevname = name |
|
|
|
|
|
|
|
print '%s: %s %s' % (verb, arg1, `arg2`) |
|
|
|
print('%s: %s %s' % (verb, arg1, repr(arg2))) |
|
|
|
try: |
|
|
|
fun = getattr(switch, verb) |
|
|
|
fun(arg1, arg2) |
|
|
|
pass |
|
|
|
except Exception as e: |
|
|
|
print 'failed' |
|
|
|
print('failed') |
|
|
|
failed.append((verb, arg1, arg2, e)) |
|
|
|
|
|
|
|
if failed: |
|
|
|
print '%d failed to apply, they are:' % len(failed) |
|
|
|
print('%d failed to apply, they are:' % len(failed)) |
|
|
|
for verb, arg1, arg2, e in failed: |
|
|
|
print '%s: %s %s: %s' % (verb, arg1, arg2, `e`) |
|
|
|
print('%s: %s %s: %s' % (verb, arg1, arg2, repr(e))) |
|
|
|
|
|
|
|
if __name__ == '__main__': # pragma: no cover |
|
|
|
main() |
|
|
@@ -622,7 +626,7 @@ class _TestMisc(unittest.TestCase): |
|
|
|
def test_pvidegressuntagged(self): |
|
|
|
data = { |
|
|
|
1: { |
|
|
|
'u': [ 1, 5, 10 ] + range(13, 20), |
|
|
|
'u': [ 1, 5, 10 ] + list(range(13, 20)), |
|
|
|
't': [ 'lag2', 6, 7 ], |
|
|
|
}, |
|
|
|
10: { |
|
|
@@ -655,7 +659,7 @@ class _TestMisc(unittest.TestCase): |
|
|
|
self.assertEqual(res, check) |
|
|
|
|
|
|
|
self.assertEqual(swconf.getportlist(lufun), |
|
|
|
set(xrange(1, 11)) | set(xrange(13, 20)) | |
|
|
|
set(range(1, 11)) | set(range(13, 20)) | |
|
|
|
set(lookup.values())) |
|
|
|
|
|
|
|
checkegress = { |
|
|
@@ -784,14 +788,14 @@ class _TestMisc(unittest.TestCase): |
|
|
|
imprt.side_effect = itertools.repeat(self._test_data) |
|
|
|
|
|
|
|
# that getportmapping returns the following dict |
|
|
|
ports = { x: 'g%d' % x for x in xrange(1, 24) } |
|
|
|
ports = { x: 'g%d' % x for x in range(1, 24) } |
|
|
|
ports[30] = 'lag1' |
|
|
|
ports[31] = 'lag2' |
|
|
|
ports[32] = 'lag3' |
|
|
|
portmapping.side_effect = itertools.repeat(ports) |
|
|
|
|
|
|
|
# that the switch's pvid returns |
|
|
|
spvid = { x: 283 for x in xrange(1, 24) } |
|
|
|
spvid = { x: 283 for x in range(1, 24) } |
|
|
|
spvid[30] = 5 |
|
|
|
gpvid.side_effect = itertools.repeat(spvid) |
|
|
|
|
|
|
@@ -831,7 +835,7 @@ class _TestMisc(unittest.TestCase): |
|
|
|
self.assertTrue(all(x[1] == 'distswitch' for x in res)) |
|
|
|
|
|
|
|
res = [ x[2:] for x in res ] |
|
|
|
validres = [ ('setpvid', x, 5, 283) for x in xrange(1, 9) ] + \ |
|
|
|
validres = [ ('setpvid', x, 5, 283) for x in range(1, 9) ] + \ |
|
|
|
[ ('setpvid', 20, 1, 283), |
|
|
|
('setpvid', 21, 1, 283), |
|
|
|
('setpvid', 30, 1, 5), |
|
|
@@ -869,7 +873,7 @@ class _TestSNMPSwitch(unittest.TestCase): |
|
|
|
# that a switch |
|
|
|
switch = SNMPSwitch(None, community=None) |
|
|
|
|
|
|
|
lookup = { x: chr(x) for x in xrange(1, 10) } |
|
|
|
lookup = { x: chr(x) for x in range(1, 10) } |
|
|
|
|
|
|
|
# when getCmd returns tooBig when too many oids are asked for |
|
|
|
def custgetcmd(eng, cd, targ, contextdata, *oids): |
|
|
@@ -889,11 +893,11 @@ class _TestSNMPSwitch(unittest.TestCase): |
|
|
|
gc.side_effect = custgetcmd |
|
|
|
|
|
|
|
#import pdb; pdb.set_trace() |
|
|
|
res = switch.getegress(*xrange(1, 10)) |
|
|
|
res = switch.getegress(*range(1, 10)) |
|
|
|
|
|
|
|
# will still return the complete set of results |
|
|
|
self.assertEqual(res, { x: _octstrtobits(lookup[x]) for x in |
|
|
|
xrange(1, 10) }) |
|
|
|
range(1, 10) }) |
|
|
|
|
|
|
|
_skipSwitchTests = False |
|
|
|
|
|
|
@@ -901,7 +905,8 @@ class _TestSwitch(unittest.TestCase): |
|
|
|
def setUp(self): |
|
|
|
# If we don't have it, pretend it's true for now and |
|
|
|
# we'll recheck it later |
|
|
|
model = 'GS108T smartSwitch' |
|
|
|
# PySNMP sucks, and doesn't do proper unicode compares |
|
|
|
model = b'GS108T smartSwitch' |
|
|
|
if getattr(self, 'switchmodel', model) != model or \ |
|
|
|
_skipSwitchTests: # pragma: no cover |
|
|
|
self.skipTest('Need a GS108T switch to run these tests') |
|
|
@@ -926,9 +931,9 @@ class _TestSwitch(unittest.TestCase): |
|
|
|
def test_portnames(self): |
|
|
|
switch = self.switch |
|
|
|
|
|
|
|
resp = dict((x, 'g%d' % x) for x in xrange(1, 9)) |
|
|
|
resp = dict((x, 'g%d' % x) for x in range(1, 9)) |
|
|
|
resp.update({ 13: 'cpu' }) |
|
|
|
resp.update((x, 'l%d' % (x - 13)) for x in xrange(14, 18)) |
|
|
|
resp.update((x, 'l%d' % (x - 13)) for x in range(14, 18)) |
|
|
|
|
|
|
|
self.assertEqual(switch.getportmapping(), resp) |
|
|
|
|
|
|
@@ -947,8 +952,9 @@ class _TestSwitch(unittest.TestCase): |
|
|
|
|
|
|
|
self.assertTrue(set(switch.staticvlans()).issubset(existingvlans)) |
|
|
|
|
|
|
|
pvidres = { x: 1 for x in xrange(1, 9) } |
|
|
|
pvidres.update({ x: 1 for x in xrange(14, 18) }) |
|
|
|
pvidres = { x: 1 for x in range(1, 9) } |
|
|
|
pvidres.update({ x: 1 for x in range(14, 18) }) |
|
|
|
pvidres[4] = 12 |
|
|
|
|
|
|
|
rpvidres = switch.getpvid() |
|
|
|
|
|
|
|