Coverage for lib/ansible/parsing/dataloader.py : 57%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
# (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> # Copyright: (c) 2017, Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
# Make coding more python3-ish
# Tries to determine if a path is inside a role, last dir must be 'tasks' # this is not perfect but people should really avoid 'tasks' dirs outside roles when using Ansible.
''' The DataLoader class is used to load and parse YAML or JSON content, either from a given file name or from a string that was previously read in through other means. A Vault password can be specified, and any vault-encrypted files will be decrypted.
Data read from files will also be cached, so the file will never be read from disk more than once.
Usage:
dl = DataLoader() # optionally: dl.set_vault_password('foo') ds = dl.load('...') ds = dl.load_from_file('/path/to/file') '''
# initialize the vault stuff with an empty password # TODO: replace with a ref to something that can get the password # a creds/auth provider # self.set_vault_password(None)
# TODO: since we can query vault_secrets late, we could provide this to DataLoader init
'''Backwards compat for now'''
''' Loads data from a file, which can contain either JSON or YAML. '''
# if the file has already been read in and cached, we'll # return those results to avoid more file/vault operations else: # read the file contents and load the data structure from them
# cache the file contents for next time
return parsed_data else: # return a deep copy here, so the cache is not affected
path = self.path_dwim(path) return os.listdir(path)
'''is the given path executable?''' path = self.path_dwim(path) return is_executable(path)
'''Decrypt b_vault_data if encrypted and return b_data and the show_content flag'''
b_ciphertext, b_version, cipher_name, vault_id = parse_vaulttext_envelope(b_vault_data) b_data = self._vault.decrypt(b_vault_data, filename=b_file_name)
show_content = False return b_data, show_content
''' Reads the file contents from the given file name
If the contents are vault-encrypted, it will decrypt them and return the decrypted data
:arg file_name: The name of the file to read. If this is a relative path, it will be expanded relative to the basedir :raises AnsibleFileNotFound: if the file_name does not refer to a file :raises AnsibleParserError: if we were unable to read the file :return: Returns a byte string of the file contents ''' raise AnsibleParserError("Invalid filename: '%s'" % str(file_name))
# This is what we really want but have to fix unittests to make it pass # if not os.path.exists(b_file_name) or not os.path.isfile(b_file_name): raise AnsibleFileNotFound("Unable to retrieve file contents", file_name=file_name)
except (IOError, OSError) as e: raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, str(e)), orig_exc=e)
''' returns the current basedir '''
''' sets the base directory, used to find files when a relative path is given '''
''' make relative paths work like folks expect. '''
else:
''' imperfect role detection, roles are still valid w/o tasks|meta/main.yml|yaml|etc '''
RE_TASKS.search(path) and os.path.exists(os.path.join(b_path, b_main)) or os.path.exists(os.path.join(b_upath, b_tasked)) or os.path.exists(os.path.join(os.path.dirname(b_path), b_tasked)) ): return False
''' find one file in either a role or playbook dir with or without explicitly named dirname subdirs
Used in action plugins and lookups to find supplemental files that could be in either place. '''
# I have full path, nothing else needs to be looked at else: # base role/play path + templates/files/vars + relative filename
# not told if role, but detect if it is a role and if so make sure you get correct base path
basedir = unfrackpath(os.path.dirname(path), follow=False)
# resolved base role/play path + templates/files/vars + relative filename
# look in role's tasks dir w/o dirname
# try to create absolute path for loader basedir + templates/files/vars + filename
# try to create absolute path for loader basedir
# try to create absolute path for dirname + filename
# try to create absolute path for filename
''' find one file in first path in stack taking roles into account and adding play basedir as fallback
:arg paths: A list of text strings which are the paths to look for the filename in. :arg dirname: A text string representing a directory. The directory is prepended to the source to form the path to search for. :arg source: A text string which is the filename to search for :rtype: A text string :returns: An absolute path to the filename ``source`` if found :raises: An AnsibleFileNotFound Exception if the file is not found in any of the search paths '''
display.warning('Invalid request to find a file that matches a "null" value') # path is absolute, no relative needed, check existence and return source else: display.debug(u'evaluation_path:\n\t%s' % '\n\t'.join(paths)) for path in paths: upath = unfrackpath(path, follow=False) b_upath = to_bytes(upath, errors='surrogate_or_strict') b_mydir = os.path.dirname(b_upath)
# if path is in role and 'tasks' not there already, add it into the search if (is_role or self._is_role(path)) and b_mydir.endswith(b'tasks'): search.append(os.path.join(os.path.dirname(b_mydir), b_dirname, b_source)) search.append(os.path.join(b_mydir, b_source)) else: # don't add dirname if user already is using it in source if b_source.split(b'/')[0] != dirname: search.append(os.path.join(b_upath, b_dirname, b_source)) search.append(os.path.join(b_upath, b_source))
# always append basedir as last resort # don't add dirname if user already is using it in source if b_source.split(b'/')[0] != dirname: search.append(os.path.join(to_bytes(self.get_basedir()), b_dirname, b_source)) search.append(os.path.join(to_bytes(self.get_basedir()), b_source))
display.debug(u'search_path:\n\t%s' % to_text(b'\n\t'.join(search))) for b_candidate in search: display.vvvvv(u'looking for "%s" at "%s"' % (source, to_text(b_candidate))) if os.path.exists(b_candidate): result = to_text(b_candidate) break
raise AnsibleFileNotFound(file_name=source, paths=[to_text(p) for p in search])
''' Create a tempfile containing defined content ''' fd, content_tempfile = tempfile.mkstemp() f = os.fdopen(fd, 'wb') content = to_bytes(content) try: f.write(content) except Exception as err: os.remove(content_tempfile) raise Exception(err) finally: f.close() return content_tempfile
""" If the file is vault encrypted return a path to a temporary decrypted file If the file is not encrypted then the path is returned Temporary files are cleanup in the destructor """
if not file_path or not isinstance(file_path, (binary_type, text_type)): raise AnsibleParserError("Invalid filename: '%s'" % to_native(file_path))
b_file_path = to_bytes(file_path, errors='surrogate_or_strict') if not self.path_exists(b_file_path) or not self.is_file(b_file_path): raise AnsibleFileNotFound(file_name=file_path)
real_path = self.path_dwim(file_path)
try: if decrypt: with open(to_bytes(real_path), 'rb') as f: # Limit how much of the file is read since we do not know # whether this is a vault file and therefore it could be very # large. if is_encrypted_file(f, count=len(b_HEADER)): # if the file is encrypted and no password was specified, # the decrypt call would throw an error, but we check first # since the decrypt function doesn't know the file name data = f.read() if not self._vault.secrets: raise AnsibleParserError("A vault password or secret must be specified to decrypt %s" % to_native(file_path))
data = self._vault.decrypt(data, filename=real_path) # Make a temp file real_path = self._create_content_tempfile(data) self._tempfiles.add(real_path)
return real_path
except (IOError, OSError) as e: raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (to_native(real_path), to_native(e)), orig_exc=e)
""" Removes any temporary files created from a previous call to get_real_file. file_path must be the path returned from a previous call to get_real_file. """ if file_path in self._tempfiles: os.unlink(file_path) self._tempfiles.remove(file_path)
try: self.cleanup_tmp_file(f) except Exception as e: display.warning("Unable to cleanup temp files: %s" % to_native(e)) |