Source code for tank.util.shotgun_entity

# Copyright (c) 2013 Shotgun Software Inc.
#
# CONFIDENTIAL AND PROPRIETARY
#
# This work is provided "AS IS" and subject to the Shotgun Pipeline Toolkit
# Source Code License included in this distribution package. See LICENSE.
# By accessing, using, copying or modifying this work you indicate your
# agreement to the Shotgun Pipeline Toolkit Source Code License. All rights
# not expressly granted therein are reserved by Shotgun Software Inc.

"""
Utilities relating to Shotgun entities
"""

from . import constants, sgre as re
from ..errors import TankError
from tank_vendor import six

# A dictionary for Shotgun entities which do not store their name
# in the standard "code" field.
SG_ENTITY_SPECIAL_NAME_FIELDS = {
    "ActionMenuItem": "title",
    "ApiUser": "firstname",
    "Attachment": None,
    "Booking": None,
    "ClientUser": "name",
    "Delivery": "title",
    "Department": "name",
    "EventLogEntry": None,
    "HumanUser": "name",
    "Icon": "name",
    "Note": "subject",
    "Page": "name",
    "PageHit": None,
    "PageSetting": None,
    "PlaylistShare": None,
    "Project": "name",
    "PublishedFileDependency": None,
    "Reply": None,
    "Tag": "name",
    "Task": "content",
    "TaskDependency": None,
    "Ticket": "title",
    "TimeLog": None,
}


[docs]def get_sg_entity_name_field(entity_type): """ Return the Shotgun name field to use for the specified entity type. :param str entity_type: The entity type to get the name field for. :returns: The name field for the specified entity type. """ # Deal with some known special cases and assume "code" for anything else. return SG_ENTITY_SPECIAL_NAME_FIELDS.get(entity_type, "code")
def sg_entity_to_string(tk, sg_entity_type, sg_id, sg_field_name, data): """ Generates a string value given a Shotgun value. This logic is in a hook but it typically does conversions such as: * "foo" ==> "foo" * {"type":"Shot", "id":123, "name":"foo"} ==> "foo" * 123 ==> "123" * [{"type":"Shot", "id":1, "name":"foo"}, {"type":"Shot", "id":2, "name":"bar"}] ==> "foo_bar" This method may also raise exceptions in the case the string value is not valid. :param tk: Sgtk api instance :param sg_entity_type: the Shotgun entity type e.g. 'Shot' :param sg_id: The Shotgun id for the record, e.g 1234 :param sg_field_name: The field to generate value for, e.g. 'sg_sequence' :param data: The Shotgun entity data chunk that should be converted to a string. """ # call out to core hook return tk.execute_core_hook( constants.PROCESS_FOLDER_NAME_HOOK_NAME, entity_type=sg_entity_type, entity_id=sg_id, field_name=sg_field_name, value=data, ) class EntityExpression(object): """ Represents a name expression for a Shotgun entity. A name expression converts a pattern and a set of Shotgun data into a string: Expression Shotgun Entity Data String Result ---------------------------------------------------------------------------------------- * "code" + {"code": "foo_bar"} ==> "foo_bar" * "{code}_{asset_type}" + {"code": "foo_bar", "asset_type": "car"} ==> "foo_bar_car" * "{code}/{asset_type}" + {"code": "foo_bar", "asset_type": "car"} ==> "foo_bar/car" Optional fields are [bracketed]: * "{code}[_{asset_type}]" + {"code": "foo_bar", "asset_type": "car"} ==> "foo_bar_car" * "{code}[_{asset_type}]" + {"code": "foo_bar", "asset_type": None} ==> "foo_bar" Regular expressions can be used to evaluate substrings: * "{code:^([^_]+)}/{code:^[^_]+(.+)}" + {"code": "foo_bar"} ==> "foo/bar" It it is always connected to a specific Shotgun entity type and the fields need to be Shotgun fields that exists for that entity type. """ def __init__(self, tk, entity_type, field_name_expr): """ :param str entity_type: Associated Shotgun entity type. :param str field_name_expr: Expression, e.g. '{code}/foo' """ self._tk = tk self._entity_type = entity_type self._field_name_expr = field_name_expr # now validate if "{" not in field_name_expr: # simple form - surround with brackets to turn into a expression field_name_expr = "{%s}" % field_name_expr # now get all permutations for expressions with optional fields expr_variations = self._get_expression_variations(field_name_expr) # We want them most inclusive(longest) version first self._sorted_exprs = sorted(expr_variations, key=lambda x: len(x), reverse=True) # Extract and store a bunch of data for each variation. self._variations = {} for expr_variation in expr_variations: try: # find all field names, for example: # "{xx}_{yy}_{zz.xx}" ----> ["xx", "yy", "zz.xx"] # "{code:([^_]+)}_{yy}" --> ["code:([^_]+)", "yy"] fields = set(re.findall("{([^}]*)}", expr_variation)) except Exception as error: raise TankError( "Could not parse the configuration field '%s' - " "Error: %s" % (field_name_expr, error) ) # Go over fields strings and resolve tokens # parsing 'sg_sequence.Sequence.code:^([^_]+)' resolves into # { # 'full_field_name': 'sg_sequence.Sequence.code', # 'link_field_name': 'sg_sequence', # 'regex_obj': <regex obj> # } # parsing 'code' resolves into # { # 'full_field_name': 'code', # 'link_field_name': None, # 'regex_obj': None # } resolved_fields = [] for field_token_expression in fields: full_field_name = None link_field_name = None regex_obj = None if ":" in field_token_expression: (full_field_name, regex) = field_token_expression.split(":", 1) try: regex_obj = re.compile(regex, re.UNICODE) except Exception as e: raise TankError( "Could not parse regex in configuration " "field '%s': %s" % (field_name_expr, e) ) else: full_field_name = field_token_expression # extract the link for deep query fields if "." in full_field_name: link_field_name = full_field_name.split(".")[0] resolved_fields.append( { "token": field_token_expression, "full_field_name": full_field_name, "link_field_name": link_field_name, "regex_obj": regex_obj, } ) # add this to our variations dict self._variations[expr_variation] = resolved_fields def _get_expression_variations(self, definition): """ Returns all possible optional variations for an expression. "{foo}" ==> ['{foo}'] "{foo:[xxx]}_{bar}" ==> ['{foo:[xxx]}_{bar}'] "{foo}[_{bar}]" ==> ['{foo}', '{foo}_{bar}'] "{foo}_[{bar}_{baz}]" ==> ['{foo}_', '{foo}_{bar}_{baz}'] :param str definition: Expression to process. :returns: List of variations. See example above. """ # Split definition by optional sections # Look for square brackets that contains at least one # {expression} and ignore any square bracket inside # expressions: tokens = re.split(r"(\[[^\]]*\{.*\}[^\]]*\])", definition) # seed with empty string definitions = [""] for token in tokens: temp_definitions = [] if token == "": # regex return some blank strings, skip them continue if token.startswith("["): # Add definitions skipping this optional value temp_definitions = definitions[:] # strip brackets from token token = token[1:-1] # make defintions with token appended for definition in definitions: temp_definitions.append(definition + token) definitions = temp_definitions return definitions def get_shotgun_fields(self): """ Returns the Shotgun fields that are needed in order to build this name expression. Returns all fields, including optional. :returns: Set of Shotgun field names, e.g. ('code', 'sg_sequence.Sequence.code') """ # use the longest expression - this contains the most fields longest_expr = self._sorted_exprs[0] field_defs = self._variations[longest_expr] field_names = [field["full_field_name"] for field in field_defs] return set(field_names) def get_shotgun_link_fields(self): """ Returns a list of all entity links that are used in the name expression, including optional ones. For example, if a name expression for a Shot is '{code}_{sg_sequence.Sequence.code}', the link fields for this expression is ['sg_sequence']. :returns: Set of link fields, e.g. ('sg_sequence', 'entity') """ # use the longest exprssion - this contains the most fields longest_expr = self._sorted_exprs[0] field_defs = self._variations[longest_expr] link_names = [ field["link_field_name"] for field in field_defs if field["link_field_name"] is not None ] return set(link_names) def generate_name(self, values): """ Generates a name given some fields. Assumes the name will be used as a folder name and validates that the evaluated expression is suitable for disk use. :param dict values: Dictionary of values to use. :returns: Fully resolved name string. """ # first make sure that each field is valid for field_name in self.get_shotgun_fields(): if field_name not in values: # required value was not provided! raise TankError( "Folder Configuration Error: " "A PTR field '%s' is being requested as part of the expression " "'%s' when creating folders connected to entities of type %s, " "however no such field exists in Flow Production Tracking. Please review your " "configuration!" % (field_name, self._field_name_expr, self._entity_type) ) # ok all fields are there. But some values may be none. Try to resolve our expression against # the values, starting with the longest expression first. for expr in self._sorted_exprs: val = self._generate_name(expr, values) if val is not None: # name generation worked! - do not try alternative (shorter) expressions break if val is None: # completely failed to generate a name because of missing fields. # try to make a nice descriptive name if possible if "code" in values: nice_name = "%s %s (id %s)" % ( self._entity_type, values["code"], values.get("id"), ) else: nice_name = "%s id %s" % (self._entity_type, values.get("id")) raise TankError( "Folder Configuration Error. Could not create folders for %s! " "The expression %s refers to one or more values that are blank " "in PTR and a folder can therefore " "not be created." % (nice_name, self._field_name_expr) ) return val def _generate_name(self, expression, values): """ Generates a name given some fields. Assumes the name will be used as a folder name and validates that the evaluated expression is suitable for disk use. :param values: dictionary of values to use :returns: fully resolved name string or None if it cannot be resolved. """ field_defs = self._variations[expression] # convert Shotgun values to string values # key these by their full expression str_data = {} # get the Shotgun id from the Shotgun entity dict sg_id = values.get("id") # first make sure that each field is valid for field_def in field_defs: full_sg_field_name = field_def["full_field_name"] token = field_def["token"] # get value from Shotgun data dict raw_val = values.get(full_sg_field_name) if raw_val is None: # cannot resolve this! return None # now cast the value to a string str_value = sg_entity_to_string( self._tk, self._entity_type, sg_id, full_sg_field_name, raw_val ) # see if we need to transform it via regex if field_def["regex_obj"]: str_value = self._process_regex(str_value, field_def["regex_obj"]) str_data[token] = str_value # Now str_data looks something like # {'code': 'hello', 'code:^(.)': 'h'}. # # Replace tokens in the string with actual values: resolved_expression = expression for token, value in str_data.items(): resolved_expression = resolved_expression.replace("{%s}" % token, value) # now validate the entire value! if not self._validate_name(resolved_expression): raise TankError( "The format string '%s' used in the configuration " "does not generate a valid folder name ('%s')! Valid " "values are %s." % ( expression, resolved_expression, constants.VALID_SG_ENTITY_NAME_EXPLANATION, ) ) return resolved_expression def _validate_name(self, name): """ Check that the name meets basic file system naming standards. :returns: True if valid, false otherwise """ exp = re.compile(constants.VALID_SG_ENTITY_NAME_REGEX, re.UNICODE) # split into sub-segments based on slash # and validate each one separately if name is None: return False # iterate over all tokens and validate for folder_subgroup in name.split("/"): if isinstance(folder_subgroup, six.text_type): u_name = folder_subgroup else: # try decoding from utf-8: u_name = folder_subgroup.decode("utf-8") if exp.match(u_name) is None: return False return True def _process_regex(self, value, regex_obj): """ Processes the given string value with the given regex. :param value: Value to process, either unicode or str. :param regex_obj: Regex object. :return: Processed value, same type as value input parameter. If input is None, an empty string is returned. """ if value is None: return "" # perform the regex calculation in unicode space if not isinstance(value, six.text_type): input_is_utf8 = True value_to_convert = value.decode("utf-8") else: input_is_utf8 = False value_to_convert = value # now perform extraction match = regex_obj.match(value_to_convert) if match is None: # no match. return empty string resolved_value = u"" else: # we have a match object. concatenate the groups resolved_value = "".join(match.groups()) # resolved value is now unicode. Convert it # so that it is consistent with input if isinstance(resolved_value, six.text_type) and input_is_utf8: # input was utf-8, regex result is unicode, cast it back return resolved_value.encode("utf-8") else: return resolved_value