for z in nsc.get_zones():
print(f'Zone: {z.name}')
+ print(f'View: {z.config.view or "-"}')
print(f'Type: {z.zone_type.name}')
if isinstance(z, NscZonePrimary):
if z.aliases:
print(f'Old hash: {z.prev_state.hash}')
print(f'New serial: {z.state.serial}')
print(f'New hash: {z.state.hash}')
- out_file = zone_dir / z.safe_name
+ out_file = zone_dir / z.file_name
print(f'Dumping to: {out_file}')
+ out_file.parent.mkdir(parents=True, exist_ok=True)
with open(out_file, 'w') as f:
z.dump(file=f)
elif isinstance(z, NscZoneSecondary):
name = z.name
if len(name) > 30 and not args.long:
name = name[:16] + '[...]' + name[-16:]
+ if z.config.view:
+ name += ' (' + z.config.view + ')'
if do_show:
table.add_row([action, name, z.zone_type.name, status])
for z in nsc.get_zones():
if isinstance(z, NscZonePrimary) and z.is_changed():
- print(f'Updating zone {z.name} (serial {z.state.serial})')
+ in_view = ' (' + z.config.view + ')' if z.config.view else ""
+ print(f'Updating zone {z.name}{in_view} (serial {z.state.serial})')
z.write_zone()
nsc.daemon.reload_zone(z)
for alias in z.aliases:
daemon_options: List[str]
add_null_mx: bool
name_parse_mode: NameParseMode
+ view: str # "" for main
default_config: Optional['NscZoneConfig'] = None
add_daemon_options: Optional[List[str]] = None,
add_null_mx: Optional[bool] = None,
name_parse_mode: Optional[NameParseMode] = None,
+ view: Optional[str] = None,
inherit_config: Optional['NscZoneConfig'] = None,
) -> None:
if inherit_config is None:
self.daemon_options = daemon_options if daemon_options is not None else inherit_config.daemon_options
self.add_null_mx = add_null_mx if add_null_mx is not None else inherit_config.add_null_mx
self.name_parse_mode = name_parse_mode if name_parse_mode is not None else inherit_config.name_parse_mode
+ self.view = view if view is not None else inherit_config.view
if add_daemon_options is not None:
self.daemon_options += add_daemon_options
daemon_options=[],
add_null_mx=False,
name_parse_mode=NameParseMode.absolute,
+ view="",
)
pass
def save(self, file: Path) -> None:
+ file.parent.mkdir(parents=True, exist_ok=True)
new_file = Path(str(file) + '.new')
with open(new_file, 'w') as f:
js = {
nsc: 'Nsc'
name: str
dns_name: Name
- safe_name: str # For use in file names
+ file_name: str # May contain slashes
zone_type: ZoneType
config: NscZoneConfig
reverse_for: Optional[IPNetwork]
self.nsc = nsc
self.name = name
self.dns_name = dns.name.from_text(name)
- self.safe_name = name.replace('/', '@')
self.config = NscZoneConfig(**kwargs).finalize()
+ self.file_name = name.replace('/', '@')
+ if self.config.view:
+ self.file_name = f'{self.config.view}/{self.file_name}'
self.reverse_for = reverse_for
def process(self) -> None:
super().__init__(*args, **kwargs)
self.zone_type = ZoneType.primary
- self.zone_file = self.nsc.zone_dir / self.safe_name
- self.state_file = self.nsc.state_dir / (self.safe_name + '.json')
+ self.zone_file = self.nsc.zone_dir / self.file_name
+ self.state_file = self.nsc.state_dir / (self.file_name + '.json')
self.state = NscZoneState()
self.prev_state = NscZoneState()
return n
def zone_header(self) -> str:
+ view = f' (view {self.config.view})' if self.config.view else ""
return (
- f'; Zone file for {self.name}\n'
+ f'; Zone file for {self.name}{view}\n'
+ '; Generated by NSC, please do not edit manually.\n'
+ '\n')
def write_zone(self) -> None:
self.update_soa()
+ self.zone_file.parent.mkdir(parents=True, exist_ok=True)
new_file = Path(str(self.zone_file) + '.new')
with open(new_file, 'w') as f:
self.dump(file=f)
super().__init__(*args, **kwargs)
self.zone_type = ZoneType.secondary
self.primary_server = primary_server
- self.secondary_file = self.nsc.secondary_dir / self.safe_name
+ self.secondary_file = self.nsc.secondary_dir / self.file_name
+
+ def process(self) -> None:
+ self.secondary_file.parent.mkdir(parents=True, exist_ok=True)
class NscZoneAlias(NscZone):
class Nsc:
start_time: datetime
- zones: Dict[str, NscZone]
+ zones: Dict[Tuple[str, str], NscZone] # key is (zone name, view)
default_zone_config: NscZoneConfig
ipv4_reverse: DefaultDict[IPv4Address, List[Name]]
ipv6_reverse: DefaultDict[IPv6Address, List[Name]]
reverse_for = ip_network(reverse_for, strict=True)
name = name or self._reverse_zone_name(reverse_for)
assert name is not None
- assert name not in self.zones
z: NscZone
if alias_for is not None:
secondary_to = ip_address(secondary_to)
z = NscZoneSecondary(self, name, reverse_for=reverse_for, primary_server=secondary_to, inherit_config=inherit_config, **kwargs)
- self.zones[name] = z
+ key = (name, z.config.view)
+ assert key not in self.zones
+ self.zones[key] = z
return z
def __getitem__(self, name: str) -> NscZone:
- return self.zones[name]
+ return self.zones[name, ""]
+
+ def get_zone(self, name: str, view: str = "") -> NscZone:
+ return self.zones[name, view]
def _reverse_zone_name(self, net: IPNetwork) -> str:
if isinstance(net, IPv4Network):