#!/usr/bin/env python3 # Copyright (c) 2014, Ralf Jung # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # 1. Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR # ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
#==============================================================================
import urllib.request, socket, sys, argparse, os, configparser, itertools, subprocess, re, ssl
import urllib.error, urllib.parse
import dns, dns.resolver
import dns.exception

VERBOSE_CHANGE = 1
VERBOSE_FULL = 2

# Matches a /64 IPv6 address line of `ip addr show`; group 1 is the address,
# group 2 the trailing words (scope and flags) following the prefix length.
_INET6_PATTERN = re.compile(r'inet6 ([a-fA-F0-9:]+)/64 ([a-zA-Z0-9 ]*)')


def sslContext(config):
    '''Return the SSL context to use for HTTPS requests.

    If the option "ssl_check_cert" in the DEFAULT section is set to "0",
    "false" or "no" (case-insensitive), a context that performs no certificate
    or hostname verification is returned. Otherwise None is returned, which
    makes urllib use its default (verifying) behaviour.'''
    if config['DEFAULT'].get('ssl_check_cert', 'yes').lower() in ('0', 'false', 'no'):
        context = ssl.create_default_context()
        # check_hostname has to be disabled before verify_mode may be set to CERT_NONE
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        return context
    return None


def readConfig(fname, defSection = 'DEFAULT'):
    '''Parse the configuration file <fname> and return the ConfigParser.

    A "[<defSection>]" header is prepended to the file content, so options
    written before the first explicit section header end up in <defSection>.'''
    config = configparser.ConfigParser()
    with open(fname) as handle:
        stream = itertools.chain(("["+defSection+"]\n",), handle)
        config.read_file(stream)
    return config


def getConfigDir():
    '''Return the directory holding the configuration: the XDG config home if
    the xdg module is available, ~/.config otherwise.'''
    try:
        from xdg import BaseDirectory
        return os.path.join(BaseDirectory.xdg_config_home, "dyn-nsupdate")
    except ImportError:
        return os.path.expanduser("~/.config/dyn-nsupdate")


def urlopen(url, config):
    '''Fetch <url> and return the response body, decoded as UTF-8, with
    leading and trailing newlines removed.

    The SSL context selected by <config> is only passed along on
    Python >= 3.4.3; older versions are called without a context.'''
    if sys.version_info >= (3, 4, 3):
        response = urllib.request.urlopen(url, context=sslContext(config))
    else:
        response = urllib.request.urlopen(url)
    # make sure the connection is closed again once the body has been read
    with response:
        return response.read().decode('utf-8').strip('\n')


def getMyIP(family, config, methods = None, verbose = 0):
    '''Returns our current IP address (<family> can be "IPv4" or "IPv6"), detected as given by the configuration.
    Additional detection methods can be supplied via <methods>, a dictionary
    mapping method names to functions taking no arguments and returning the address.

    Returns None if the method is "none" and the empty string if it is "remove".
    Raises an Exception if the web lookup fails or the method is unknown.'''
    if methods is None:
        methods = {}
    method = config[family]['method']
    if method == 'none':
        return None
    elif method == 'remove':
        return ""
    elif method == 'web':
        # the per-family server setting takes precedence over the global one
        server = config[family].get('server', config['DEFAULT']['server'])
        url = 'https://'+server+'/checkip'
        try:
            ip = urlopen(url, config)
        except urllib.error.URLError as err:
            raise Exception("Error fetching {}, make sure the URL is correct and the internet connection actually works.".format(url)) from err
        if verbose >= VERBOSE_FULL:
            print("Server",server,"says my",family,"is",ip)
        return ip
    elif method in methods:
        return methods[method]()
    else:
        raise Exception("Unsupported "+family+" detection method: "+method)


def getMyIPv4(config, verbose = 0):
    '''Returns our current IPv4 address, detected as given by the configuration'''
    return getMyIP("IPv4", config, verbose=verbose)


def getMyIPv6(config, verbose = 0):
    '''Returns our current IPv6 address, detected as given by the configuration.

    In addition to the generic methods, the method "local" is supported: it
    parses the output of `ip addr show` (restricted to the configured "device",
    if any) and picks the first /64 address that is not flagged as temporary,
    deprecated or link.'''
    def local():
        device = config["IPv6"].get("device")
        command = ["ip", "addr", "show"]
        if device is not None:
            command += ["dev", device]
        out = subprocess.check_output(command)
        for line in out.decode('utf-8').split('\n'):
            m = _INET6_PATTERN.search(line)
            if m is None:
                continue
            ip = m.group(1)
            flags = m.group(2).split()
            if 'temporary' not in flags and 'deprecated' not in flags and 'link' not in flags:
                if verbose >= VERBOSE_FULL:
                    print("Local IPv6 detected to be",ip)
                return ip
        raise Exception("Unable to detect correct local IPv6 address")
    return getMyIP("IPv6", config, methods={'local': local}, verbose=verbose)


def getResolver(server):
    '''Return a resolver with the given server (defined by DNS name)'''
    # look up the first IPv4 address of the server; the sockaddr tuple is
    # element 4 of each getaddrinfo result and starts with the address
    addr = socket.getaddrinfo(server, None, family=socket.AF_INET)
    addr = addr[0][4][0]
    res = dns.resolver.Resolver()
    res.nameservers = [addr]
    return res


def getCurIP(domain, rtype, res):
    '''Return the current IP of the given <domain>, looked up with the resolver <res>.
    <rtype> can be A or AAAA. Returns the empty string if the lookup fails.'''
    try:
        return res.query(domain, rtype)[0].address
    except dns.exception.DNSException:
        # domain not found
        return ""


def updateDomain(server, domain, ipv4, ipv6, password, config, verbose):
    '''Update the given domain, using the server, password. ipv4 or ipv6 can be None to not update that record,
    or strings with the respective addresses. Updates are only performed if necessary.
    Returns True on success, False on failure.'''
    assert ipv4 is not None or ipv6 is not None, "at least one of ipv4 and ipv6 must be given"
    # check what the domain is currently mapped to
    res = getResolver(server)
    if verbose >= VERBOSE_FULL:
        print("Resolving names using {}".format(res.nameservers))
    curIPv4 = getCurIP(domain, 'A', res)
    curIPv6 = getCurIP(domain, 'AAAA', res)
    if verbose >= VERBOSE_FULL:
        print("Current status of domain {} is: IPv4 address '{}', IPv6 address '{}'".format(domain, curIPv4, curIPv6))
    # check if there's something to do
    needUpdate = (ipv4 is not None and curIPv4 != ipv4) or (ipv6 is not None and curIPv6 != ipv6)
    if not needUpdate:
        if verbose >= VERBOSE_FULL:
            print("Everything already up-to-date, nothing to do")
        return True
    # we need to update the IP; the server is expected to answer with "good"
    # followed by the addresses we sent, in the same order
    url = 'https://'+server+'/update?password='+urllib.parse.quote(password)+'&domain='+urllib.parse.quote(domain)
    expected = "good"
    if ipv4 is not None:
        url += '&ip='+urllib.parse.quote(ipv4)
        expected += " "+ipv4
    if ipv6 is not None:
        url += '&ipv6='+urllib.parse.quote(ipv6)
        expected += " "+ipv6
    if verbose >= VERBOSE_FULL:
        # NOTE: at full verbosity this prints the password as part of the URL
        print("Request:",url)
    try:
        result = urlopen(url, config)
    except urllib.error.URLError as err:
        # report the failure instead of aborting, so remaining domains are still processed
        print("Error contacting server",server,"while updating",domain+":",err)
        return False
    # did everything go as planned?
    if result != expected:
        # Something went wrong
        print("Unexpected answer from server",server,"while updating",domain)
        print("Got '{}', expected '{}'".format(result, expected))
        return False
    if verbose >= VERBOSE_CHANGE:
        msg = "Successfully updated domain {} on {}:".format(domain, server)
        if ipv4 is not None:
            if curIPv4 == ipv4:
                msg += " IPv4={} (unchanged)".format(curIPv4)
            else:
                msg += " IPv4={} -> {}".format(curIPv4, ipv4)
        if ipv4 is not None and ipv6 is not None:
            msg += ","
        if ipv6 is not None:
            if curIPv6 == ipv6:
                msg += " IPv6={} (unchanged)".format(curIPv6)
            else:
                msg += " IPv6={} -> {}".format(curIPv6, ipv6)
        print(msg)
    # all went all right
    return True


def main():
    '''Parse the command line, read the configuration and update all configured domains.
    Returns the exit code of the program: 0 if all updates succeeded, 1 otherwise.'''
    # allow overwriting some values on the command-line
    parser = argparse.ArgumentParser(description='Update a domain managed by a dyn-nsupdate server')
    parser.add_argument("-c", "--config",
                        dest="config", default=os.path.join(getConfigDir(), "dyn-ns-client.conf"),
                        help="The configuration file")
    parser.add_argument("-v", "--verbose",
                        action="count", dest="verbose", default=0,
                        help="Be more verbose")
    args = parser.parse_args()

    # read config
    if not os.path.isfile(args.config):
        raise Exception("The config file does not exist: "+args.config)
    config = readConfig(args.config)

    # get our own addresses
    myIPv4 = getMyIPv4(config, args.verbose)
    myIPv6 = getMyIPv6(config, args.verbose)

    # collect the domains into a real list (a map object is always truthy, so
    # it cannot be used to detect "no domains") and drop empty entries
    domains = [name.strip() for name in config['DEFAULT'].get('domains', '').split(',')]
    domains = [name for name in domains if name]
    if not domains:
        raise Exception("No domain given to update!")

    # update all the domains
    exitcode = 0
    for domain in domains:
        if not updateDomain(config['DEFAULT']['server'], domain, myIPv4, myIPv6, config['DEFAULT']['password'], config, verbose=args.verbose):
            exitcode = 1
    return exitcode


if __name__ == "__main__":
    sys.exit(main())