Coverage for lib/ansible/cli/__init__.py : 52%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> # (c) 2016, Toshio Kuratomi <tkuratomi@ansible.com> # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
except ImportError: from ansible.utils.display import Display display = Display()
'''Optparser which sorts the options by opt before outputting --help'''
self.option_list.sort(key=operator.methodcaller('get_opt_string')) return optparse.OptionParser.format_help(self, formatter=None)
# Note: Inherit from SortedOptParser so that we get our format_help method '''Ignore invalid options.
Meant for the special case where we need to take care of help and version but may not know the full range of options yet. (See it in use in set_action) ''' # Since this is special purposed to just handle help and version, we # take a pre-existing option parser here and set our options from # that. This allows us to give accurate help based on the given # option parser. SortedOptParser.__init__(self, usage=parser.usage, option_list=parser.option_list, option_class=parser.option_class, conflict_handler=parser.conflict_handler, description=parser.description, formatter=parser.formatter, add_help_option=False, prog=parser.prog, epilog=parser.epilog) self.version = parser.version
try: optparse.OptionParser._process_long_opt(self, rargs, values) except optparse.BadOptionError: pass
try: optparse.OptionParser._process_short_opts(self, rargs, values) except optparse.BadOptionError: pass
''' code behind bin/ansible* programs '''
# -F (quit-if-one-screen) -R (allow raw ansi control chars) # -S (chop long lines) -X (disable termcap init and de-init)
""" Base init method for all command line programs """
""" Get the action the user wants to execute from the sys argv list. """ for i in range(0, len(self.args)): arg = self.args[i] if arg in self.VALID_ACTIONS: self.action = arg del self.args[i] break
if not self.action: # if we're asked for help or version, we don't need an action. # have to use a special purpose Option Parser to figure that out as # the standard OptionParser throws an error for unknown options and # without knowing action, we only know of a subset of the options # that could be legal for this command tmp_parser = InvalidOptsParser(self.parser) tmp_options, tmp_args = tmp_parser.parse_args(self.args) if not(hasattr(tmp_options, 'help') and tmp_options.help) or (hasattr(tmp_options, 'version') and tmp_options.version): raise AnsibleOptionsError("Missing required action")
""" Actually runs a child defined method using the execute_<action> pattern """ fn = getattr(self, "execute_%s" % self.action) fn()
def run(self): """Run the ansible command
Subclasses must implement this method. It does the actual work of running an Ansible command. """
else: display.v(u"No config file found; using defaults")
# warn about deprecated config options name = deprecated[0] why = deprecated[1]['why'] if 'alternatives' in deprecated[1]: alt = ', use %s instead' % deprecated[1]['alternatives'] else: alt = '' ver = deprecated[1]['version'] display.deprecated("%s option, %s %s" % (name, why, alt), version=ver)
# warn about typing issues with configuration entries display.warning("Unable to set correct type for configuration entry: %s" % unable)
def split_vault_id(vault_id): # return (before_@, after_@) # if no @, return whole string as after_ if '@' not in vault_id: return (None, vault_id)
parts = vault_id.split('@', 1) ret = tuple(parts) return ret
ask_vault_pass=None, create_new_password=None, auto_prompt=True):
# convert vault_password_files into vault_ids slugs id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, password_file)
# note this makes --vault-id higher precendence than --vault-password-file # if we want to intertwingle them in order probably need a cli callback to populate vault_ids # used by --vault-id and --vault-password-file vault_ids.append(id_slug)
# if an action needs an encrypt password (create_new_password=True) and we dont # have other secrets setup, then automatically add a password prompt as well. # prompts cant/shouldnt work without a tty, so dont add prompt secrets
id_slug = u'%s@%s' % (C.DEFAULT_VAULT_IDENTITY, u'prompt_ask_vault_pass') vault_ids.append(id_slug)
# TODO: remove the now unused args ask_vault_pass=None, create_new_password=False, auto_prompt=True): # list of tuples
# Depending on the vault_id value (including how --ask-vault-pass / --vault-password-file create a vault_id) # we need to show different prompts. This is for compat with older Towers that expect a # certain vault password prompt format, so 'promp_ask_vault_pass' vault_id gets the old format.
# If there are configured default vault identities, they are considered 'first' # so we prepend them to vault_ids (from cli) here
vault_password_files.append(C.DEFAULT_VAULT_PASSWORD_FILE)
prompt_formats['prompt'] = ['New vault password (%(vault_id)s): ', 'Confirm vew vault password (%(vault_id)s): '] # 2.3 format prompts for --ask-vault-pass prompt_formats['prompt_ask_vault_pass'] = ['New Vault password: ', 'Confirm New Vault password: '] else: # The format when we use just --ask-vault-pass needs to match 'Vault password:\s*?$'
vault_password_files, ask_vault_pass, create_new_password, auto_prompt=auto_prompt)
vault_id_name, vault_id_value = CLI.split_vault_id(vault_id_slug) if vault_id_value in ['prompt', 'prompt_ask_vault_pass']:
# --vault-id some_name@prompt_ask_vault_pass --vault-id other_name@prompt_ask_vault_pass will be a little # confusing since it will use the old format without the vault id in the prompt built_vault_id = vault_id_name or C.DEFAULT_VAULT_IDENTITY
# choose the prompt based on --vault-id=prompt or --ask-vault-pass. --ask-vault-pass # always gets the old format for Tower compatibility. # ie, we used --ask-vault-pass, so we need to use the old vault password prompt # format since Tower needs to match on that format. prompted_vault_secret = PromptVaultSecret(prompt_formats=prompt_formats[vault_id_value], vault_id=built_vault_id)
# a empty or invalid password from the prompt will warn and continue to the next # without erroring globablly try: prompted_vault_secret.load() except AnsibleError as exc: display.warning('Error in vault password prompt (%s): %s' % (vault_id_name, exc)) raise
vault_secrets.append((built_vault_id, prompted_vault_secret))
# update loader with new secrets incrementally, so we can load a vault password # that is encrypted with a vault secret provided earlier loader.set_vault_secrets(vault_secrets) continue
# assuming anything else is a password file display.vvvvv('Reading vault password file: %s' % vault_id_value) # read vault_pass from a file file_vault_secret = get_file_vault_secret(filename=vault_id_value, vault_id=vault_id_name, loader=loader)
# an invalid password file will error globally try: file_vault_secret.load() except AnsibleError as exc: display.warning('Error in vault password file loading (%s): %s' % (vault_id_name, exc)) raise
if vault_id_name: vault_secrets.append((vault_id_name, file_vault_secret)) else: vault_secrets.append((C.DEFAULT_VAULT_IDENTITY, file_vault_secret))
# update loader with as-yet-known vault secrets loader.set_vault_secrets(vault_secrets)
''' prompt for connection and become passwords if needed '''
sshpass = getpass.getpass(prompt="SSH password: ") become_prompt = "%s password[defaults to SSH password]: " % become_prompt_method if sshpass: sshpass = to_bytes(sshpass, errors='strict', nonstring='simplerepr') else:
becomepass = getpass.getpass(prompt=become_prompt) if op.ask_pass and becomepass == '': becomepass = sshpass if becomepass: becomepass = to_bytes(becomepass) except EOFError: pass
''' this keeps backwards compatibility with sudo/su self.options '''
display.deprecated('The %s command line option has been deprecated in favor of the "become" command line arguments' % which, '2.6')
pass self.options.become = True self.options.become_method = 'sudo' _dep('sudo') self.options.become = True self.options.become_method = 'su' _dep('su')
# other deprecations: _dep('sudo') _dep('su')
''' check for conflicting options '''
# Check for vault related conflicts self.parser.error("--ask-vault-pass and --vault-password-file are mutually exclusive")
if (op.new_vault_id and op.new_vault_password_file): self.parser.error("--new-vault-password-file and --new-vault-id are mutually exclusive")
# Check for privilege escalation conflicts (op.su or op.su_user) and (op.become or op.become_user) or (op.sudo or op.sudo_user) and (op.become or op.become_user)):
self.parser.error("Sudo arguments ('--sudo', '--sudo-user', and '--ask-sudo-pass') and su arguments ('--su', '--su-user', and '--ask-su-pass') " "and become arguments ('--become', '--become-user', and '--ask-become-pass') are exclusive of each other")
self.parser.error("The number of processes (--forks) must be >= 1")
def unfrack_paths(option, opt, value, parser): paths = getattr(parser.values, option.dest) if paths is None: paths = []
if isinstance(value, string_types): paths[:0] = [unfrackpath(x) for x in value.split(os.pathsep) if x] elif isinstance(value, list): paths[:0] = [unfrackpath(x) for x in value if x] else: pass # FIXME: should we raise options error?
setattr(parser.values, option.dest, paths)
def unfrack_path(option, opt, value, parser): if value != '-': setattr(parser.values, option.dest, unfrackpath(value)) else: setattr(parser.values, option.dest, value)
async_opts=False, connect_opts=False, subset_opts=False, check_opts=False, inventory_opts=False, epilog=None, fork_opts=False, runas_prompt_opts=False, desc=None, basedir_opts=False, vault_rekey_opts=False): ''' create an options parser for most ansible scripts '''
# base opts help="verbose mode (-vvv for more, -vvvv to enable connection debugging)")
help="specify inventory host path or comma separated host list. --inventory-file is deprecated") help='outputs a list of matching hosts; does not execute anything else') help='further limit selected hosts to an additional pattern')
help="prepend colon-separated path(s) to module library (default=%s)" % C.DEFAULT_MODULE_PATH, action="callback", callback=CLI.unfrack_paths, type='str') help="set additional variables as key=value or YAML/JSON, if filename prepend with @", default=[])
help="specify number of parallel processes to use (default=%s)" % C.DEFAULT_FORKS)
help='ask for vault password') help="vault password file", action="callback", callback=CLI.unfrack_paths, type='string') help='the vault identity to use')
parser.add_option('--new-vault-password-file', default=None, dest='new_vault_password_file', help="new vault password file for rekey", action="callback", callback=CLI.unfrack_path, type='string') parser.add_option('--new-vault-id', default=None, dest='new_vault_id', type='string', help='the new vault identity to use for rekey')
help="only run plays and tasks tagged with these values") help="only run plays and tasks whose tags do not match these values")
parser.add_option('-o', '--one-line', dest='one_line', action='store_true', help='condense output') parser.add_option('-t', '--tree', dest='tree', default=None, help='log output to this directory')
help='ask for connection password') help='use this file to authenticate the connection', action="callback", callback=CLI.unfrack_path, type='string') help='connect as this user (default=%s)' % C.DEFAULT_REMOTE_USER) help="connection type to use (default=%s)" % C.DEFAULT_TRANSPORT) help="override the connection timeout in seconds (default=%s)" % C.DEFAULT_TIMEOUT) help="specify common arguments to pass to sftp/scp/ssh (e.g. ProxyCommand)") help="specify extra arguments to pass to sftp only (e.g. -f, -l)") help="specify extra arguments to pass to scp only (e.g. -l)") help="specify extra arguments to pass to ssh only (e.g. -R)")
# priv user defaults to root later on to enable detecting when this option was given here help="run operations with sudo (nopasswd) (deprecated, use become)") help='desired sudo user (default=root) (deprecated, use become)') help='run operations with su (deprecated, use become)') help='run operations with su as this user (default=%s) (deprecated, use become)' % C.DEFAULT_SU_USER)
# consolidated privilege escalation (become) help="run operations with become (does not imply password prompting)") help="privilege escalation method to use (default=%s), valid choices: [ %s ]" % (C.DEFAULT_BECOME_METHOD, ' | '.join(C.BECOME_METHODS))) help='run operations as this user (default=%s)' % C.DEFAULT_BECOME_USER)
runas_group = rg help='ask for sudo password (deprecated, use become)') help='ask for su password (deprecated, use become)') help='ask for privilege escalation password')
parser.add_option('-P', '--poll', default=C.DEFAULT_POLL_INTERVAL, type='int', dest='poll_interval', help="set the poll interval if using -B (default=%s)" % C.DEFAULT_POLL_INTERVAL) parser.add_option('-B', '--background', dest='seconds', type='int', default=0, help='run asynchronously, failing after X seconds (default=N/A)')
help="don't make any changes; instead, try to predict some of the changes that may occur") help="perform a syntax check on the playbook, but do not execute it") help="when changing (small) files and templates, show the differences in those files; works great with --check")
help="run handlers even if a task fails") help="clear the fact cache for every host in inventory")
parser.add_option('--playbook-dir', default=None, dest='basedir', action='store', help="Since this tool does not use playbooks, use this as a subsitute playbook directory." "This sets the relative path for many features including roles/ group_vars/ etc.")
def parse(self): """Parse the command line args
This method parses the command line arguments. It uses the parser stored in the self.parser attribute and saves the args and options in self.args and self.options respectively.
Subclasses need to implement this method. They will usually create a base_parser, add their own options to the base_parser, and then call this method to do the actual parsing. An implementation will look something like this::
def parse(self): parser = super(MyCLI, self).base_parser(usage="My Ansible CLI", inventory_opts=True) parser.add_option('--my-option', dest='my_option', action='store') self.parser = parser super(MyCLI, self).parse() # If some additional transformations are needed for the # arguments and options, do it here. """
# process tags # optparse defaults does not do what's expected if len(self.options.tags) > 1: display.deprecated('Specifying --tags multiple times on the command line currently uses the last specified value. ' 'In 2.4, values will be merged instead. Set merge_multiple_cli_tags=True in ansible.cfg to get this behavior now.', version=2.5, removed=False) self.options.tags = [self.options.tags[-1]]
# process skip_tags if not C.MERGE_MULTIPLE_CLI_TAGS: if len(self.options.skip_tags) > 1: display.deprecated('Specifying --skip-tags multiple times on the command line currently uses the last specified value. ' 'In 2.4, values will be merged instead. Set merge_multiple_cli_tags=True in ansible.cfg to get this behavior now.', version=2.5, removed=False) self.options.skip_tags = [self.options.skip_tags[-1]]
skip_tags = set() for tag_set in self.options.skip_tags: for tag in tag_set.split(u','): skip_tags.add(tag.strip()) self.options.skip_tags = list(skip_tags)
# process inventory options except for CLIs that require their own processing
# should always be list self.options.inventory = [self.options.inventory]
# Ensure full paths when needed else: self.options.inventory = C.DEFAULT_HOST_LIST
def version(prog): ''' return ansible version ''' cpath = "Default w/o overrides" else:
''' return full ansible version info ''' # expensive call, user with care ansible_version_string = CLI.version('') else: ansible_versions[counter] = 0 except: pass for counter in range(len(ansible_versions), 3): ansible_versions.append(0) 'full': ansible_version, 'major': ansible_versions[0], 'minor': ansible_versions[1], 'revision': ansible_versions[2]}
def _git_repo_info(repo_path): ''' returns a string containing git branch, commit id and commit date ''' # Check if the .git is a file. If it is a file, it means that we are in a submodule structure. try: gitdir = yaml.safe_load(open(repo_path)).get('gitdir') # There is a possibility the .git file to have an absolute path. if os.path.isabs(gitdir): repo_path = gitdir else: repo_path = os.path.join(repo_path[:-4], gitdir) except (IOError, AttributeError): return '' else: branch_path = None else: # detached HEAD commit = line[:10] branch = 'detached HEAD' branch_path = os.path.join(repo_path, "HEAD")
offset = time.timezone else: else: result = ''
def _gitinfo(): return result tokens = line.strip().split(' ') if tokens[0] == 'path': submodule_path = tokens[2] submodule_info = CLI._git_repo_info(os.path.join(basedir, submodule_path, '.git')) if not submodule_info: submodule_info = ' not found - use git submodule update --init ' + submodule_path result += "\n {0}: {1}".format(submodule_path, submodule_info)
''' find reasonable way to display text ''' # this is a much simpler form of what is in pydoc.py if not sys.stdout.isatty(): display.display(text, screen_only=True) elif 'PAGER' in os.environ: if sys.platform == 'win32': display.display(text, screen_only=True) else: self.pager_pipe(text, os.environ['PAGER']) else: p = subprocess.Popen('less --version', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.communicate() if p.returncode == 0: self.pager_pipe(text, 'less') else: display.display(text, screen_only=True)
def pager_pipe(text, cmd): ''' pipe text through a pager ''' if 'LESS' not in os.environ: os.environ['LESS'] = CLI.LESS_OPTS try: cmd = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=sys.stdout) cmd.communicate(input=to_bytes(text)) except IOError: pass except KeyboardInterrupt: pass
def tty_ify(cls, text):
t = cls._ITALIC.sub("`" + r"\1" + "'", text) # I(word) => `word' t = cls._BOLD.sub("*" + r"\1" + "*", t) # B(word) => *word* t = cls._MODULE.sub("[" + r"\1" + "]", t) # M(word) => [word] t = cls._URL.sub(r"\1", t) # U(word) => word t = cls._CONST.sub("`" + r"\1" + "'", t) # C(word) => `word'
return t
def _play_prereqs(options):
# all needs loader
loader.set_basedir(basedir)
vault_ids=vault_ids, vault_password_files=options.vault_password_files, ask_vault_pass=options.ask_vault_pass, auto_prompt=False)
# create the inventory, and filter it based on the subset specified (if any)
# create the variable manager, which will be shared throughout # the code, ensuring a consistent view of global variables
# load vars from cli options
# Empty inventory display.warning("provided hosts list is empty, only localhost is available. Note that the implicit localhost does not match 'all'") no_hosts = True
raise AnsibleError("Specified hosts and/or --limit does not match any hosts")
|