from dns.rdata import Rdata
from dns.rdataclass import RdataClass
from dns.rdatatype import RdataType
+import dns.rdtypes.ANY.CNAME
import dns.rdtypes.ANY.MX
import dns.rdtypes.ANY.NS
import dns.rdtypes.ANY.PTR
import sys
from typing import Optional, Dict, List, Self, DefaultDict, TextIO, TYPE_CHECKING
-from nsconfig.util import flatten_list
+from nsconfig.util import flatten_list, parse_address, parse_network, parse_name
+from nsconfig.util import IPAddress, IPNetwork, IPAddr
if TYPE_CHECKING:
from nsconfig.daemon import NscDaemon
-IPAddress = IPv4Address | IPv6Address
-IPNetwork = IPv4Network | IPv6Network
-IPAddr = str | IPAddress | List[str | IPAddress]
-
-
class NscNode:
nsc_zone: 'NscZonePrimary'
name: str
rds = self.node.find_rdataset(rec.rdclass, rec.rdtype, create=True)
rds.add(rec, ttl=self._ttl)
- def _parse_addr(self, addr: IPAddr | str) -> IPAddress:
- if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address):
- return addr
- elif isinstance(addr, str):
- return ip_address(addr)
- else:
- raise ValueError('Cannot parse IP address')
-
- def _parse_name(self, name: str) -> Name:
- # FIXME: Names with escaped dots
- if '.' in name:
- return dns.name.from_text(name)
- else:
- return dns.name.from_text(name, origin=None)
-
def A(self, *addrs: IPAddr, reverse: bool = True) -> Self:
- for a in map(self._parse_addr, flatten_list(addrs)):
+ for a in map(parse_address, flatten_list(addrs)):
if isinstance(a, IPv4Address):
self._add(dns.rdtypes.IN.A.A(RdataClass.IN, RdataType.A, str(a)))
else:
self._add(dns.rdtypes.IN.AAAA.AAAA(RdataClass.IN, RdataType.AAAA, str(a)))
if reverse:
- self.nsc_zone.nsc._add_reverse_mapping(a, dns.name.from_text(self.name + '.' + self.nsc_zone.name))
+ self.nsc_zone.nsc._add_reverse_mapping(a, parse_name(self.name + '.' + self.nsc_zone.name))
return self
def MX(self, pri: int, name: str) -> Self:
self._add(
- dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, self._parse_name(name))
+ dns.rdtypes.ANY.MX.MX(RdataClass.IN, RdataType.MX, pri, parse_name(name))
)
return self
def NS(self, *names: str | List[str]) -> Self:
- for name in map(self._parse_name, flatten_list(names)):
+ for name in map(parse_name, flatten_list(names)):
self._add(dns.rdtypes.ANY.NS.NS(RdataClass.IN, RdataType.NS, name))
return self
self._add(dns.rdtypes.ANY.PTR.PTR(RdataClass.IN, RdataType.PTR, target))
return self
+ def CNAME(self, target: Name | str) -> Self:
+ self._add(dns.rdtypes.ANY.CNAME.CNAME(RdataClass.IN, RdataType.CNAME, target))
+ return self
+
def generic(self, typ: str, text: str) -> Self:
self._add(dns.rdata.from_text(RdataClass.IN, typ, text))
return self
def is_changed(self) -> bool:
return self.state.serial != self.prev_state.serial
+ def delegate_classless(self, net: str | IPNetwork, subdomain: Optional[str] = None) -> NscNode:
+ net = parse_network(net)
+ assert self.reverse_for is not None
+ assert isinstance(self.reverse_for, IPv4Network)
+ assert self.reverse_for.prefixlen % 8 == 0
+ assert isinstance(net, IPv4Network)
+ assert net.subnet_of(self.reverse_for)
+ assert net.prefixlen < self.reverse_for.prefixlen + 8
+
+ start = int(net.network_address.packed[net.prefixlen // 8])
+ num = 1 << (8 - net.prefixlen % 8)
+
+ if subdomain is None:
+ subdomain = f'{start}/{net.prefixlen}'
+
+ for i in range(start, start + num):
+ target = f'{i}.{subdomain}'
+ self[str(i)].CNAME(parse_name(target, relative=True))
+
+ return self[subdomain]
+
class NscZoneSecondary(NscZone):
primary_server: IPAddress
+import dns.name
+from dns.name import Name
+from ipaddress import ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network, IPv6Network
from typing import Any, List
+IPAddress = IPv4Address | IPv6Address
+IPNetwork = IPv4Network | IPv6Network
+IPAddr = str | IPAddress | List[str | IPAddress]
+
+
def flatten_list(args: Any) -> List[Any]:
def flat(args):
if isinstance(args, list) or isinstance(args, tuple):
out: List[Any] = []
flat(args)
return out
+
+
+def parse_address(addr: IPAddress | str) -> IPAddress:
+ if isinstance(addr, IPv4Address) or isinstance(addr, IPv6Address):
+ return addr
+ elif isinstance(addr, str):
+ return ip_address(addr)
+ else:
+ raise ValueError('Cannot parse IP address')
+
+
+def parse_network(addr: IPNetwork | str) -> IPNetwork:
+ if isinstance(addr, IPv4Network) or isinstance(addr, IPv6Network):
+ return addr
+ elif isinstance(addr, str):
+ return ip_network(addr)
+ else:
+ raise ValueError('Cannot parse IP network')
+
+
+def parse_name(name: str, relative: bool = False) -> Name:
+ # FIXME: Names with escaped dots
+ if '.' in name and not relative:
+ return dns.name.from_text(name)
+ else:
+ return dns.name.from_text(name, origin=None)