]> mj.ucw.cz Git - pynsc.git/commitdiff
Reverse mappings
authorMartin Mares <mj@ucw.cz>
Sat, 20 Apr 2024 20:53:33 +0000 (22:53 +0200)
committerMartin Mares <mj@ucw.cz>
Sat, 20 Apr 2024 20:53:33 +0000 (22:53 +0200)
nsc.py

diff --git a/nsc.py b/nsc.py
index b734250a64308a5e90af83f76b6ba55c81d2c38e..2a01f23f7f27715410bcd7fc6a177e7520a19b0e 100755 (executable)
--- a/nsc.py
+++ b/nsc.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+from collections import defaultdict
 from dataclasses import dataclass
 from datetime import timedelta
 import dns.name
@@ -10,18 +11,20 @@ from dns.rdataclass import RdataClass
 from dns.rdatatype import RdataType
 import dns.rdtypes.ANY.MX
 import dns.rdtypes.ANY.NS
+import dns.rdtypes.ANY.PTR
 import dns.rdtypes.ANY.SOA
 import dns.rdtypes.ANY.TXT
 import dns.rdtypes.IN.A
 import dns.rdtypes.IN.AAAA
 from dns.zone import Zone
-from ipaddress import ip_address, IPv4Address, IPv6Address
+from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network
 import socket
 import sys
-from typing import Optional, Dict, List, Self, Tuple
+from typing import Optional, Dict, List, Self, Tuple, DefaultDict
 
 
 IPAddress = IPv4Address | IPv6Address
+IPNetwork = IPv4Network | IPv6Network
 IPAddr = str | IPAddress | List[str | IPAddress]
 
 
@@ -73,12 +76,14 @@ class NscNode:
         else:
             return [self._parse_name(n) for n in names]
 
-    def A(self, *addrs: IPAddr) -> Self:
+    def A(self, *addrs: IPAddr, reverse: bool = True) -> Self:
         for a in self._parse_addrs(addrs):
             if isinstance(a, IPv4Address):
                 self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a)))
             else:
                 self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a)))
+            if reverse:
+                self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name))
         return self
 
     def MX(self, pri: int, name: str) -> Self:
@@ -88,6 +93,7 @@ class NscNode:
         return self
 
     def NS(self, names: str | List[str]) -> Self:
+        # FIXME: Variadic?
         for name in self._parse_names(names):
             self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name))
         return self
@@ -96,6 +102,10 @@ class NscNode:
         self._add(dns.rdtypes.ANY.TXT.TXT(RdataClass.IN, RdataType.TXT, text))
         return self
 
+    def PTR(self, target: Name | str) -> Self:
+        self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target))
+        return self
+
     def generic(self, typ: str, text: str) -> Self:
         self._add(dns.rdata.from_text(RdataClass.IN, typ, text))
         return self
@@ -148,15 +158,19 @@ NscZoneConfig.default_config = NscZoneConfig(
 
 
 class NscZone:
+    nsc: 'Nsc'
     name: str
     zone: Zone
     _min_ttl: int
+    reverse_for: Optional[IPNetwork]
 
-    def __init__(self, name: str, **kwargs) -> None:
+    def __init__(self, nsc: 'Nsc', name: str, reverse_for: Optional[IPNetwork] = None, **kwargs) -> None:
+        self.nsc = nsc
         self.name = name
         self.config = NscZoneConfig(**kwargs).finalize()
         self.zone = dns.zone.Zone(origin=name, rdclass=RdataClass.IN)
         self._min_ttl = int(self.config.min_ttl.total_seconds())
+        self.reverse_for = reverse_for
 
         conf = self.config
         root = self[""]
@@ -179,13 +193,14 @@ class NscZone:
     def __getitem__(self, name: str) -> NscNode:
         return NscNode(self, name)
 
-    def host(self, name: str, *args) -> NscNode:
+    def host(self, name: str, *args, reverse: bool = True) -> NscNode:
         n = NscNode(self, name)
-        n.A(*args)
+        n.A(*args, reverse=reverse)
         return n
 
     def dump(self) -> None:
         # Could use self.zone.to_file(sys.stdout), but we want better formatting
+        print(f'; Zone file for {self.name}')
         last_name = None
         for name, ttl, rec in self.zone.iterate_rdatas():
             if name == last_name:
@@ -195,35 +210,102 @@ class NscZone:
             print(f'{print_name}\t{ttl if ttl != self._min_ttl else ""}\t{rec.rdtype.name}\t{rec.to_text()}')
             last_name = name
 
+    def _add_ipv4_reverse(self, addr: IPv4Address, ptr_to: Name) -> None:
+        # Called only for addresses from this reverse network
+        assert self.reverse_for is not None
+        parts = str(addr).split('.')
+        parts = parts[self.reverse_for.prefixlen // 8:]
+        name = '.'.join(reversed(parts))
+        self.n(name).PTR(ptr_to)
+
+    def _add_ipv6_reverse(self, addr: IPv6Address, ptr_to: Name) -> None:
+        # Called only for addresses from this reverse network
+        assert self.reverse_for is not None
+        parts = addr.exploded.replace(':', "")
+        parts = parts[self.reverse_for.prefixlen // 4:]
+        name = '.'.join(reversed(parts))
+        self.n(name).PTR(ptr_to)
+
 
 class Nsc:
-    zones: Dict[str, Zone]
+    zones: Dict[str, NscZone]
     default_zone_config: NscZoneConfig
+    ipv4_reverse: DefaultDict[IPv4Address, List[Name]]
+    ipv6_reverse: DefaultDict[IPv6Address, List[Name]]
 
     def __init__(self, **kwargs) -> None:
         self.zones = {}
         self.default_zone_config = NscZoneConfig(**kwargs)
+        self.ipv4_reverse = defaultdict(list)
+        self.ipv6_reverse = defaultdict(list)
 
     def add_zone(self, *args, inherit_config: Optional[NscZoneConfig] = None, **kwargs) -> Zone:
         if inherit_config is None:
             inherit_config = self.default_zone_config
-        dom = NscZone(*args, inherit_config=inherit_config, **kwargs)
-        assert dom.name not in self.zones
-        self.zones[dom.name] = dom
-        return dom
+        z = NscZone(self, *args, inherit_config=inherit_config, **kwargs)
+        assert z.name not in self.zones
+        self.zones[z.name] = z
+        return z
+
+    def add_reverse_zone(self, net: str | IPNetwork, name: Optional[str] = None, **kwargs) -> Zone:
+        if not (isinstance(net, IPv4Network) or isinstance(net, IPv6Network)):
+            net = ip_network(net, strict=True)
+        name = name or self._reverse_zone_name(net)
+        return self.add_zone(name, reverse_for=net, **kwargs)
+
+    def _reverse_zone_name(self, net: IPNetwork) -> str:
+        if isinstance(net, IPv4Network):
+            parts = str(net.network_address).split('.')
+            out = parts[:net.prefixlen // 8]
+            if net.prefixlen % 8 != 0:
+                out.append(parts[len(out)] + '/' + str(net.prefixlen))
+            return '.'.join(reversed(out)) + '.in-addr.arpa'
+        elif isinstance(net, IPv6Network):
+            assert net.prefixlen % 4 == 0
+            nibbles = net.network_address.exploded.replace(':', "")
+            nibbles = nibbles[:net.prefixlen // 4]
+            return '.'.join(reversed(nibbles)) + '.ip6.arpa'
+        else:
+            raise NotImplementedError()
 
+    def _add_reverse_mapping(self, addr: IPAddress, ptr_to: Name) -> None:
+        if isinstance(addr, IPv4Address):
+            self.ipv4_reverse[addr].append(ptr_to)
+        else:
+            self.ipv6_reverse[addr].append(ptr_to)
+
+    def dump_reverse(self) -> None:
+        print('### Requests for reverse mappings ###')
+        for ipa4, name in sorted(self.ipv4_reverse.items()):
+            print(f'{ipa4}\t{name}')
+        for ipa6, name in sorted(self.ipv6_reverse.items()):
+            print(f'{ipa6}\t{name}')
+
+    def fill_reverse(self) -> None:
+        for z in self.zones.values():
+            if z.reverse_for is not None:
+                if isinstance(z.reverse_for, IPv4Network):
+                    for addr4, ptr_list in self.ipv4_reverse.items():
+                        if addr4 in z.reverse_for:
+                            for ptr_to in ptr_list:
+                                z._add_ipv4_reverse(addr4, ptr_to)
+                else:
+                    for addr6, ptr_list in self.ipv6_reverse.items():
+                        if addr6 in z.reverse_for:
+                            for ptr_to in ptr_list:
+                                z._add_ipv6_reverse(addr6, ptr_to)
 
-class MyZone(Zone):
-    admin_email = 'admin@ucw.cz'
-    origin_server = 'ns.ucw.cz'
 
+c = Nsc(
+    admin_email='admin@ucw.cz',
+    origin_server='ns.ucw.cz',
+)
 
-c = Nsc()
-z = c.add_zone('ucw.cz')  # origin_server='jabberwock.ucw.cz')
+z = c.add_zone('ucw.cz')
 
 z[""].NS(['jabberwock', 'chirigo.gebbeth.cz', 'drak.ucw.cz'])
 
-z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2')
+z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2', '195.113.31.123')
 
 z.host('test', '1.2.3.4', ['5.6.7.8', '8.7.6.5'])
 
@@ -235,3 +317,12 @@ z.host('test', '1.2.3.4', ['5.6.7.8', '8.7.6.5'])
     .generic('HINFO', 'Something fishy'))
 
 z.dump()
+
+r = c.add_reverse_zone('195.113.0.0/16')
+r2 = c.add_reverse_zone('2a00:da80:fff0:2::/64')
+
+c.dump_reverse()
+c.fill_reverse()
+
+r.dump()
+r2.dump()