From d86a562d40903f6a0a839719f9f8fce3afd07cba Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sat, 20 Apr 2024 22:53:33 +0200 Subject: [PATCH] Reverse mappings --- nsc.py | 125 +++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 108 insertions(+), 17 deletions(-) diff --git a/nsc.py b/nsc.py index b734250..2a01f23 100755 --- a/nsc.py +++ b/nsc.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from collections import defaultdict from dataclasses import dataclass from datetime import timedelta import dns.name @@ -10,18 +11,20 @@ from dns.rdataclass import RdataClass from dns.rdatatype import RdataType import dns.rdtypes.ANY.MX import dns.rdtypes.ANY.NS +import dns.rdtypes.ANY.PTR import dns.rdtypes.ANY.SOA import dns.rdtypes.ANY.TXT import dns.rdtypes.IN.A import dns.rdtypes.IN.AAAA from dns.zone import Zone -from ipaddress import ip_address, IPv4Address, IPv6Address +from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network import socket import sys -from typing import Optional, Dict, List, Self, Tuple +from typing import Optional, Dict, List, Self, Tuple, DefaultDict IPAddress = IPv4Address | IPv6Address +IPNetwork = IPv4Network | IPv6Network IPAddr = str | IPAddress | List[str | IPAddress] @@ -73,12 +76,14 @@ class NscNode: else: return [self._parse_name(n) for n in names] - def A(self, *addrs: IPAddr) -> Self: + def A(self, *addrs: IPAddr, reverse: bool = True) -> Self: for a in self._parse_addrs(addrs): if isinstance(a, IPv4Address): self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a))) else: self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a))) + if reverse: + self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name)) return self def MX(self, pri: int, name: str) -> Self: @@ -88,6 +93,7 @@ class NscNode: return self def NS(self, names: str | List[str]) -> Self: + # FIXME: Variadic? for name in self._parse_names(names): self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name)) return self @@ -96,6 +102,10 @@ class NscNode: self._add(dns.rdtypes.ANY.TXT.TXT(RdataClass.IN, RdataType.TXT, text)) return self + def PTR(self, target: Name | str) -> Self: + self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target)) + return self + def generic(self, typ: str, text: str) -> Self: self._add(dns.rdata.from_text(RdataClass.IN, typ, text)) return self @@ -148,15 +158,19 @@ NscZoneConfig.default_config = NscZoneConfig( class NscZone: + nsc: 'Nsc' name: str zone: Zone _min_ttl: int + reverse_for: Optional[IPNetwork] - def __init__(self, name: str, **kwargs) -> None: + def __init__(self, nsc: 'Nsc', name: str, reverse_for: Optional[IPNetwork] = None, **kwargs) -> None: + self.nsc = nsc self.name = name self.config = NscZoneConfig(**kwargs).finalize() self.zone = dns.zone.Zone(origin=name, rdclass=RdataClass.IN) self._min_ttl = int(self.config.min_ttl.total_seconds()) + self.reverse_for = reverse_for conf = self.config root = self[""] @@ -179,13 +193,14 @@ class NscZone: def __getitem__(self, name: str) -> NscNode: return NscNode(self, name) - def host(self, name: str, *args) -> NscNode: + def host(self, name: str, *args, reverse: bool = True) -> NscNode: n = NscNode(self, name) - n.A(*args) + n.A(*args, reverse=reverse) return n def dump(self) -> None: # Could use self.zone.to_file(sys.stdout), but we want better formatting + print(f'; Zone file for {self.name}') last_name = None for name, ttl, rec in self.zone.iterate_rdatas(): if name == last_name: @@ -195,35 +210,102 @@ class NscZone: print(f'{print_name}\t{ttl if ttl != self._min_ttl else ""}\t{rec.rdtype.name}\t{rec.to_text()}') last_name = name + def _add_ipv4_reverse(self, addr: IPv4Address, ptr_to: Name) -> None: + # Called only for addresses from this reverse network + assert self.reverse_for is not None + parts = str(addr).split('.') + parts = parts[self.reverse_for.prefixlen // 8:] + name = '.'.join(reversed(parts)) + self.n(name).PTR(ptr_to) + + def _add_ipv6_reverse(self, addr: IPv6Address, ptr_to: Name) -> None: + # Called only for addresses from this reverse network + assert self.reverse_for is not None + parts = addr.exploded.replace(':', "") + parts = parts[self.reverse_for.prefixlen // 4:] + name = '.'.join(reversed(parts)) + self.n(name).PTR(ptr_to) + class Nsc: - zones: Dict[str, Zone] + zones: Dict[str, NscZone] default_zone_config: NscZoneConfig + ipv4_reverse: DefaultDict[IPv4Address, List[Name]] + ipv6_reverse: DefaultDict[IPv6Address, List[Name]] def __init__(self, **kwargs) -> None: self.zones = {} self.default_zone_config = NscZoneConfig(**kwargs) + self.ipv4_reverse = defaultdict(list) + self.ipv6_reverse = defaultdict(list) def add_zone(self, *args, inherit_config: Optional[NscZoneConfig] = None, **kwargs) -> Zone: if inherit_config is None: inherit_config = self.default_zone_config - dom = NscZone(*args, inherit_config=inherit_config, **kwargs) - assert dom.name not in self.zones - self.zones[dom.name] = dom - return dom + z = NscZone(self, *args, inherit_config=inherit_config, **kwargs) + assert z.name not in self.zones + self.zones[z.name] = z + return z + + def add_reverse_zone(self, net: str | IPNetwork, name: Optional[str] = None, **kwargs) -> Zone: + if not (isinstance(net, IPv4Network) or isinstance(net, IPv6Network)): + net = ip_network(net, strict=True) + name = name or self._reverse_zone_name(net) + return self.add_zone(name, reverse_for=net, **kwargs) + + def _reverse_zone_name(self, net: IPNetwork) -> str: + if isinstance(net, IPv4Network): + parts = str(net.network_address).split('.') + out = parts[:net.prefixlen // 8] + if net.prefixlen % 8 != 0: + out.append(parts[len(out)] + '/' + str(net.prefixlen)) + return '.'.join(reversed(out)) + '.in-addr.arpa' + elif isinstance(net, IPv6Network): + assert net.prefixlen % 4 == 0 + nibbles = net.network_address.exploded.replace(':', "") + nibbles = nibbles[:net.prefixlen // 4] + return '.'.join(reversed(nibbles)) + '.ip6.arpa' + else: + raise NotImplementedError() + def _add_reverse_mapping(self, addr: IPAddress, ptr_to: Name) -> None: + if isinstance(addr, IPv4Address): + self.ipv4_reverse[addr].append(ptr_to) + else: + self.ipv6_reverse[addr].append(ptr_to) + + def dump_reverse(self) -> None: + print('### Requests for reverse mappings ###') + for ipa4, name in sorted(self.ipv4_reverse.items()): + print(f'{ipa4}\t{name}') + for ipa6, name in sorted(self.ipv6_reverse.items()): + print(f'{ipa6}\t{name}') + + def fill_reverse(self) -> None: + for z in self.zones.values(): + if z.reverse_for is not None: + if isinstance(z.reverse_for, IPv4Network): + for addr4, ptr_list in self.ipv4_reverse.items(): + if addr4 in z.reverse_for: + for ptr_to in ptr_list: + z._add_ipv4_reverse(addr4, ptr_to) + else: + for addr6, ptr_list in self.ipv6_reverse.items(): + if addr6 in z.reverse_for: + for ptr_to in ptr_list: + z._add_ipv6_reverse(addr6, ptr_to) -class MyZone(Zone): - admin_email = 'admin@ucw.cz' - origin_server = 'ns.ucw.cz' +c = Nsc( + admin_email='admin@ucw.cz', + origin_server='ns.ucw.cz', +) -c = Nsc() -z = c.add_zone('ucw.cz') # origin_server='jabberwock.ucw.cz') +z = c.add_zone('ucw.cz') z[""].NS(['jabberwock', 'chirigo.gebbeth.cz', 'drak.ucw.cz']) -z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2') +z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2', '195.113.31.123') z.host('test', '1.2.3.4', ['5.6.7.8', '8.7.6.5']) @@ -235,3 +317,12 @@ z.host('test', '1.2.3.4', ['5.6.7.8', '8.7.6.5']) .generic('HINFO', 'Something fishy')) z.dump() + +r = c.add_reverse_zone('195.113.0.0/16') +r2 = c.add_reverse_zone('2a00:da80:fff0:2::/64') + +c.dump_reverse() +c.fill_reverse() + +r.dump() +r2.dump() -- 2.39.2