]> mj.ucw.cz Git - pynsc.git/blob - nsconfig/cli.py
Top-of-file comments
[pynsc.git] / nsconfig / cli.py
1 # PyNSC: Command-line interface
2 # (c) 2024 Martin Mareš <mj@ucw.cz>
3
4 import argparse
5 from argparse import Namespace
6 from pathlib import Path
7 from texttable import Texttable
8
9 from nsconfig.core import Nsc, NscZonePrimary, NscZoneSecondary, NscZoneAlias
10
11
12 def do_test(nsc: Nsc) -> None:
13     test_dir = Path('test')
14     test_dir.mkdir(exist_ok=True)
15     for z in nsc.get_zones():
16         print(f'Zone:        {z.name}')
17         print(f'Type:        {z.zone_type.name}')
18         if isinstance(z, NscZonePrimary):
19             if z.aliases:
20                 aliases = ' '.join([alias.name for alias in z.aliases])
21                 print(f'Aliases:     {aliases}')
22             print(f'Old serial:  {z.prev_state.serial}')
23             print(f'Old hash:    {z.prev_state.hash}')
24             print(f'New serial:  {z.state.serial}')
25             print(f'New hash:    {z.state.hash}')
26             out_file = test_dir / z.safe_name
27             print(f'Dumping to:  {out_file}')
28             with open(out_file, 'w') as f:
29                 z.dump(file=f)
30         elif isinstance(z, NscZoneSecondary):
31             print(f'Primary:     {z.primary_server}')
32         elif isinstance(z, NscZoneAlias):
33             print(f'Alias for:   {z.alias_for.name}')
34         print()
35
36     if nsc.daemon:
37         conf_file = test_dir / 'daemon.conf'
38         print(f'Dumping daemon config to {conf_file}')
39         with open(conf_file, 'w') as f:
40             nsc.daemon.dump_config(file=f)
41
42
43 def do_status(nsc: Nsc, args: Namespace) -> None:
44     table = Texttable(max_width=0)
45     table.header(['S', 'Zone', 'Type', 'Status'])
46     table.set_deco(Texttable.HEADER)
47
48     for z in nsc.get_zones():
49         if z.is_changed():
50             action = '*'
51         else:
52             action = ""
53         do_show = args.all
54         if isinstance(z, NscZonePrimary):
55             status = f'serial {z.prev_state.serial}'
56             if z.is_changed():
57                 status += f' -> {z.state.serial}'
58             do_show = True
59         elif isinstance(z, NscZoneSecondary):
60             status = f'from {z.primary_server}'
61         elif isinstance(z, NscZoneAlias):
62             status = f'for "{z.alias_for.name}"'
63         else:
64             raise NotImplementedError()
65         name = z.name
66         if len(name) > 30 and not args.long:
67             name = name[:16] + '[...]' + name[-16:]
68         if do_show:
69             table.add_row([action, name, z.zone_type.name, status])
70
71     print(table.draw())
72
73
74 def do_update(nsc: Nsc) -> None:
75     nsc.daemon.write_config()
76
77     for z in nsc.get_zones():
78         if isinstance(z, NscZonePrimary) and z.is_changed():
79             print(f'Updating zone {z.name} (serial {z.state.serial})')
80             z.write_zone()
81             nsc.daemon.reload_zone(z)
82             for alias in z.aliases:
83                 nsc.daemon.reload_zone(z)
84             z.write_state()
85
86     nsc.daemon.reload_daemon()
87
88
89 def main(nsc: Nsc) -> None:
90     parser = argparse.ArgumentParser(description='Configure name server')
91     subparsers = parser.add_subparsers(help='action to perform', dest='action', required=True, metavar='ACTION')
92
93     test_parser = subparsers.add_parser('test', help='test new configuration', description='Test new configuration')
94
95     status_parser = subparsers.add_parser('status', help='list status of zones', description='List status of zones')
96     status_parser.add_argument('-a', '--all', default=False, action='store_true', help='show non-primary zones')
97     status_parser.add_argument('-l', '--long', default=False, action='store_true', help='do not abbreviate zone names')
98
99     update_parser = subparsers.add_parser('update', help='update configuration', description='Update zone files and daemon configuration as needed')
100
101     args = parser.parse_args()
102
103     nsc.process()
104
105     if args.action == 'test':
106         do_test(nsc)
107     elif args.action == 'status':
108         do_status(nsc, args)
109     elif args.action == 'update':
110         do_update(nsc)