#!/usr/bin/env python3
+from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
import dns.name
from dns.rdatatype import RdataType
import dns.rdtypes.ANY.MX
import dns.rdtypes.ANY.NS
+import dns.rdtypes.ANY.PTR
import dns.rdtypes.ANY.SOA
import dns.rdtypes.ANY.TXT
import dns.rdtypes.IN.A
import dns.rdtypes.IN.AAAA
from dns.zone import Zone
-from ipaddress import ip_address, IPv4Address, IPv6Address
+from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network
import socket
import sys
-from typing import Optional, Dict, List, Self, Tuple
+from typing import Optional, Dict, List, Self, Tuple, DefaultDict
IPAddress = IPv4Address | IPv6Address
+IPNetwork = IPv4Network | IPv6Network
IPAddr = str | IPAddress | List[str | IPAddress]
else:
return [self._parse_name(n) for n in names]
- def A(self, *addrs: IPAddr) -> Self:
+ def A(self, *addrs: IPAddr, reverse: bool = True) -> Self:
for a in self._parse_addrs(addrs):
if isinstance(a, IPv4Address):
self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a)))
else:
self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a)))
+ if reverse:
+ self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name))
return self
def MX(self, pri: int, name: str) -> Self:
return self
def NS(self, names: str | List[str]) -> Self:
+ # FIXME: Variadic?
for name in self._parse_names(names):
self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name))
return self
self._add(dns.rdtypes.ANY.TXT.TXT(RdataClass.IN, RdataType.TXT, text))
return self
+ def PTR(self, target: Name | str) -> Self:
+ self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target))
+ return self
+
def generic(self, typ: str, text: str) -> Self:
self._add(dns.rdata.from_text(RdataClass.IN, typ, text))
return self
class NscZone:
+ nsc: 'Nsc'
name: str
zone: Zone
_min_ttl: int
+ reverse_for: Optional[IPNetwork]
- def __init__(self, name: str, **kwargs) -> None:
+ def __init__(self, nsc: 'Nsc', name: str, reverse_for: Optional[IPNetwork] = None, **kwargs) -> None:
+ self.nsc = nsc
self.name = name
self.config = NscZoneConfig(**kwargs).finalize()
self.zone = dns.zone.Zone(origin=name, rdclass=RdataClass.IN)
self._min_ttl = int(self.config.min_ttl.total_seconds())
+ self.reverse_for = reverse_for
conf = self.config
root = self[""]
def __getitem__(self, name: str) -> NscNode:
return NscNode(self, name)
- def host(self, name: str, *args) -> NscNode:
+ def host(self, name: str, *args, reverse: bool = True) -> NscNode:
n = NscNode(self, name)
- n.A(*args)
+ n.A(*args, reverse=reverse)
return n
def dump(self) -> None:
# Could use self.zone.to_file(sys.stdout), but we want better formatting
+ print(f'; Zone file for {self.name}')
last_name = None
for name, ttl, rec in self.zone.iterate_rdatas():
if name == last_name:
print(f'{print_name}\t{ttl if ttl != self._min_ttl else ""}\t{rec.rdtype.name}\t{rec.to_text()}')
last_name = name
+ def _add_ipv4_reverse(self, addr: IPv4Address, ptr_to: Name) -> None:
+ # Called only for addresses from this reverse network
+ assert self.reverse_for is not None
+ parts = str(addr).split('.')
+ parts = parts[self.reverse_for.prefixlen // 8:]
+ name = '.'.join(reversed(parts))
+ self.n(name).PTR(ptr_to)
+
+ def _add_ipv6_reverse(self, addr: IPv6Address, ptr_to: Name) -> None:
+ # Called only for addresses from this reverse network
+ assert self.reverse_for is not None
+ parts = addr.exploded.replace(':', "")
+ parts = parts[self.reverse_for.prefixlen // 4:]
+ name = '.'.join(reversed(parts))
+ self.n(name).PTR(ptr_to)
+
class Nsc:
- zones: Dict[str, Zone]
+ zones: Dict[str, NscZone]
default_zone_config: NscZoneConfig
+ ipv4_reverse: DefaultDict[IPv4Address, List[Name]]
+ ipv6_reverse: DefaultDict[IPv6Address, List[Name]]
def __init__(self, **kwargs) -> None:
self.zones = {}
self.default_zone_config = NscZoneConfig(**kwargs)
+ self.ipv4_reverse = defaultdict(list)
+ self.ipv6_reverse = defaultdict(list)
def add_zone(self, *args, inherit_config: Optional[NscZoneConfig] = None, **kwargs) -> Zone:
if inherit_config is None:
inherit_config = self.default_zone_config
- dom = NscZone(*args, inherit_config=inherit_config, **kwargs)
- assert dom.name not in self.zones
- self.zones[dom.name] = dom
- return dom
+ z = NscZone(self, *args, inherit_config=inherit_config, **kwargs)
+ assert z.name not in self.zones
+ self.zones[z.name] = z
+ return z
+
+ def add_reverse_zone(self, net: str | IPNetwork, name: Optional[str] = None, **kwargs) -> Zone:
+ if not (isinstance(net, IPv4Network) or isinstance(net, IPv6Network)):
+ net = ip_network(net, strict=True)
+ name = name or self._reverse_zone_name(net)
+ return self.add_zone(name, reverse_for=net, **kwargs)
+
+ def _reverse_zone_name(self, net: IPNetwork) -> str:
+ if isinstance(net, IPv4Network):
+ parts = str(net.network_address).split('.')
+ out = parts[:net.prefixlen // 8]
+ if net.prefixlen % 8 != 0:
+ out.append(parts[len(out)] + '/' + str(net.prefixlen))
+ return '.'.join(reversed(out)) + '.in-addr.arpa'
+ elif isinstance(net, IPv6Network):
+ assert net.prefixlen % 4 == 0
+ nibbles = net.network_address.exploded.replace(':', "")
+ nibbles = nibbles[:net.prefixlen // 4]
+ return '.'.join(reversed(nibbles)) + '.ip6.arpa'
+ else:
+ raise NotImplementedError()
+ def _add_reverse_mapping(self, addr: IPAddress, ptr_to: Name) -> None:
+ if isinstance(addr, IPv4Address):
+ self.ipv4_reverse[addr].append(ptr_to)
+ else:
+ self.ipv6_reverse[addr].append(ptr_to)
+
+ def dump_reverse(self) -> None:
+ print('### Requests for reverse mappings ###')
+ for ipa4, name in sorted(self.ipv4_reverse.items()):
+ print(f'{ipa4}\t{name}')
+ for ipa6, name in sorted(self.ipv6_reverse.items()):
+ print(f'{ipa6}\t{name}')
+
+ def fill_reverse(self) -> None:
+ for z in self.zones.values():
+ if z.reverse_for is not None:
+ if isinstance(z.reverse_for, IPv4Network):
+ for addr4, ptr_list in self.ipv4_reverse.items():
+ if addr4 in z.reverse_for:
+ for ptr_to in ptr_list:
+ z._add_ipv4_reverse(addr4, ptr_to)
+ else:
+ for addr6, ptr_list in self.ipv6_reverse.items():
+ if addr6 in z.reverse_for:
+ for ptr_to in ptr_list:
+ z._add_ipv6_reverse(addr6, ptr_to)
-class MyZone(Zone):
- admin_email = 'admin@ucw.cz'
- origin_server = 'ns.ucw.cz'
+c = Nsc(
+ admin_email='admin@ucw.cz',
+ origin_server='ns.ucw.cz',
+)
-c = Nsc()
-z = c.add_zone('ucw.cz') # origin_server='jabberwock.ucw.cz')
+z = c.add_zone('ucw.cz')
z[""].NS(['jabberwock', 'chirigo.gebbeth.cz', 'drak.ucw.cz'])
-z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2')
+z['jabberwock'].A('1.2.3.4', '2a00:da80:fff0:2::2', '195.113.31.123')
z.host('test', '1.2.3.4', ['5.6.7.8', '8.7.6.5'])
.generic('HINFO', 'Something fishy'))
z.dump()
+
+r = c.add_reverse_zone('195.113.0.0/16')
+r2 = c.add_reverse_zone('2a00:da80:fff0:2::/64')
+
+c.dump_reverse()
+c.fill_reverse()
+
+r.dump()
+r2.dump()