X-Git-Url: https://git.ralfj.de/zonemaker.git/blobdiff_plain/4b221937f5e8ca0fd6bc8d3331529f3502615b38..HEAD:/zone.py diff --git a/zone.py b/zone.py index 1e1251a..db3bf1a 100644 --- a/zone.py +++ b/zone.py @@ -21,7 +21,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import re, datetime +import re, datetime, os, subprocess, re #from typing import * @@ -33,7 +33,7 @@ week = 7*day REGEX_label = r'[a-zA-Z90-9]([a-zA-Z90-9-]{0,61}[a-zA-Z90-9])?' # max. 63 characters; must not start or end with hyphen REGEX_ipv4 = r'^\d{1,3}(\.\d{1,3}){3}$' -REGEX_ipv6 = r'^[a-fA-F0-9]{1,4}(:[a-fA-F0-9]{1,4}){7}$' +REGEX_ipv6 = r'^[a-fA-F0-9]{1,4}(::?[a-fA-F0-9]{1,4}){1,7}$' def check_label(label: str) -> str: label = str(label) @@ -101,6 +101,20 @@ def column_widths(datas: 'Sequence', widths: 'Sequence[int]'): # last data point return result+str(datas[-1]) +def concatenate(root, path): + if path == '' or root == '': + raise Exception("Empty domain name is not valid") + if path == '@': + return root + if root == '@' or path.endswith('.'): + return path + return path+"."+root + +def escape_TXT(text): + for c in ('\\', '\"'): + text = text.replace(c, '\\'+c) + return text + ## Enums class Protocol: @@ -131,15 +145,7 @@ class RR: return self def relativize(self, root): - def _relativize(path): - if path == '' or root == '': - raise Exception("Empty domain name is not valid") - if path == '@': - return root - if root == '@' or path.endswith('.'): - return path - return path+"."+root - return self.mapPath(_relativize) + return self.mapPath(lambda path: concatenate(root, path)) def mapTTL(self, f): '''Run the current TTL and the recordType through f.''' @@ -181,13 +187,17 @@ class TXT: for c in ('\n', '\r', '\t'): if c in text: raise Exception("TXT record {0} contains invalid character") - # escape text - for c in ('\\', '\"'): - text = text.replace(c, '\\'+c) self._text = text def generate_rr(self): - return RR('@', 'TXT', '"{0}"'.format(self._text)) + text = escape_TXT(self._text) + # split into chunks of max. 255 characters; be careful not to split right after a backslash + chunks = re.findall(r'.{0,254}[^\\]', text) + assert sum(len(c) for c in chunks) == len (text) + chunksep = '"\n' + ' '*20 + '"' + chunked = '( "' + chunksep.join(chunks) + '" )' + # generate the chunks + return RR('@', 'TXT', chunked) class DKIM(TXT): # helper class to treat DKIM more antively @@ -245,10 +255,38 @@ class TLSA: self._selector = int(selector) self._matching_type = int(matching_type) self._data = check_hex(data) + + def from_crt(protocol: str, port: int, crtfile: str): + '''Generate a TLSA record from a given certificate file.''' + open(crtfile).close() # check if the file exists (and throw python-style exceptions if it dos not) + # Call the shell script to do the actual work + dir = os.path.dirname(os.path.realpath(__file__)) + cmd = [dir+"/tlsa", crtfile] + #print(" ".join(cmd), file=sys.stderr) + zone_line = subprocess.check_output(cmd).decode("utf-8").strip().split("\n")[-1] + m = re.match("^([0-9]+) ([0-9]+) ([0-9]+) ([0-9a-zA-Z]+)$", zone_line) + assert m is not None + # make sure we match on *the key only*, so that we can renew the certificate without harm + assert int(m.group(1)) == TLSA.Usage.EndEntity + assert int(m.group(2)) == TLSA.Selector.SubjectPublicKeyInfo + return TLSA(protocol, port, TLSA.Usage.EndEntity, TLSA.Selector.SubjectPublicKeyInfo, int(m.group(3)), m.group(4)) def generate_rr(self): return RR('_{}._{}'.format(self._port, self._protocol), 'TLSA', '{} {} {} {}'.format(self._usage, self._selector, self._matching_type, self._data)) +class CAA: + class Tag: + Issue = "issue" + IssueWild = "issuewild" + + def __init__(self, flag: int, tag: str, value: str) -> None: + self._flag = int(flag) + self._tag = str(tag) + self._value = str(value) + + def generate_rr(self): + return RR('@', 'CAA', '{} {} {}'.format(self._flag, self._tag, self._value)) + class CNAME: def __init__(self, name: str) -> None: @@ -295,12 +333,12 @@ def CName(name: str) -> Name: return Name(CNAME(name)) -def Delegation(name: str) -> Name: - return Name(NS(name)) +def Delegation(*names) -> Name: + return Name(list(map(NS, names))) -def SecureDelegation(name: str, tag: int, alg: int, digest: int, key: str) -> Name: - return Name(NS(name), DS(tag, alg, digest, key)) +def SecureDelegation(tag: int, alg: int, digest: int, key: str, *names) -> Name: + return Name(DS(tag, alg, digest, key), list(map(NS, names))) class Zone: @@ -346,6 +384,20 @@ class Zone: # be done return cur_serial + @staticmethod + def generate_rrs_from_dict(root, domains): + for name in sorted(domains.keys(), key=lambda s: s.split('.')): + if name.endswith('.'): + raise Exception("You are trying to add a record outside of your zone. This is not supported. Use '@' for the zone root.") + domain = domains[name] + name = concatenate(root, name) + if isinstance(domain, dict): + for rr in Zone.generate_rrs_from_dict(name, domain): + yield rr + else: + for rr in domain.generate_rrs(): + yield rr.relativize(name) + def generate_rrs(self) -> 'Iterator': # SOA record serial = self.inc_serial() @@ -360,11 +412,8 @@ class Zone: for name in self._NS: yield NS(name).generate_rr() # all the rest - for name in sorted(self._domains.keys(), key=lambda s: list(reversed(s.split('.')))): - if name.endswith('.'): - raise Exception("You are trying to add a record outside of your zone. This is not supported. Use '@' for the zone root.") - for rr in self._domains[name].generate_rrs(): - yield rr.relativize(name) + for rr in Zone.generate_rrs_from_dict('@', self._domains): + yield rr def write(self) -> None: print(";; {} zone file, generated by zonemaker on {}".format(self._name, datetime.datetime.now()))