pytorch/torch/distributed/elastic/control_plane.py
Tristan Rice 3d541835d5 distributed debug handlers (#126601)
This adds debug handlers as described in:
* https://gist.github.com/d4l3k/828b7be585c7615e85b2c448b308d925 (public copy)
* https://docs.google.com/document/d/1la68szcS6wUYElUUX-P6zXgkPA8lnfzpagMTPys3aQ8/edit (internal copy)

This only adds the C++ pieces that will be used from the main process. The Python and torchrun pieces will be added in a follow-up PR.

This adds two handlers out of the box:

* `/handler/ping` for testing purposes
* `/handler/dump_nccl_trace_pickle` as a POC integration with Flight Recorder
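The handlers above are served over a unix socket. As a rough sketch of how a client might call `/handler/ping`, assuming the server speaks plain HTTP over that socket — the `_StubHandlerServer` and `_PingHandler` classes below are illustrative stand-ins, not part of PyTorch:

```python
import os
import socket
import tempfile
import threading
from http.server import BaseHTTPRequestHandler
from socketserver import UnixStreamServer


class _StubHandlerServer(UnixStreamServer):
    """Hypothetical stand-in for the C++ _WorkerServer: HTTP over a unix socket."""


class _PingHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Stand-in behavior: reply with a short body, as a ping handler might.
        body = b"pong"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # keep the sketch quiet
        pass


def call_handler(socket_path: str, handler: str) -> bytes:
    """Send a bare HTTP POST for /handler/<name> over the unix socket."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        req = (
            f"POST /handler/{handler} HTTP/1.1\r\n"
            "Host: localhost\r\n"
            "Connection: close\r\n"
            "Content-Length: 0\r\n\r\n"
        )
        s.sendall(req.encode())
        chunks = []
        while chunk := s.recv(4096):
            chunks.append(chunk)
        return b"".join(chunks)


socket_path = os.path.join(tempfile.mkdtemp(), "worker.sock")
server = _StubHandlerServer(socket_path, _PingHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
response = call_handler(socket_path, "ping")
server.shutdown()
server.server_close()
```

The same client pattern would apply to `/handler/dump_nccl_trace_pickle`, with the response body being the pickled trace instead of a short status string.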

Test plan:

```
python test/distributed/elastic/test_control_plane.py
```

Pull Request resolved: https://github.com/pytorch/pytorch/pull/126601
Approved by: https://github.com/kurman, https://github.com/c-p-i-o
2024-05-30 02:21:08 +00:00


import os
from contextlib import contextmanager, ExitStack
from typing import Generator

from torch.distributed.elastic.multiprocessing.errors import record

__all__ = [
    "worker_main",
]

TORCH_WORKER_SERVER_SOCKET = "TORCH_WORKER_SERVER_SOCKET"


@contextmanager
def _worker_server(socket_path: str) -> Generator[None, None, None]:
    from torch._C._distributed_c10d import _WorkerServer

    server = _WorkerServer(socket_path)
    try:
        yield
    finally:
        server.shutdown()


@contextmanager
@record
def worker_main() -> Generator[None, None, None]:
    """
    This is a context manager that wraps your main entry function. It combines
    the existing ``errors.record`` logic with a new ``_WorkerServer`` that
    exposes handlers via a unix socket specified by
    ``TORCH_WORKER_SERVER_SOCKET``.

    Example

    ::

     @worker_main()
     def main():
         pass

     if __name__ == "__main__":
         main()

    """
    with ExitStack() as stack:
        socket_path = os.environ.get(TORCH_WORKER_SERVER_SOCKET)
        if socket_path is not None:
            stack.enter_context(_worker_server(socket_path))

        yield