]> mj.ucw.cz Git - pynsc.git/commitdiff
Add mechanism for classless reverse delegations
authorMartin Mares <mj@ucw.cz>
Mon, 22 Apr 2024 08:41:51 +0000 (10:41 +0200)
committerMartin Mares <mj@ucw.cz>
Mon, 22 Apr 2024 08:41:51 +0000 (10:41 +0200)
Also clean up parsing.

TODO
example/__init__.py
nsconfig/core.py
nsconfig/util.py

diff --git a/TODO b/TODO
index 664dcd3d88295179cb53996b3966c4926641ee22..e1613f142aa2e51b6890cd2299a2488a33986dd6 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,6 +1,5 @@
 - Names with dots
 - E-mail addresses with dots in SOA
-- Classless reverse delegation
 - Blackhole zones
 - DNSSEC
 - Automated generation of Null MX
index 4497c6b6ceddb291e8f6a6ddee34f0220c22be0b..557ba093c5dd7593353b056984f740efd16c3b8f 100644 (file)
@@ -13,4 +13,10 @@ for rev in ['10.1.0.0/16', '10.2.0.0/16', 'fd12:3456:789a::/48']:
 
 nsc.add_zone('example.net', follow_primary='10.42.0.1')
 
+rz = nsc.add_zone(reverse_for='10.3.0.0/16')
+rz.delegate_classless('10.3.16.0/20').NS('ns1.example.org')
+
+rz = nsc.add_zone(reverse_for='10.3.16.0/20')
+rz[""].NS('ns1.example.org')
+
 import example.example_org
index 53739ef304afb03744c24c24b125aa70971835a4..a5434ec6c63df2ee1e5db97f9b8a3ee3a0896cc9 100644 (file)
@@ -6,6 +6,7 @@ from dns.node import Node
 from dns.rdata import Rdata
 from dns.rdataclass import RdataClass
 from dns.rdatatype import RdataType
+import dns.rdtypes.ANY.CNAME
 import dns.rdtypes.ANY.MX
 import dns.rdtypes.ANY.NS
 import dns.rdtypes.ANY.PTR
@@ -23,18 +24,14 @@ import socket
 import sys
 from typing import Optional, Dict, List, Self, DefaultDict, TextIO, TYPE_CHECKING
 
-from nsconfig.util import flatten_list
+from nsconfig.util import flatten_list, parse_address, parse_network, parse_name
+from nsconfig.util import IPAddress, IPNetwork, IPAddr
 
 
 if TYPE_CHECKING:
     from nsconfig.daemon import NscDaemon
 
 
-IPAddress = IPv4Address | IPv6Address
-IPNetwork = IPv4Network | IPv6Network
-IPAddr = str | IPAddress | List[str | IPAddress]
-
-
 class NscNode:
     nsc_zone: 'NscZonePrimary'
     name: str
@@ -58,39 +55,24 @@ class NscNode:
         rds = self.node.find_rdataset(rec.rdclass, rec.rdtype, create=True)
         rds.add(rec, ttl=self._ttl)
 
-    def _parse_addr(self, addr: IPAddr | str) -> IPAddress:
-        if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address):
-            return addr
-        elif isinstance(addr, str):
-            return ip_address(addr)
-        else:
-            raise ValueError('Cannot parse IP address')
-
-    def _parse_name(self, name: str) -> Name:
-        # FIXME: Names with escaped dots
-        if '.' in name:
-            return dns.name.from_text(name)
-        else:
-            return dns.name.from_text(name, origin=None)
-
     def A(self, *addrs: IPAddr, reverse: bool = True) -> Self:
-        for a in map(self._parse_addr, flatten_list(addrs)):
+        for a in map(parse_address, flatten_list(addrs)):
             if isinstance(a, IPv4Address):
                 self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a)))
             else:
                 self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a)))
             if reverse:
-                self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name))
+                self.nsc_zone.nsc._add_reverse_mapping(a, parse_name(self.name + '.' + self.nsc_zone.name))
         return self
 
     def MX(self, pri: int, name: str) -> Self:
         self._add(
-            dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, self._parse_name(name))
+            dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, parse_name(name))
         )
         return self
 
     def NS(self, *names: str | List[str]) -> Self:
-        for name in map(self._parse_name, flatten_list(names)):
+        for name in map(parse_name, flatten_list(names)):
             self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name))
         return self
 
@@ -103,6 +85,10 @@ class NscNode:
         self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target))
         return self
 
+    def CNAME(self, target: Name | str) -> Self:
+        self._add(dns.rdtypes.ANY.CNAME.CNAME(RdataClass.IN, RdataType.CNAME, target))
+        return self
+
     def generic(self, typ: str, text: str) -> Self:
         self._add(dns.rdata.from_text(RdataClass.IN, typ, text))
         return self
@@ -340,6 +326,27 @@ class NscZonePrimary(NscZone):
     def is_changed(self) -> bool:
         return self.state.serial != self.prev_state.serial
 
+    def delegate_classless(self, net: str | IPNetwork, subdomain: Optional[str] = None) -> NscNode:
+        net = parse_network(net)
+        assert self.reverse_for is not None
+        assert isinstance(self.reverse_for, IPv4Network)
+        assert self.reverse_for.prefixlen % 8 == 0
+        assert isinstance(net, IPv4Network)
+        assert net.subnet_of(self.reverse_for)
+        assert net.prefixlen < self.reverse_for.prefixlen + 8
+
+        start = int(net.network_address.packed[net.prefixlen // 8])
+        num = 1 << (8 - net.prefixlen % 8)
+
+        if subdomain is None:
+            subdomain = f'{start}/{net.prefixlen}'
+
+        for i in range(start, start + num):
+            target = f'{i}.{subdomain}'
+            self[str(i)].CNAME(parse_name(target, relative=True))
+
+        return self[subdomain]
+
 
 class NscZoneSecondary(NscZone):
     primary_server: IPAddress
index 7b9370c6ea564561899c5794d2adec9481e90696..48e2e6f7d39bc70fd598b2aae04f3fdebffe3eca 100644 (file)
@@ -1,6 +1,14 @@
+import dns.name
+from dns.name import Name
+from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network
 from typing import Any, List
 
 
+IPAddress = IPv4Address | IPv6Address
+IPNetwork = IPv4Network | IPv6Network
+IPAddr = str | IPAddress | List[str | IPAddress]
+
+
 def flatten_list(args: Any) -> List[Any]:
     def flat(args):
         if isinstance(args, list) or isinstance(args, tuple):
@@ -12,3 +20,29 @@ def flatten_list(args: Any) -> List[Any]:
     out: List[Any] = []
     flat(args)
     return out
+
+
+def parse_address(addr: IPAddress | str) -> IPAddress:
+    if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address):
+        return addr
+    elif isinstance(addr, str):
+        return ip_address(addr)
+    else:
+        raise ValueError('Cannot parse IP address')
+
+
+def parse_network(addr: IPNetwork | str) -> IPNetwork:
+    if isinstance(addr, IPv4Network) or isinstance(addr, IPv6Network):
+        return addr
+    elif isinstance(addr, str):
+        return ip_network(addr)
+    else:
+        raise ValueError('Cannot parse IP network')
+
+
+def parse_name(name: str, relative: bool = False) -> Name:
+    # FIXME: Names with escaped dots
+    if '.' in name and not relative:
+        return dns.name.from_text(name)
+    else:
+        return dns.name.from_text(name, origin=None)