#!/usr/bin/env python

# College of Creative Studies
# ARP
#
# Douglas Thrift
#
# $Id$

"""Read the host's ARP table.

The module-level name ``arp`` is bound to a callable chosen from
``sys.platform``.  Calling it yields ``(mac_address, ip_address, host_name)``
tuples; ``mac_address`` and ``host_name`` may be ``None``.
"""

import ctypes
import ctypes.util
import fnmatch
import os
import re
import socket
import subprocess
import sys

try:
    import ipaddr
except ImportError:
    # ipaddr is not installed: fall back to the standard-library ipaddress
    # module (Python 3.3+), which offers equivalent constructors.
    import ipaddress

    _ip_address = ipaddress.ip_address
    _IPv4Address = ipaddress.IPv4Address
else:
    _ip_address = ipaddr.IPAddress
    _IPv4Address = ipaddr.IPv4Address


def _errno_error():
    """Build an OSError from the errno ctypes saved for the last libc call."""
    status = ctypes.get_errno()
    return OSError(status, os.strerror(status))


class ARP(object):
    """Base ARP reader; used directly only on platforms with no implementation."""

    def __call__(self, address = None, interface = None, numbers = False):
        """Raise NotImplementedError; subclasses yield ARP entries instead."""
        raise NotImplementedError("Unknown platform: '%s'." % sys.platform)

    def __str__(self):
        """Return the name of the concrete class."""
        return str(self.__class__.__name__)


class CommandARP(ARP):
    """ARP reader that runs the ``arp`` command and parses its output."""

    # Line format matched when sys.platform starts with 'linux'.
    LINUX_ENTRY = re.compile(r'^(.+) \(([0-9]{1,3}(?:\.[0-9]{1,3}){3})\) at ([0-9a-f]{2}(?::[0-9a-f]{2}){5}) \[.+\] on [a-z0-9]+$')
    # Line format matched on every other platform routed to this class.
    BSD_ENTRY = re.compile(r'^(.+) \(([0-9]{1,3}(?:\.[0-9]{1,3}){3})\) at ([0-9a-f]{2}(?::[0-9a-f]{2}){5}|\(incomplete\)) on [a-z0-9]+(?: expires in [0-9]+ seconds| expired| permanent)? \[.+\]$')

    def __init__(self):
        """Select the output pattern for the current platform."""
        if sys.platform.startswith('linux'):
            self.entry = self.LINUX_ENTRY
        else:
            self.entry = self.BSD_ENTRY

    def _parse(self, line, numbers):
        """Turn one line of ``arp`` output into an entry tuple, or None if it does not match."""
        match = self.entry.match(line)

        if match is None:
            return None

        mac_address = match.group(3)

        if mac_address == '(incomplete)':
            mac_address = None

        host_name = match.group(1)

        # '?' is what the command printed in place of a name.
        if numbers or host_name == '?':
            host_name = None

        return (mac_address, _ip_address(match.group(2)), host_name)

    def __call__(self, address = None, interface = None, numbers = False):
        """Yield ``(mac_address, ip_address, host_name)`` for each parsed line.

        address: restrict the query to this host (passed to ``arp``).
        interface: restrict the query to this interface (``arp -i``).
        numbers: pass ``-n`` and report ``host_name`` as None.

        Raises OSError if ``arp`` cannot be started and
        subprocess.CalledProcessError if it exits with a non-zero status
        after its output has been fully consumed.
        """
        args = ['arp']

        if numbers:
            args.append('-n')

        if interface is not None:
            args += ['-i', str(interface)]

        if address is not None:
            if sys.platform.startswith('linux'):
                args.append('-a')

            args.append(str(address))
        else:
            args.append('-a')

        # Start the process outside the try block so the cleanup below never
        # runs against an unbound name when the command cannot be launched.
        # Text mode keeps the lines as str on both Python 2 and Python 3.
        process = subprocess.Popen(args, stdout = subprocess.PIPE, close_fds = True, universal_newlines = True)

        try:
            for line in process.stdout:
                entry = self._parse(line, numbers)

                if entry is not None:
                    yield entry
        finally:
            # Always release the pipe and reap the child, even when the
            # consumer stops early or parsing raised.
            process.stdout.close()
            process.wait()

        # Only reached after normal completion, so a failing exit status
        # never replaces an exception that is already propagating.
        if process.returncode != 0:
            raise subprocess.CalledProcessError(process.returncode, args)


# The ctypes structures below are overlaid on the buffer returned by sysctl in
# SysCtlARP.  NOTE(review): the layouts presumably follow the FreeBSD headers
# for the same-named C structs; verify against the target release.

class in_addr(ctypes.BigEndianStructure):
    # Big-endian so s_addr reads as the integer value of the address bytes.
    _fields_ = [
        ('s_addr', ctypes.c_uint32),
    ]


class rt_metrics(ctypes.Structure):
    _fields_ = [
        ('rmx_locks', ctypes.c_ulong),
        ('rmx_mtu', ctypes.c_ulong),
        ('rmx_hopcount', ctypes.c_ulong),
        ('rmx_expire', ctypes.c_ulong),
        ('rmx_recvpipe', ctypes.c_ulong),
        ('rmx_sendpipe', ctypes.c_ulong),
        ('rmx_ssthresh', ctypes.c_ulong),
        ('rmx_rtt', ctypes.c_ulong),
        ('rmx_rttvar', ctypes.c_ulong),
        ('rmx_pksent', ctypes.c_ulong),
        ('rmx_weight', ctypes.c_ulong),
        ('rmx_filler', ctypes.c_ulong * 3),
    ]


class rt_msghdr(ctypes.Structure):
    _fields_ = [
        ('rtm_msglen', ctypes.c_ushort),
        ('rtm_version', ctypes.c_ubyte),
        ('rtm_type', ctypes.c_ubyte),
        ('rtm_index', ctypes.c_ushort),
        ('rtm_flags', ctypes.c_int),
        ('rtm_addrs', ctypes.c_int),
        ('rtm_pid', ctypes.c_int32),
        ('rtm_seq', ctypes.c_int),
        ('rtm_errno', ctypes.c_int),
        ('rtm_fmask', ctypes.c_int),
        ('rtm_inits', ctypes.c_ulong),
        ('rtm_rmx', rt_metrics),
    ]


class sockaddr(ctypes.Structure):
    _fields_ = [
        ('sa_len', ctypes.c_ubyte),
        ('sa_family', ctypes.c_uint8),
        ('sa_data', ctypes.c_char * 14),
    ]


class sockaddr_dl(ctypes.Structure):
    _fields_ = [
        ('sdl_len', ctypes.c_ubyte),
        ('sdl_family', ctypes.c_ubyte),
        ('sdl_index', ctypes.c_ushort),
        ('sdl_type', ctypes.c_ubyte),
        ('sdl_nlen', ctypes.c_ubyte),
        ('sdl_alen', ctypes.c_ubyte),
        ('sdl_slen', ctypes.c_ubyte),
        ('sdl_data', ctypes.c_byte * 46),
    ]

    def lladdr(self):
        """Return a byte-array view starting ``sdl_nlen`` bytes into ``sdl_data``.

        The view is always 46 bytes long, so it extends past the end of
        ``sdl_data`` whenever ``sdl_nlen`` is non-zero; callers should read
        only the leading ``sdl_alen`` bytes.
        """
        return (ctypes.c_byte * 46).from_address(ctypes.addressof(self.sdl_data) + self.sdl_nlen)


class sockaddr_inarp(ctypes.Structure):
    _fields_ = [
        ('sin_len', ctypes.c_ubyte),
        ('sin_family', ctypes.c_ubyte),
        ('sin_port', ctypes.c_ushort),
        ('sin_addr', in_addr),
        ('sin_srcaddr', in_addr),
        ('sin_tos', ctypes.c_ushort),
        ('sin_other', ctypes.c_ushort),
    ]

    def sa_size(self):
        """Return the offset from this address to whatever follows it.

        A zero length counts as ``sizeof(long)``; otherwise the length is
        rounded up to a multiple of ``sizeof(long)``.
        NOTE(review): this appears to mirror the BSD SA_SIZE() macro.
        """
        # sin_len is the first byte of the structure, exactly where
        # sockaddr.sa_len lives, so it can be read without a cast.
        length = self.sin_len
        alignment = ctypes.sizeof(ctypes.c_long)

        if length == 0:
            return alignment

        return 1 + ((length - 1) | (alignment - 1))


class SysCtlARP(ARP):
    """ARP reader that fetches the table through the C library's sysctl()."""

    def __init__(self):
        """Load libc and prepare the sysctl request."""
        self.libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno = True)
        self.size = ctypes.c_size_t(0)
        self.mib = (ctypes.c_int * 6)(4, 17, 0, 2, 2, 0x400) # CTL_NET, PF_ROUTE, 0, AF_INET, NET_RT_FLAGS, RTF_LLINFO

    @staticmethod
    def _messages(table, size):
        """Yield ``(header, inarp, dl)`` structure views for each message in ``table``."""
        index = ctypes.addressof(table)
        limit = index + size

        while index < limit:
            header = rt_msghdr.from_address(index)

            # A zero-length message would never advance the cursor.
            if header.rtm_msglen == 0:
                break

            inarp = sockaddr_inarp.from_address(index + ctypes.sizeof(rt_msghdr))
            dl = sockaddr_dl.from_address(ctypes.addressof(inarp) + inarp.sa_size())

            yield header, inarp, dl

            index += header.rtm_msglen

    @staticmethod
    def _matches(inarp, dl, address, interface):
        """Return True when an entry passes the optional address and interface filters.

        ``interface`` is an interface index (or None) and ``address`` is
        anything ``int()`` accepts (or None).
        """
        if interface is not None and dl.sdl_index != interface:
            return False

        if address is not None and inarp.sin_addr.s_addr != int(address):
            return False

        return True

    def __call__(self, address = None, interface = None, numbers = False):
        """Yield ``(mac_address, ip_address, host_name)`` for each table entry.

        address: only yield the entry for this IPv4 address.
        interface: only yield entries on the interface with this name.
        numbers: skip the reverse DNS lookup and report ``host_name`` as None.

        Raises OSError when sysctl() or if_nametoindex() fails.
        """
        # The first call passes no buffer and only fills in the required size.
        if self.libc.sysctl(self.mib, len(self.mib), None, ctypes.byref(self.size), None, 0) < 0:
            raise _errno_error()

        table = (ctypes.c_char * self.size.value)()

        if self.libc.sysctl(self.mib, len(self.mib), table, ctypes.byref(self.size), None, 0) != 0:
            raise _errno_error()

        if interface is not None:
            # ctypes passes Python 3 str as wchar_t*, so hand libc bytes.
            name = interface if isinstance(interface, bytes) else str(interface).encode()
            interface = self.libc.if_nametoindex(name)

            if interface == 0:
                raise _errno_error()

        if address is not None and not isinstance(address, _IPv4Address):
            address = _IPv4Address(address)

        for header, inarp, dl in self._messages(table, self.size.value):
            if not self._matches(inarp, dl, address, interface):
                continue

            if dl.sdl_alen:
                buffer = (ctypes.c_char * 18)()

                self.libc.ether_ntoa_r(dl.lladdr(), ctypes.byref(buffer))

                mac_address = buffer.value

                # c_char buffers hold bytes on Python 3; report str like CommandARP.
                if not isinstance(mac_address, str):
                    mac_address = mac_address.decode('ascii')
            else:
                mac_address = None

            ip_address = _IPv4Address(inarp.sin_addr.s_addr)

            if numbers:
                host_name = None
            else:
                try:
                    host_name = socket.gethostbyaddr(str(ip_address))[0]
                except socket.herror:
                    host_name = None

            yield (mac_address, ip_address, host_name)


# Pick the implementation for this platform; FreeBSD is tested first because
# its platform string also matches '*bsd*'.
if fnmatch.fnmatchcase(sys.platform, '*freebsd*'):
    arp = SysCtlARP()
elif sys.platform.startswith('linux') or fnmatch.fnmatchcase(sys.platform, '*bsd*'):
    arp = CommandARP()
else:
    arp = ARP()

if __name__ == '__main__':
    import pprint

    pprint.pprint(list(arp()))
    pprint.pprint(list(arp(interface = 'bridge0')))
    pprint.pprint(list(arp(address = '128.111.69.202', numbers = True)))