separate init and config resolution; dump configs

separate the initialization process from the resolution
of configurations
add logic to dump configuration file formatted outputs, as well
This commit is contained in:
Martin A. Brown 2016-02-22 12:30:34 -08:00
parent a959db5485
commit c1c4b2b79c
1 changed files with 122 additions and 62 deletions

View File

@ -5,7 +5,6 @@ from __future__ import absolute_import, division, print_function
import os
import sys
import functools
from argparse import ArgumentParser, ArgumentError, Namespace
from argparse import _UNRECOGNIZED_ARGS_ATTR
@ -173,7 +172,7 @@ def prepend_tag(base, d, sep=CLISEP):
newd = dict()
tag = ''.join((base, sep))
for k, v in d.items():
newd[''.join((tag, k.upper()))] = v
newd[''.join((tag, k))] = v
return newd
@ -398,82 +397,76 @@ class CascadingConfig(object):
synonym = opt.replace(NSSEP, CLISEP)
argparser.add_argument(opt, synonym, action='store_true')
# -- remember some attributes
self.tag = tag
self.order = order
self.argparser = argparser
self.parser = argparser.parse_known_args_no_defaults
self.argv = argv
self.env = env
self.configfile = configfile
self.order = order
self.defaults = argparser.parse_args([]) # -- "compiled-in" defaults
self.cli, _ = self.parser(argv)
self.environment, _ = self.parser(argv_from_env(env, tag))
@property
def config(self):
self.read_defaults()
self.read_cli()
self.read_environment()
self.read_systemconfig()
self.read_userconfig()
self.resolve()
self.ccrequest()
return self._config
syscfg = getattr(self.defaults, configfile, None)
def read_defaults(self):
'''read the defaults that the developer set in the ArgumentParser'''
self.defaults = self.argparser.parse_args([])
def read_cli(self):
'''read configuration supplied by CLI (argv from constructor)'''
parser = self.argparser.parse_known_args_no_defaults
self.cli, self.cli_extras = parser(self.argv)
def read_environment(self):
'''read relevant environment variables'''
parser = self.argparser.parse_known_args_no_defaults
self.environment, extras = parser(argv_from_env(self.env, self.tag))
self.environment_extras = extras
def read_systemconfig(self):
'''read the specified system configuration file'''
parser = self.argparser.parse_known_args_no_defaults
syscfg = getattr(self.defaults, self.configfile, None)
self.syscfg = syscfg
if syscfg is not None:
self.systemconfig, _ = self.parser(argv_from_cfg(syscfg, tag))
self.systemconfig, extras = parser(argv_from_cfg(syscfg, self.tag))
self.systemconfig_extras = extras
else:
self.systemconfig = Namespace()
self.systemconfig_extras = list()
candidates = list()
candidates.append(('cli', getattr(self.cli, configfile, None)))
candidates.append(('env', getattr(self.environment, configfile, None)))
for source, usrcfg in candidates:
def read_userconfig(self):
'''read a single user specified configuration file'''
parser = self.argparser.parse_known_args_no_defaults
maybe = list()
maybe.append(('cli', getattr(self.cli, self.configfile, None)))
maybe.append(('env', getattr(self.environment, self.configfile, None)))
for source, usrcfg in maybe:
if usrcfg is None:
continue
elif usrcfg == syscfg:
elif usrcfg == self.syscfg:
logger.info("Skipping systemconfig file %s in userconfig (%s)",
syscfg, source)
self.syscfg, source)
continue
else:
logger.debug("Using %s for user config", usrcfg)
break
del candidates
del maybe
if usrcfg is None:
self.userconfig = Namespace()
self.userconfig_extras = list()
else:
logger.debug("Reading %s for user config", usrcfg)
self.userconfig, _ = self.parser(argv_from_cfg(usrcfg, tag))
self.resolve()
# -- clean up after ourselves (and report in, if asked)
#
diagfunc = False
for opt in self.mine:
opt = opt.lstrip(CLISEP)
if getattr(self.config, opt, False):
diagfunc = getattr(self, opt)
delattr(self.config, opt)
if diagfunc:
sys.exit(diagfunc())
def dump_env(self):
d = dict_from_ns(self.config)
d = prepend_tag(self.tag.upper(), d, sep=ENVSEP)
for k, v in d.items():
if isinstance(v, (list, tuple)):
v = MULTIVALUESEP.join(v)
print('{}={}'.format(k, v))
return 0
def dump_cfg(self):
d = dict_from_ns(self.config)
return 0
def dump_cli(self):
d = dict_from_ns(self.config)
cli = list()
for k, v in d.items():
k = ''.join(('--', k.replace(NSSEP, CLISEP)))
if isinstance(v, (list, tuple)):
for val in v:
cli.extend((k, str(val)))
else:
cli.extend((k, str(v)))
print(' '.join(cli))
return 0
def debug_options(self):
return 0
self.userconfig, extras = parser(argv_from_cfg(usrcfg, self.tag))
self.userconfig_extras = extras
def resolve(self, order=None):
if order is None:
@ -487,10 +480,77 @@ class CascadingConfig(object):
logger.debug("Source %s: %s=%s", sourcename, name, newval)
oldval = getattr(config, name, None)
if oldval is not None:
logger.info("Source %s: replacing %s=%s with %s=%s",
sourcename, name, oldval, name, newval)
logger.debug("Source %s: replacing %s=%s with %s=%s",
sourcename, name, oldval, name, newval)
setattr(config, name, newval)
self.config = config
self._config = config
def dump_env(self):
d = dict_from_ns(self._config)
d = prepend_tag(self.tag, d, sep=ENVSEP)
d = dict([(k.upper(), v) for k, v in d.items()])
for k, v in sorted(d.items()):
if isinstance(v, (list, tuple)):
v = MULTIVALUESEP.join([str(x) for x in v])
print('{}={}'.format(k, v))
return 0
def dump_cfg(self):
d = dict_from_ns(self._config)
d = prepend_tag(self.tag, d, sep=CFGSEP)
cfg = ConfigParser()
for k, v in d.items():
k = k.replace(NSSEP, CFGSEP)
parts = k.split(CFGSEP)
assert len(parts) >= 2
if 2 == len(parts):
sect, field = parts[0], CFGSEP.join(parts[1:])
else:
sect = CFGSEP.join(parts[0:2])
field = CFGSEP.join(parts[2:])
if not cfg.has_section(sect):
cfg.add_section(sect)
if isinstance(v, (list, tuple)):
vstr = ',\n'.join([str(x) for x in v])
cfg.set(sect, field, vstr)
else:
cfg.set(sect, field, str(v))
cfg.write(sys.stdout)
return 0
def dump_cli(self):
d = dict_from_ns(self._config)
cli = list()
for k, v in d.items():
k = ''.join(('--', k.replace(NSSEP, CLISEP)))
if isinstance(v, (list, tuple)):
for val in v:
cli.extend((k, str(val)))
else:
cli.extend((k, str(v)))
print(' '.join(cli))
return 0
def debug_options(self):
import pprint
for k, v in vars(self).items():
if isinstance(v, Namespace):
print('\n'.join(('', k, '----------')))
pprint.pprint(vars(v))
else:
print('\n'.join(('', k, '----------')))
pprint.pprint(v)
return 0
def ccrequest(self):
diagfunc = False
for opt in self.mine:
opt = opt.lstrip(CLISEP)
if getattr(self._config, opt, False):
diagfunc = getattr(self, opt)
delattr(self._config, opt)
if diagfunc:
sys.exit(diagfunc())
def sample(args):