Source code for tank.util.filesystem

# Copyright (c) 2016 Shotgun Software Inc.
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.

"""
Utility methods for manipulating files and folders
"""

import os
import re
import sys
import errno
import stat
import shutil
import datetime
import functools
import subprocess
from contextlib import contextmanager

from .. import LogManager
from .platforms import is_linux, is_macos, is_windows
from tank_vendor import six

log = LogManager.get_logger(__name__)

# files or directories to skip if no skip_list is specified
SKIP_LIST_DEFAULT = [".svn", ".git", ".gitignore", ".hg", ".hgignore"]


[docs]def with_cleared_umask(func): """ Decorator which clears the umask for a method. The umask is a permissions mask that gets applied whenever new files or folders are created. For I/O methods that have a permissions parameter, it is important that the umask is cleared prior to execution, otherwise the default umask may alter the resulting permissions, for example:: def create_folders(path, permissions=0777): log.debug("Creating folder %s..." % path) os.makedirs(path, permissions) The 0777 permissions indicate that we want folders to be completely open for all users (a+rwx). However, the umask overrides this, so if the umask for example is set to 0777, meaning that I/O operations are not allowed to create files that are readable, executable or writable for users, groups or others, the resulting permissions on folders created by create folders will be 0, despite passing in 0777 permissions. By adding this decorator to the method, we temporarily reset the umask to 0, thereby giving full control to any permissions operation to take place without any restriction by the umask:: @with_cleared_umask def create_folders(path, permissions=0777): # Creates folders with the given permissions, # regardless of umask setting. log.debug("Creating folder %s..." % path) os.makedirs(path, permissions) """ @functools.wraps(func) def wrapper(*args, **kwargs): # set umask to zero, store old umask old_umask = os.umask(0) try: # execute method payload return func(*args, **kwargs) finally: # set mask back to previous value os.umask(old_umask) return wrapper
[docs]def compute_folder_size(path): """ Computes and returns the size of the given folder. :param path: folder to compute size for :return: size in bytes """ total_size = 0 for dirpath, dirnames, filenames in os.walk(path): for f in filenames: fp = os.path.join(dirpath, f) total_size += os.path.getsize(fp) return total_size
[docs]@with_cleared_umask def touch_file(path, permissions=0o666): """ Touch a file and optionally set its permissions. :param path: path to touch :param permissions: Optional permissions to set on the file. Default value is 0666, creating a file that is readable and writable for all users. :raises: OSError - if there was a problem reading/writing the file """ if not os.path.exists(path): try: fh = open(path, "wb") fh.close() os.chmod(path, permissions) except OSError as e: # Race conditions are perfectly possible on some network storage # setups so make sure that we ignore any file already exists errors, # as they are not really errors! if e.errno != errno.EEXIST: raise
[docs]@with_cleared_umask def ensure_folder_exists(path, permissions=0o775, create_placeholder_file=False): """ Helper method - creates a folder and parent folders if such do not already exist. :param path: path to create :param permissions: Permissions to use when folder is created :param create_placeholder_file: If true, a placeholder file will be generated. :raises: OSError - if there was a problem creating the folder """ if not os.path.exists(path): try: os.makedirs(path, permissions) if create_placeholder_file: ph_path = os.path.join(path, "placeholder") if not os.path.exists(ph_path): fh = open(ph_path, "wt") fh.write( "This file was automatically generated by Flow Production Tracking.\n\n" ) fh.write( "The placeholder file is needed when managing toolkit configurations\n" ) fh.write( "in source control packages such as git and perforce. These systems\n" ) fh.write( "do not handle empty folders so a placeholder file is required for the \n" ) fh.write("folder to be tracked and managed properly.\n") fh.close() except OSError as e: # Race conditions are perfectly possible on some network storage setups # so make sure that we ignore any file already exists errors, as they # are not really errors! if e.errno != errno.EEXIST: # re-raise raise
[docs]@with_cleared_umask def copy_file(src, dst, permissions=0o666): """ Copy file and sets its permissions. :param src: Source file :param dst: Target destination :param permissions: Permissions to use for target file. Default permissions will be readable and writable for all users. """ if is_windows(): src = os.path.realpath(src) # Check if the dst is a directory and change it to a filename if it is if os.path.isdir(dst): basename = os.path.basename(src) dst = os.path.join(dst, basename) dst = os.path.realpath(dst) # Use larger copy buffer in the shutil.copyfileobj operation with open(src, mode="rb") as windows_src: with open(dst, mode="wb") as windows_dst: shutil.copyfileobj(windows_src, windows_dst, length=16 * 1024 * 1024) log.debug("Used shutil override on Windows") else: shutil.copy(src, dst) os.chmod(dst, permissions)
[docs]def safe_delete_file(path): """ Deletes the given file if it exists. Ignores any errors raised in the process and logs them as warnings. If the user does not have sufficient permissions to remove the file, nothing will happen, it will simply be skipped over. :param path: Full path to file to remove """ try: if os.path.exists(path): # on windows, make sure file is not read-only if is_windows(): # make sure we have write permission attr = os.stat(path)[0] if not attr & stat.S_IWRITE: os.chmod(path, stat.S_IWRITE) os.remove(path) except Exception as e: log.warning("File '%s' could not be deleted, skipping: %s" % (path, e))
[docs]@with_cleared_umask def copy_folder(src, dst, folder_permissions=0o775, skip_list=None): """ Alternative implementation to ``shutil.copytree`` Copies recursively and creates folders if they don't already exist. Always skips system files such as ``"__MACOSX"``, ``".DS_Store"``, etc. Files will the extension ``.sh``, ``.bat`` or ``.exe`` will be given executable permissions. Returns a list of files that were copied. :param src: Source path to copy from :param dst: Destination to copy to :param folder_permissions: permissions to use for new folders :param skip_list: List of file names to skip. If this parameter is omitted or set to None, common files such as ``.git``, ``.gitignore`` etc will be ignored. :returns: List of files copied """ # files or directories to always skip SKIP_LIST_ALWAYS = ["__MACOSX", ".DS_Store"] # compute full skip list # note: we don't do # actual_skip_list = skip_list or SKIP_LIST_DEFAULT # because we want users to be able to pass in # skip_list=[] in order to clear the default skip list. if skip_list is None: actual_skip_list = list(SKIP_LIST_DEFAULT) else: actual_skip_list = list(skip_list) # add the items we always want to skip actual_skip_list.extend(SKIP_LIST_ALWAYS) files = [] if not os.path.exists(dst): os.mkdir(dst, folder_permissions) names = os.listdir(src) for name in names: # get rid of system files if name in actual_skip_list: continue srcname = os.path.join(src, name) dstname = os.path.join(dst, name) try: if os.path.isdir(srcname): files.extend(copy_folder(srcname, dstname, folder_permissions)) else: shutil.copy(srcname, dstname) files.append(srcname) # if the file extension is sh, set executable permissions if ( dstname.endswith(".sh") or dstname.endswith(".bat") or dstname.endswith(".exe") ): try: # make it readable and executable for everybody os.chmod(dstname, 0o775) except Exception as e: log.error( "Can't set executable permissions on %s: %s" % (dstname, e) ) except (IOError, os.error) as e: raise IOError("Can't copy %s to %s: %s" % (srcname, dstname, e)) return files
[docs]@with_cleared_umask def move_folder(src, dst, folder_permissions=0o775): """ Moves a directory. First copies all content into target. Then deletes all content from sources. Skips files that won't delete. .. note:: The source folder itself is not deleted, it is just emptied, if possible. :param src: Source path to copy from :param dst: Destination to copy to :param folder_permissions: permissions to use for new folders """ if os.path.exists(src): log.debug("Moving directory: %s -> %s" % (src, dst)) # first copy the content in the core folder src_files = copy_folder( src, dst, folder_permissions, skip_list=[] # copy all files ) # now clear out the install location log.debug("Clearing out source location...") for f in src_files: try: # on windows, ensure all files are writable if is_windows(): attr = os.stat(f)[0] if not attr & stat.S_IWRITE: # file is readonly! - turn off this attribute os.chmod(f, stat.S_IWRITE) os.remove(f) except Exception as e: log.warning("Could not delete file %s: %s" % (f, e))
[docs]@with_cleared_umask def backup_folder(src, dst=None): """ Moves the given directory into a backup location. By default, the folder will be renamed by simply giving it a timestamp based suffix. Optionally, it can be moved into a different location. - ``backup_folder("/foo/bar")`` will move ``/foo/bar`` to ``/foo/bar.20160912_200426`` - ``backup_folder("/foo/bar", "/tmp")`` will move ``/foo/bar`` to ``/tmp/bar.20160912_200426`` :param src: Folder to move :param dst: Optional backup folder """ if os.path.exists(src): timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") if dst is None: backup_path = "%s.%s" % (src, timestamp) else: backup_path = os.path.join( dst, "%s.%s" % (os.path.basename(src), timestamp) ) move_folder(src, backup_path)
[docs]def create_valid_filename(value): """ Create a sanitized file name given a string. Replaces spaces and other characters with underscores 'my lovely name ' -> 'my_lovely_name' :param value: String value to sanitize :returns: sanitized string """ # regex to find non-word characters - in ascii land, that is [^A-Za-z0-9_-.] # note that we use a unicode expression, meaning that it will include other # "word" characters, not just A-Z. exp = re.compile(r"[^\w\.-]", re.UNICODE) # strip trailing whitespace value = value.strip() if isinstance(value, six.text_type): # src is unicode, so return unicode return exp.sub("_", value) else: # source is non-unicode. # assume utf-8 encoding so decode, replace # and re-encode the returned result # so that we return a string u_src = value.decode("utf-8") return exp.sub("_", u_src).encode("utf-8")
def get_permissions(path): """ Retrieve the file system permissions for the file or folder in the given path. :param filename: Path to the file to be queried for permissions :returns: permissions bits of the file :raises: OSError - if there was a problem retrieving permissions for the path """ return stat.S_IMODE(os.stat(path)[stat.ST_MODE])
[docs]def safe_delete_folder(path): """ Deletes a folder and all of its contents recursively, even if it has read-only items. .. note:: Problems deleting any items will be reported as warnings in the log output but otherwise ignored and skipped; meaning the function will continue deleting as much as it can. :param path: File system path to location to the folder to be deleted """ def _on_rm_error(func, path, exc_info): """ Error function called whenever shutil.rmtree fails to remove a file system item. Exceptions raised by this function will not be caught. :param func: The function which raised the exception; it will be: os.path.islink(), os.listdir(), os.remove() or os.rmdir(). :param path: The path name passed to function. :param exc_info: The exception information return by sys.exc_info(). """ if func == os.unlink or func == os.remove or func == os.rmdir: try: attr = get_permissions(path) if not (attr & stat.S_IWRITE): os.chmod(path, stat.S_IWRITE | attr) try: func(path) except Exception as e: log.warning("Could not delete %s: %s. Skipping" % (path, e)) else: log.warning("Could not delete %s: Skipping" % path) except Exception as e: log.warning("Could not delete %s: %s. Skipping" % (path, e)) else: log.warning("Could not delete %s. Skipping." % path) if os.path.exists(path): try: # On Windows, Python's shutil can't delete read-only files, so if we were trying to delete one, # remove the flag. # Inspired by http://stackoverflow.com/a/4829285/1074536 shutil.rmtree(path, onerror=_on_rm_error) except Exception as e: log.warning("Could not delete %s: %s" % (path, e)) else: log.warning("Could not delete: %s. Folder does not exist" % path)
[docs]def get_unused_path(base_path): """ Return an unused file path from the given base path by appending if needed a number at the end of the basename of the path, right before the first ".", if any. For example, ``/tmp/foo_1.bar.blah`` would be returned for ``/tmp/foo.bar.blah`` if it already exists. If the given path does not exist, the original path is returned. .. note:: The returned path is not _reserved_, so it is possible that other processes could create the returned path before it is used by the caller. :param str base_path: Target path. :returns: A string. """ if not os.path.exists(base_path): # Bail out quickly if everything is fine with the path return base_path # Split the base path and find an unused path folder, basename = os.path.split(base_path) # Split the basename at the first ".", if any. Make sure we always have at least # two entries. base_parts = basename.split(".", 1) + [""] numbering = 0 while True: numbering += 1 name = "%s_%d%s" % ( base_parts[0], numbering, ".%s" % base_parts[1] if base_parts[1] else "", ) path = os.path.join(folder, name) log.debug("Checking if %s exists..." % path) if not os.path.exists(path): break return path
@with_cleared_umask @contextmanager def auto_created_yml(path): """ A context manager for opening/closing an auto-generated yaml file. - clears umask - any existing files will be removed - the given path will be open for writing in text mode - a standard header will be added Usage example:: with filesystem.auto_created_yml(yaml_path) as fh: fh.write("foobar: blah") # fh is automatically closed upon exiting the context :param path: path to yml file to open for writing :return: file handle. """ log.debug("Creating auto-generated config file %s" % path) # clean out any existing file and replace it with a new one. safe_delete_file(path) with open(path, "w+") as fh: fh.write("# This file was auto generated by SGTK.\n") fh.write( "# Please do not modify by hand as it may be overwritten at any point.\n" ) fh.write( "# Created %s\n" % datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") ) fh.write("# \n") # on entering the context yield fh # on exiting the context fh.write("\n") fh.write("# End of file.\n") def open_file_browser(path): """ Opens the given path in the operating system file browser such as Windows explorer or Macosx Finder. If the path points at a file, the method will attempt to highlight the file. :param path: A path to a file or folder. :raises: RuntimeError: If the Platform is not supported or if the file browser couldn't be launched. :raises: ValueError: If the path is not a valid directory. """ if os.path.isfile(path): # open the file, attempt to select in finder/explorer _open_file_browser_for_file(path) elif os.path.isdir(path): # open the folder in finder/explorer _open_file_browser_for_folder(path) else: # possibly we have an image sequence and therefore its a symbolic path, # instead check to see if the parent folder of the path is valid and # try opening that. # # TODO: The issue with this logic is that possibly it was a directory # that just didn't exist, so we would just be gathering the next # directory up. parent_dir = os.path.dirname(path) _open_file_browser_for_folder(parent_dir) def _open_file_browser_for_folder(path): """ This method will take a path to a folder and open it in an OS's file browser. :param path: A folder path :raises: RuntimeError: If the Platform is not supported or if the file browser couldn't be launched. :raises: ValueError: If the path is not a valid directory. """ log.debug("Launching file system browser for folder %s" % path) # Check that we don't have a file path. if not os.path.isdir(path): raise ValueError('The path "%s" is not a valid directory.' % path) # build the commands for opening the folder on the various OS's if is_linux(): cmd_args = ["xdg-open", path] elif is_macos(): cmd_args = ["open", path] elif is_windows(): cmd_args = ["cmd.exe", "/C", "start", path] else: raise RuntimeError("Platform '%s' is not supported." % sys.platform) log.debug("Executing command '%s'" % cmd_args) exit_code = subprocess.call(cmd_args) if exit_code != 0: raise RuntimeError( "Failed to launch a file browser for folder '%s'. " "Error code %s" % (path, exit_code) ) def _open_file_browser_for_file(path): """ This method will take a path to a file and open it in an OS's file browser and attempt to highlight it. :param path: A file path :raises: RuntimeError: If the Platform is not supported or if the file browser couldn't be launched. :raises: ValueError: If the path is not a valid file path. """ log.debug("Launching file system browser for file %s" % path) if not os.path.isfile(path): raise ValueError('The path "%s" is not a valid file path.' % path) if is_linux(): # note: there isn't a straight forward way to do # this on linux, so just open the directory instead. cmd_args = ["xdg-open", os.path.dirname(path)] elif is_macos(): cmd_args = ["open", "-R", path] elif is_windows(): # /select makes windows select the file within the explorer window # The problem with this approach is that it always returns back an error code of 1 even if it # does behave correctly. cmd_args = ["explorer", "/select,", path] else: raise Exception("Platform '%s' is not supported." % sys.platform) log.debug("Executing command '%s'" % cmd_args) exit_code = subprocess.call(cmd_args) # cannot trust exit code from windows, see above if not is_windows() and exit_code != 0: raise RuntimeError( "Failed to launch a file browser for file '%s'. " "Error code %s" % (path, exit_code) )