From 8e5f8cb2d390396020f7aae6998eeece0315c891 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 22 Apr 2024 10:41:51 +0200 Subject: [PATCH] Add mechanism for classless reverse delegations Also clean up parsing. --- TODO | 1 - example/__init__.py | 6 +++++ nsconfig/core.py | 57 +++++++++++++++++++++++++-------------------- nsconfig/util.py | 34 +++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/TODO b/TODO index 664dcd3..e1613f1 100644 --- a/TODO +++ b/TODO @@ -1,6 +1,5 @@ - Names with dots - E-mail addresses with dots in SOA -- Classless reverse delegation - Blackhole zones - DNSSEC - Automated generation of Null MX diff --git a/example/__init__.py b/example/__init__.py index 4497c6b..557ba09 100644 --- a/example/__init__.py +++ b/example/__init__.py @@ -13,4 +13,10 @@ for rev in ['10.1.0.0/16', '10.2.0.0/16', 'fd12:3456:789a::/48']: nsc.add_zone('example.net', follow_primary='10.42.0.1') +rz = nsc.add_zone(reverse_for='10.3.0.0/16') +rz.delegate_classless('10.3.16.0/20').NS('ns1.example.org') + +rz = nsc.add_zone(reverse_for='10.3.16.0/20') +rz[""].NS('ns1.example.org') + import example.example_org diff --git a/nsconfig/core.py b/nsconfig/core.py index 53739ef..a5434ec 100644 --- a/nsconfig/core.py +++ b/nsconfig/core.py @@ -6,6 +6,7 @@ from dns.node import Node from dns.rdata import Rdata from dns.rdataclass import RdataClass from dns.rdatatype import RdataType +import dns.rdtypes.ANY.CNAME import dns.rdtypes.ANY.MX import dns.rdtypes.ANY.NS import dns.rdtypes.ANY.PTR @@ -23,18 +24,14 @@ import socket import sys from typing import Optional, Dict, List, Self, DefaultDict, TextIO, TYPE_CHECKING -from nsconfig.util import flatten_list +from nsconfig.util import flatten_list, parse_address, parse_network, parse_name +from nsconfig.util import IPAddress, IPNetwork, IPAddr if TYPE_CHECKING: from nsconfig.daemon import NscDaemon -IPAddress = IPv4Address | IPv6Address -IPNetwork = IPv4Network | IPv6Network -IPAddr = str | IPAddress | List[str | IPAddress] - - class NscNode: nsc_zone: 'NscZonePrimary' name: str @@ -58,39 +55,24 @@ class NscNode: rds = self.node.find_rdataset(rec.rdclass, rec.rdtype, create=True) rds.add(rec, ttl=self._ttl) - def _parse_addr(self, addr: IPAddr | str) -> IPAddress: - if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address): - return addr - elif isinstance(addr, str): - return ip_address(addr) - else: - raise ValueError('Cannot parse IP address') - - def _parse_name(self, name: str) -> Name: - # FIXME: Names with escaped dots - if '.' in name: - return dns.name.from_text(name) - else: - return dns.name.from_text(name, origin=None) - def A(self, *addrs: IPAddr, reverse: bool = True) -> Self: - for a in map(self._parse_addr, flatten_list(addrs)): + for a in map(parse_address, flatten_list(addrs)): if isinstance(a, IPv4Address): self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a))) else: self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a))) if reverse: - self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name)) + self.nsc_zone.nsc._add_reverse_mapping(a, parse_name(self.name + '.' + self.nsc_zone.name)) return self def MX(self, pri: int, name: str) -> Self: self._add( - dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, self._parse_name(name)) + dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, parse_name(name)) ) return self def NS(self, *names: str | List[str]) -> Self: - for name in map(self._parse_name, flatten_list(names)): + for name in map(parse_name, flatten_list(names)): self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name)) return self @@ -103,6 +85,10 @@ class NscNode: self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target)) return self + def CNAME(self, target: Name | str) -> Self: + self._add(dns.rdtypes.ANY.CNAME.CNAME(RdataClass.IN, RdataType.CNAME, target)) + return self + def generic(self, typ: str, text: str) -> Self: self._add(dns.rdata.from_text(RdataClass.IN, typ, text)) return self @@ -340,6 +326,27 @@ class NscZonePrimary(NscZone): def is_changed(self) -> bool: return self.state.serial != self.prev_state.serial + def delegate_classless(self, net: str | IPNetwork, subdomain: Optional[str] = None) -> NscNode: + net = parse_network(net) + assert self.reverse_for is not None + assert isinstance(self.reverse_for, IPv4Network) + assert self.reverse_for.prefixlen % 8 == 0 + assert isinstance(net, IPv4Network) + assert net.subnet_of(self.reverse_for) + assert net.prefixlen < self.reverse_for.prefixlen + 8 + + start = int(net.network_address.packed[net.prefixlen // 8]) + num = 1 << (8 - net.prefixlen % 8) + + if subdomain is None: + subdomain = f'{start}/{net.prefixlen}' + + for i in range(start, start + num): + target = f'{i}.{subdomain}' + self[str(i)].CNAME(parse_name(target, relative=True)) + + return self[subdomain] + class NscZoneSecondary(NscZone): primary_server: IPAddress diff --git a/nsconfig/util.py b/nsconfig/util.py index 7b9370c..48e2e6f 100644 --- a/nsconfig/util.py +++ b/nsconfig/util.py @@ -1,6 +1,14 @@ +import dns.name +from dns.name import Name +from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network from typing import Any, List +IPAddress = IPv4Address | IPv6Address +IPNetwork = IPv4Network | IPv6Network +IPAddr = str | IPAddress | List[str | IPAddress] + + def flatten_list(args: Any) -> List[Any]: def flat(args): if isinstance(args, list) or isinstance(args, tuple): @@ -12,3 +20,29 @@ def flatten_list(args: Any) -> List[Any]: out: List[Any] = [] flat(args) return out + + +def parse_address(addr: IPAddress | str) -> IPAddress: + if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address): + return addr + elif isinstance(addr, str): + return ip_address(addr) + else: + raise ValueError('Cannot parse IP address') + + +def parse_network(addr: IPNetwork | str) -> IPNetwork: + if isinstance(addr, IPv4Network) or isinstance(addr, IPv6Network): + return addr + elif isinstance(addr, str): + return ip_network(addr) + else: + raise ValueError('Cannot parse IP network') + + +def parse_name(name: str, relative: bool = False) -> Name: + # FIXME: Names with escaped dots + if '.' in name and not relative: + return dns.name.from_text(name) + else: + return dns.name.from_text(name, origin=None) -- 2.39.2