-#!/usr/bin/python3
+#!/usr/bin/env python3
# Copyright (c) 2014, Ralf Jung <post@ralfj.de>
# All rights reserved.
#
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-#
-# The views and conclusions contained in the software and documentation are those
-# of the authors and should not be interpreted as representing official policies,
-# either expressed or implied, of the FreeBSD Project.
-#==============================================================================
-# configuration variables
-domains = ['test.dyn.example.com'] # list of domains to update
-password = 'some_secure_password'
-haveIPv4 = True
-haveIPv6 = False
-
-serverIPv4 = 'ipv4.ns.example.com' # Only needed if haveIPv4 is True. This server should NOT have an AAAA record!
-serverIPv6 = 'ipv6.ns.example.com' # Only needed if haveIPv6 is True. This server should NOT have an A record!
-server = 'ns.example.com'
-# END of configuration variables
#==============================================================================
-import urllib.request, socket, sys, argparse
+import urllib.request, socket, sys, argparse, os, configparser, itertools, subprocess, re, ssl
+import dns, dns.resolver
-def urlopen(url):
- return urllib.request.urlopen(url).read().decode('utf-8').strip()
+VERBOSE_CHANGE = 1
+VERBOSE_FULL = 2
+
+def sslContext(config):
+ if config['DEFAULT'].get('ssl_check_cert', 'yes').lower() in ('0', 'false', 'no'):
+ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ return context
+ else:
+ return None
-def getMyIP(server):
- return urlopen('https://'+server+'/checkip')
+def readConfig(fname, defSection = 'DEFAULT'):
+ config = configparser.ConfigParser()
+ with open(fname) as file:
+ stream = itertools.chain(("["+defSection+"]\n",), file)
+ config.read_file(stream)
+ return config
-def getCurIP(domain, family):
+def getConfigDir():
try:
- addr = socket.getaddrinfo(domain, None, family=family)
- return addr[0][4][0]
- except socket.gaierror: # domain not found
+ from xdg import BaseDirectory
+ return os.path.join(BaseDirectory.xdg_config_home, "dyn-nsupdate")
+ except ImportError:
+ return os.path.expanduser("~/.config/dyn-nsupdate")
+
+def urlopen(url, config):
+ if sys.version_info >= (3, 4, 3):
+ return urllib.request.urlopen(url, context=sslContext(config)).read().decode('utf-8').strip('\n')
+ else:
+ return urllib.request.urlopen(url).read().decode('utf-8').strip('\n')
+
+def getMyIP(family, config, methods = {}, verbose = 0):
+ '''Returns our current IP address (<family> can be "IPv4" or "IPv6"), detected as given by the configuration.
+ Additional detection methods can be supplied via <methods>.'''
+ method = config[family]['method']
+ if method == 'none':
+ return None
+ elif method == 'remove':
return ""
+ elif method == 'web':
+ server = config[family].get('server', config['DEFAULT']['server'])
+ url = 'https://'+server+'/checkip'
+ try:
+ ip = urlopen(url, config)
+ except urllib.error.URLError:
+ raise Exception("Error fetching {}, make sure the URL is correct and the internet connection actually works.".format(url))
+ if verbose >= VERBOSE_FULL:
+ print("Server",server,"says my",family,"is",ip)
+ return ip
+ elif method in methods:
+ return methods[method]()
+ else:
+ raise Exception("Unsupported "+family+" detection method: "+method)
-def getCurIPv4(domain):
- return getCurIP(domain, socket.AF_INET)
+def getMyIPv4(config, verbose = 0):
+ '''Returns our current IPv4 address, detected as given by the configuration'''
+ return getMyIP("IPv4", config, verbose=verbose)
-def getCurIPv6(domain):
- return getCurIP(domain, socket.AF_INET6)
+def getMyIPv6(config, verbose = 0):
+ '''Returns our current IPv6 address, detected as given by the configuration'''
+ def local():
+ device = config["IPv6"].get("device")
+ out = subprocess.check_output(["ip", "addr", "show"] + ([] if device is None else ["dev", device]))
+ for line in out.decode('utf-8').split('\n'):
+ m = re.search('inet6 ([a-fA-F0-9:]+)/64 ([a-zA-Z0-9 ]*)', line)
+ if m is not None:
+ ip = m.group(1)
+ flags = m.group(2).split()
+ if not 'temporary' in flags and not 'deprecated' in flags and not "link" in flags:
+ if verbose >= VERBOSE_FULL:
+ print("Local IPv6 detected to be",ip)
+ return ip
+ raise Exception("Unable to detect correct local IPv6 address")
+ return getMyIP("IPv6", config, methods={'local': local}, verbose=verbose)
-def update_domain(server, domain, ipv4, ipv6, password, verbose):
- '''Update the given domain, using the server, password. ipv4 or ipv6 can be None to not update that record. Returns True on success, False on failure.'''
+def getResolver(server):
+ '''Return a resovler with the given server (defined by DNS name)'''
+ addr = socket.getaddrinfo(server, None, family=socket.AF_INET)
+ addr = addr[0][4][0]
+ res = dns.resolver.Resolver()
+ res.nameservers = [addr]
+ return res
+
+def getCurIP(domain, rtype, res):
+ '''Return the current IP of the given <domain>. <rtype> can be A or AAAA.'''
+ try:
+ return res.query(domain, rtype)[0].address
+ except dns.exception.DNSException: # domain not found
+ return ""
+
+def updateDomain(server, domain, ipv4, ipv6, password, config, verbose):
+ '''Update the given domain, using the server, password. ipv4 or ipv6 can be None to not update that record, or strings with the respective addresses.
+ Updates ae only performed if necessary.
+ Returns True on success, False on failure.'''
assert ipv4 is not None or ipv6 is not None
# check what the domain is currently mapped to
- curIPv4 = getCurIPv4(domain)
- curIPv6 = getCurIPv6(domain)
- if verbose:
- print("Current status of domain {0} is: IPv4 address '{1}', IPv6 address '{2}'".format(domain, curIPv4, curIPv6))
+ res = getResolver(server)
+ if verbose >= VERBOSE_FULL:
+ print("Resolving names using {}".format(res.nameservers))
+ curIPv4 = getCurIP(domain, 'A', res)
+ curIPv6 = getCurIP(domain, 'AAAA', res)
+ if verbose >= VERBOSE_FULL:
+ print("Current status of domain {} is: IPv4 address '{}', IPv6 address '{}'".format(domain, curIPv4, curIPv6))
# check if there's something to do
needUpdate = (ipv4 is not None and curIPv4 != ipv4) or (ipv6 is not None and curIPv6 != ipv6)
if not needUpdate:
- if verbose:
- print("Everything alread up-to-date, nothing to do")
+ if verbose >= VERBOSE_FULL:
+ print("Everything already up-to-date, nothing to do")
return True
# we need to update the IP
if ipv6 is not None:
url += '&ipv6='+urllib.parse.quote(ipv6)
expected += " "+ipv6
- result = urlopen(url)
+ if verbose >= VERBOSE_FULL:
+ print("Request:",url)
+ result = urlopen(url, config)
# did everything go as planned?
if result == expected:
- if verbose:
- print("Successfully updated domain",domain)
+ if verbose >= VERBOSE_CHANGE:
+ msg = "Successfully updated domain {} on {}:".format(domain, server)
+ if ipv4 is not None:
+ if curIPv4 == ipv4:
+ msg += " IPv4={} (unchanged)".format(curIPv4)
+ else:
+ msg += " IPv4={} -> {}".format(curIPv4, ipv4)
+ if ipv4 is not None and ipv6 is not None:
+ msg += ","
+ if ipv6 is not None:
+ if curIPv6 == ipv6:
+ msg += " IPv6={} (unchanged)".format(curIPv6)
+ else:
+ msg += " IPv6={} -> {}".format(curIPv6, ipv6)
+ print(msg)
# all went all right
return True
else:
# Something went wrong
print("Unexpected answer from server",server,"while updating",domain)
- print(result)
+ print("Got '{}', expected '{}'".format(result, expected))
return False
if __name__ == "__main__":
# allow overwriting some values on the command-line
parser = argparse.ArgumentParser(description='Update a domain managed by a dyn-nsupdate server')
- parser.add_argument("-p", "--password",
- dest="password", default=password,
- help="The password used to update the domains")
+ parser.add_argument("-c", "--config",
+ dest="config", default=os.path.join(getConfigDir(), "dyn-ns-client.conf"),
+ help="The configuration file")
parser.add_argument("-v", "--verbose",
- action="store_true", dest="verbose",
+ action="count", dest="verbose", default=0,
help="Be more verbose")
- parser.add_argument("domains", metavar='DOMAIN', nargs='*', default=domains,
- help="The domains to update")
args = parser.parse_args()
+
+ # read config
+ if not os.path.isfile(args.config):
+ raise Exception("The config file does not exist: "+args.config)
+ config = readConfig(args.config)
- # get our own IPv4
- if haveIPv4:
- myIPv4 = getMyIP(serverIPv4)
- if args.verbose:
- print("My IPv4 is",myIPv4)
- else:
- myIPv4 = None
- # and IPv6
- if haveIPv6:
- myIPv6 = getMyIP(serverIPv6)
- if args.verbose:
- print("My IPv6 is",myIPv6)
- else:
- myIPv6 = None
+ # get our own addresses
+ myIPv4 = getMyIPv4(config, args.verbose)
+ myIPv6 = getMyIPv6(config, args.verbose)
# update all the domains
exitcode = 0
- for domain in args.domains:
- if not update_domain(server, domain, myIPv4, myIPv6, args.password, verbose=args.verbose):
+ domains = map(str.strip, config['DEFAULT']['domains'].split(','))
+ if not domains:
+ raise Exception("No domain given to update!")
+ for domain in domains:
+ if not updateDomain(config['DEFAULT']['server'], domain, myIPv4, myIPv6, config['DEFAULT']['password'], config, verbose=args.verbose):
exitcode = 1
sys.exit(exitcode)