# Source code for pycopia.SNMP.traps

# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab
#    Copyright (C) 1999-  Keith Dart <>
#    This library is free software; you can redistribute it and/or
#    modify it under the terms of the GNU Lesser General Public
#    License as published by the Free Software Foundation; either
#    version 2.1 of the License, or (at your option) any later version.
#    This library is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Lesser General Public License for more details.

from __future__ import absolute_import
from __future__ import print_function
#from __future__ import unicode_literals
from __future__ import division

"""
SNMP trap dispatcher. This uses the straps program as a simple proxy to allow
non-root programs and scripts to receive traps.
See straps(8) for details on how this works.
"""


import struct
from pycopia import socket
from pycopia.timelib import now

from pycopia.SNMP import BER_decode
from pycopia.SNMP.SNMP import SNMPv2TrapPDU, TimeTicks, ObjectIdentifier, IpAddress

from pycopia.mibs.SNMPv2_MIB import (sysUpTime, snmpTrapOID, snmpTrapEnterprise,
                            coldStart, warmStart, authenticationFailure)

from pycopia.mibs.IF_MIB import linkDown, linkUp
from pycopia.mibs.SNMP_COMMUNITY_MIB import snmpTrapAddress, snmpTrapCommunity
import collections

# egpNeighborLoss not in mibs! So fake it here. This should occur only
# rarely, anyway...
class _egpNeighborLoss(object):
    """Stand-in for the egpNeighborLoss notification, which is not available
    from the imported MIB modules.

    This module only reads the ``OID`` class attribute.
    """
    status = 1
    OID = ObjectIdentifier([1, 3, 6, 1, 6, 3, 1, 1, 5, 6])

    def __init__(self, value=None):
        # The constructor accepts an optional value so that it can be called
        # like the other notification objects, but nothing is stored.
        # NOTE(review): the body was missing from the source as received;
        # confirm against upstream that a no-op is what was intended.
        pass

# Translate an SNMPv1 generic-trap number to the OID of the equivalent SNMPv2
# notification object. enterpriseSpecific(6) is deliberately absent: its
# OID is built from the enterprise and specific-trap fields by the caller.
_TRAPMAP = {
    0: coldStart.OID,
    1: warmStart.OID,
    2: linkDown.OID,
    3: linkUp.OID,
    4: authenticationFailure.OID,
    5: _egpNeighborLoss.OID,  # egpNeighborLoss is generic-trap 5, not 6
}

def _translate2v2(ip, community, pdu):
    """Translate an SNMPv1 trap to an SNMPv2 trap per RFC2576. This is
    done so that the rest of the API only has to deal with V2 traps.

    The numbered comments below refer to the steps of RFC 2576 section 3.1.

    ``pdu.varbinds`` is modified in place (two entries are inserted at the
    front) and the same object is handed to the returned PDU.

    Returns a new SNMPv2TrapPDU. A generic-trap value that is neither 6 nor
    present in _TRAPMAP makes the lookup fail, and that error propagates.
    """
    varbinds = pdu.varbinds
    # 3.1.(1): sysUpTime goes first, taken from the v1 time-stamp field.
    varbinds.insert(0, sysUpTime(pdu.time_stamp).varbind)
    # 3.1.(2): enterprise-specific traps use <enterprise>.0.<specific>.
    if pdu.generic == 6:  # enterpriseSpecific(6)
        trapoid = ObjectIdentifier(pdu.enterprise+[0, pdu.specific])
    else: # 3.1.(3): well-known generic traps map to fixed OIDs.
        trapoid = _TRAPMAP[pdu.generic]
    varbinds.insert(1, snmpTrapOID(trapoid).varbind)
    # 3.1.(4)
    # NOTE(review): nothing is appended for this step, and the ``ip`` and
    # ``community`` arguments are unused here. The module imports
    # snmpTrapAddress, snmpTrapCommunity and snmpTrapEnterprise, which suggests
    # the proxy varbinds were meant to be added -- confirm against upstream.
    return SNMPv2TrapPDU(varbinds=varbinds)

# all trap callbacks should match this signature.
def _default_trap_handler(traprecord):
    """Print the trap record to standard output.

    Returns None, so a dispatcher that stops on a true return value will go
    on to call any further handlers.

    NOTE(review): the body was missing from the source as received; printing
    the record is the assumed behaviour -- confirm against upstream.
    """
    print(traprecord)

class TrapRecord(object):
    """Holder for SNMP Traps.

    Attributes:
        timestamp: when the trap was received (as supplied by the caller).
        ip: source address of the trap sender.
        community: community string carried by the trap message.
        pdu: the trap PDU; ``pdu.varbinds[0]`` is rendered as the uptime and
            ``pdu.varbinds[1]`` as the trap OID when formatting.
    """

    def __init__(self, timestamp, ip, community, pdu):
        self.timestamp = timestamp
        self.ip = ip
        self.community = community
        self.pdu = pdu

    def __str__(self):
        """Return a multi-line, human-readable report of the trap."""
        pdu = self.pdu
        trapoid = pdu.varbinds[1]
        s = ["Trap from %s with ID %s for %s at %s:" % (
            self.ip, pdu.request_id, self.community, self.timestamp)]
        s.append(" Uptime: %s" % (pdu.varbinds[0],))
        # Show the symbolic name next to the OID when one can be resolved.
        obj = trapoid.value.get_object()
        if obj:
            s.append(" Trap OID: %s (%s)" % (trapoid, obj.__name__))
        else:
            s.append(" Trap OID: %s" % (trapoid,))
        # Everything after the first two entries is the trap's payload.
        for vb in pdu.varbinds[2:]:
            if vb.Object:
                s.append(" %s (%s) = %s" % (vb.oid, vb.Object.__name__, vb.value))
            else:
                s.append(" %s = %s" % (vb.oid, vb.value))
        return "\n".join(s)

    def __repr__(self):
        return "TrapRecord(%r, %r, %r, %r)" % (
            self.timestamp, self.ip, self.community, self.pdu)
class TrapDispatcher(socket.AsyncSocket):
    """Receive traps relayed by the straps proxy and hand them to handlers.

    The dispatcher connects to the UNIX-domain stream socket
    ``/tmp/.straps-<port>`` and, for every message read, builds a TrapRecord
    and calls the registered handlers in order.
    """

    def __init__(self, traphandler=_default_trap_handler, port=162, debug=False):
        """
        traphandler: a callable taking one TrapRecord, or a list of such
            callables. A list is used as-is (not copied).
        port: selects which straps socket to connect to.
        debug: when true, errors are sent to the pycopia debugger instead of
            being printed.
        """
        super(TrapDispatcher, self).__init__(socket.AF_UNIX, socket.SOCK_STREAM)
        self.traphandler = traphandler  # should be a callable object
        if isinstance(traphandler, list):
            self._handlers = traphandler
        else:
            self._handlers = [traphandler]
        self._debug = debug
        self.connect("/tmp/.straps-%d" % port)

    def _get_debug(self):
        return self._debug

    def _set_debug(self, val):
        self._debug = bool(val)

    def _del_debug(self):
        self._debug = False

    debug = property(_get_debug, _set_debug, _del_debug)

    def register_handler(self, handler):
        """Append a handler; objects that are not callable are ignored."""
        # collections.Callable no longer exists on Python 3.10+; the builtin
        # callable() performs the same check on every supported version.
        if callable(handler):
            self._handlers.append(handler)

    def readable(self):
        return True

    def writable(self):
        return False

    def priority(self):
        return False

    def read_handler(self):
        """Read one relayed trap from the socket and dispatch it.

        The header is a 4-byte address and a 2-byte port in network byte
        order followed by a 4-byte length in host byte order; the BER-encoded
        message of that length comes next. SNMPv1 traps (version 0) are
        converted to SNMPv2 form before the handlers see them.
        """
        ip = struct.unpack("!I", self.recv(4))[0]  # network byte-order
        port = struct.unpack("!H", self.recv(2))[0]  # network byte-order
        length = struct.unpack("i", self.recv(4))[0]  # host byte-order
        src = IpAddress(ip)
        src.port = port
        msg = self.recv(length)
        # NOTE(review): a short read trips this assert (and asserts vanish
        # under -O); kept as-is to preserve the existing failure behaviour.
        assert length == len(msg)
        tlv = BER_decode.get_tlv(msg)
        # should be community based message
        version, community, pdu = tlv.decode()
        if version == 0:
            pdu = _translate2v2(ip, community, pdu)
        tr = TrapRecord(now(), src, community, pdu)
        for handler in self._handlers:
            # A handler returns False/None to let the remaining handlers run,
            # or True when the trap is handled and processing should stop.
            if handler(tr):
                break

    def handle_connect(self):
        pass

    def error_handler(self, ex, val, tb):
        """Report an error: post-mortem debugger in debug mode, else a traceback."""
        if self._debug:
            from pycopia import debugger
            debugger.post_mortem(ex, val, tb)
        else:
            import traceback
            traceback.print_exception(ex, val, tb)


def start_straps(port=162):
    """Launch the straps proxy through ``daemonize`` and return the result
    of os.system().

    The port is passed to straps only when it differs from 162. Output is
    logged to ``/tmp/straps_<pid>.log``, where <pid> is the calling process.
    """
    # the daemonize and straps program source code is in the pycopia-utils
    # package.
    import os
    if port != 162:
        cmd = "daemonize -f /tmp/straps_%s.log straps %d" % (os.getpid(), port)
    else:
        cmd = "daemonize -f /tmp/straps_%s.log straps" % (os.getpid(),)
    rv = os.system(cmd)
    return rv
def get_dispatcher(*handlers):
    """Return a TrapDispatcher instance ready to respond to traps.

    Starts the straps proxy on its default port, waits two seconds for it to
    come up, then creates a dispatcher with the given handlers and registers
    it with the pycopia asyncio module.

    handlers: zero or more callables, each taking one TrapRecord.
    """
    from pycopia import scheduler
    from pycopia import asyncio
    start_straps()
    scheduler.sleep(2)  # give the proxy time to create its socket
    dispatcher = TrapDispatcher(list(handlers))
    asyncio.register(dispatcher)
    return dispatcher
if __name__ == "__main__":
    from pycopia import asyncio
    # get_dispatcher() takes handlers as separate positional arguments;
    # passing a list here would register the list itself as a handler.
    get_dispatcher(_default_trap_handler)
    while True:
        asyncio.pause()