import logging import os import tempfile from contextlib import nullcontext from typing import Any, Dict import torch log = logging.getLogger(__name__) # this arbitrary-looking assortment of functionality is provided here # to have a central place for overrideable behavior. The motivating # use is the FB build environment, where this source file is replaced # by an equivalent. if torch._running_with_deploy(): # __file__ is meaningless in the context of frozen torch used in torch deploy. # setting empty torch_parent should allow below functions to operate without crashing, # but it's unclear if there is a valid use case for them in the context of deploy. torch_parent = "" else: if os.path.basename(os.path.dirname(__file__)) == "shared": torch_parent = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) else: torch_parent = os.path.dirname(os.path.dirname(__file__)) def get_file_path(*path_components: str) -> str: return os.path.join(torch_parent, *path_components) def get_file_path_2(*path_components: str) -> str: return os.path.join(*path_components) def get_writable_path(path: str) -> str: if os.access(path, os.W_OK): return path return tempfile.mkdtemp(suffix=os.path.basename(path)) def prepare_multiprocessing_environment(path: str) -> None: pass def resolve_library_path(path: str) -> str: return os.path.realpath(path) # Meta only, see # https://www.internalfb.com/intern/wiki/ML_Workflow_Observability/User_Guides/Adding_instrumentation_to_your_code/ # # This will cause an event to get logged to Scuba via the signposts API. You # can view samples on the API at https://fburl.com/scuba/workflow_signpost/zh9wmpqs # we log to subsystem "torch", and the category and name you provide here. # Each of the arguments translate into a Scuba column. We're still figuring # out local conventions in PyTorch, but category should be something like # "dynamo" or "inductor", and name should be a specific string describing what # kind of event happened. # # Killswitch is at # https://www.internalfb.com/intern/justknobs/?name=pytorch%2Fsignpost#event def signpost_event(category: str, name: str, parameters: Dict[str, Any]): log.info("%s %s: %r", category, name, parameters) def log_compilation_event(metrics): log.info("%s", metrics) def _functionalize_sync(t): # This code lives in python instead of C++ since conditioning on a certain python subclass # is much more of a pain in C++. from torch._subclasses.functional_tensor import ( FunctionalTensor, maybe_disable_functional_mode, ) ctx = ( maybe_disable_functional_mode if isinstance(t, FunctionalTensor) else nullcontext ) if isinstance(t, FunctionalTensor): # If a FunctionalTensorMode is active while syncing, we don't want it to intercept any ops that get called # when we sync our inner tensor. # Why? # (1) If there are input mutations in the graph, then they will be re-applied during # AOTAutograd when we call _sync() from inside of our functionalization kernels. # (2) _sync() causes us to regenerate our updated the tensor from the updated base, # which dispatches to a bunch of view ops # (3) The input to these view ops is our inner FunctionalTensorWrapper # (since the sync was called from C++), not the python FunctionalTensor # (4) if a python FunctionalTensorMode is active, it will complain when it intercepts # the view op, since it will see an input that is a C++ FunctionalTensorWrapper # (aka a normal torch.Tensor) instead of a python `FunctionalTensor). maybe_functional_mode = torch._C._unset_dispatch_mode( torch._C._TorchDispatchModeKey.FUNCTIONAL ) try: torch._functionalize_sync(t.elem) finally: if maybe_functional_mode is not None: torch._C._set_dispatch_mode(maybe_functional_mode) else: torch._functionalize_sync(t) TEST_MASTER_ADDR = "127.0.0.1" TEST_MASTER_PORT = 29500 # USE_GLOBAL_DEPS controls whether __init__.py tries to load # libtorch_global_deps, see Note [Global dependencies] USE_GLOBAL_DEPS = True # USE_RTLD_GLOBAL_WITH_LIBTORCH controls whether __init__.py tries to load # _C.so with RTLD_GLOBAL during the call to dlopen. USE_RTLD_GLOBAL_WITH_LIBTORCH = False