Our CP codebase now contains several files and we are adding more. This
PR refactors the code to consolidate the files into a context_parallel
folder while keeping the existing imports so that current users of CP
won't be affected.
Unfortunately, we had to split this work into two PRs: the PyTorch
infra cannot accept a PR with a 3000+ LoC change, and git cannot recognize
that _context_parallel/_attention.py was moved from _attention.py, because
we keep the original module for BC.
This is the second PR.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166501
Approved by: https://github.com/Skylion007
ghstack dependencies: #166456
Our CP codebase now contains several files and we are adding more. This PR refactors the code to consolidate the files into a context_parallel folder while keeping the existing imports so that current users of CP won't be affected.
Unfortunately, we had to split this work into two PRs: the PyTorch infra cannot accept a PR with a 3000+ LoC change, and git cannot recognize that _context_parallel/_attention.py was moved from _attention.py, because we keep the original module for BC.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166456
Approved by: https://github.com/Skylion007
Summary:
- In torchft we have multiple default pgs, one for each task group.
- For flight recorder to work, each of these needs a different name so that entries can be matched.
- Change the `init_process_group` API to optionally take a list of ranks. If provided, we use the hash of the ranks as the name of the pg. For torchft, we'll pass global ranks here so the default pg has a different name on each task group (see the sketch below).
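A minimal sketch of how such a rank-derived name could be computed; the hashing scheme and the helper name are assumptions for illustration, not necessarily what the PR landed:

```python
import hashlib

def pg_name_from_ranks(ranks):
    # Hypothetical helper: hash the (global) ranks into a deterministic
    # string so each task group's default pg gets a distinct name that
    # flight recorder entries can be matched against.
    payload = ",".join(str(r) for r in sorted(ranks))
    return hashlib.sha1(payload.encode()).hexdigest()

# Two task groups with different global ranks get different pg names:
assert pg_name_from_ranks([0, 1, 2, 3]) != pg_name_from_ranks([4, 5, 6, 7])
```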
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166182
Approved by: https://github.com/fduwjj
# Context
Previously, we modified the parent process's NUMA bindings in order to force child processes to inherit them.
However, this did not work correctly with `start_method="forkserver"`, because the subprocesses actually inherit their bindings from the forkserver middleman process. In that case, the inherited affinity is incorrect for all but the first subprocess: the forkserver process is created lazily, so it inherits, and then sticks with, the bindings intended for the first subprocess.
# This PR
* `str` entrypoints: launch through the `numactl` CLI.
* `Callable` entrypoints: wrap the `Callable` and call `os.sched_setaffinity` inside the wrapper (sketched below).
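A minimal sketch of the `Callable` case, assuming Linux; the wrapper name is illustrative, not the PR's actual helper:

```python
import os

def with_numa_affinity(entrypoint, cpu_ids):
    # Pin affinity inside the child itself, so the binding is correct
    # regardless of start_method (fork, spawn, or forkserver) and does
    # not depend on what the forkserver middleman process inherited.
    def wrapped(*args, **kwargs):
        os.sched_setaffinity(0, cpu_ids)  # pid 0 == the calling process
        return entrypoint(*args, **kwargs)
    return wrapped

# For `str` entrypoints the command is launched through the numactl CLI
# instead, conceptually: ["numactl", "--physcpubind=0-3", "--", "cmd", ...]
```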
Hopefully this will be the last necessary iteration.
# Test Plan
## Automated
`$ pytest test/test_numa_binding.py`
## Manual
Verified flops/sec and memory-locality wins on several different types of jobs:
* `Callable` entrypoint with forkserver
* `str` entrypoint with spawn
* `Callable` entrypoint with spawn
More details in [this doc (Meta-only).](https://docs.google.com/document/d/1vxD-OKYBTT27jbBwtW9iz9g0tNM0u-i0tiTJg_ieQA8/edit?tab=t.scjv58yswi64)
# Later PR
Update all the documentation when we're confident this has stabilized.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166026
Approved by: https://github.com/d4l3k
Co-authored-by: PyTorch MergeBot <pytorchmergebot@users.noreply.github.com>
**Summary:** When operations are performed on Partial placements, the sharding logic incorrectly determines whether we should redistribute the tensor to Replicate. By delaying the redistribution, we perform the operation first and the partial reduction afterwards, which produces incorrect results for max, min, gradient norm clipping, and more. We fix this by setting `reduction_linear` to False whenever a Partial placement is present, forcing the redistribution before the op runs.
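A single-process illustration of why the order matters for non-linear ops, using plain tensors to stand in for a DTensor with a Partial(sum) placement:

```python
import torch

# Each "rank" holds a partial summand; the logical tensor is their sum.
rank0 = torch.tensor([1.0, 5.0])
rank1 = torch.tensor([4.0, -3.0])
logical = rank0 + rank1            # tensor([5., 2.])

# Correct: redistribute (sum-reduce) first, then apply the op.
correct = logical.max()            # 5.0

# Incorrect: apply the op per partial shard, then reduce the results.
wrong = rank0.max() + rank1.max()  # 5.0 + 4.0 = 9.0
assert correct.item() != wrong.item()
```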
**Test Cases**
1. `pytest test/distributed/tensor/test_math_ops.py -k test_partial_reduction_ops`
2. `pytest test/distributed/tensor/test_math_ops.py -k test_matching_partial_reduction_ops`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/165962
Approved by: https://github.com/wconstab
Summary:
Since we already share a flattened `_rank_map` tensor across all meshes from the same root mesh, we can just use a flattened list of it to replace the comparison of root_mesh and flattened_mesh_list (with the same _rank_map and layout, the mesh tensor is guaranteed to be the same). This also wins back the CPU overhead added in https://github.com/pytorch/pytorch/pull/164510 and further simplifies the code.
We do have a more ambitious universe-based change in https://github.com/pytorch/pytorch/pull/165680, but it needs more discussion and would be BC-breaking. We might eventually merge that PR, though probably not now; the present change is not BC-breaking and will help concatenate and its 2D integration.
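A conceptual sketch of the cheaper comparison; the attribute names follow the prose above but the actual DeviceMesh internals may differ:

```python
def from_same_root(mesh_a, mesh_b):
    # Meshes derived from the same root share one flattened _rank_map,
    # so an identity check on it replaces comparing root meshes.
    return mesh_a._rank_map is mesh_b._rank_map

def same_mesh(mesh_a, mesh_b):
    # With the same _rank_map and layout, the mesh tensor is guaranteed
    # to be the same, so no tensor comparison is needed.
    return from_same_root(mesh_a, mesh_b) and mesh_a._layout == mesh_b._layout
```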
cc H-Huang awgu wanchaol fegin wz337 wconstab d4l3k pragupta msaroufim dcci
imported-using-ghimport
Test Plan: Imported from OSS
Differential Revision: D85526705
Pulled By: fduwjj
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166264
Approved by: https://github.com/XilunWu
The redistribute tests extensively exercise various sharding schemes and
the redistribution between them. These tests uncovered more edge cases
that were not supported by local tensor, primarily different flavors
of uneven sharding. To handle these cases, this change implements the
missing functional collectives and adds support for the uneven-sharding
case where the sharding group (ranks) is larger than the size of the
dimension being sharded. In the latter case, the "missing" shards are
represented by zero-sized tensors so that the rest of the local tensor
machinery can stay oblivious to this special case.
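A minimal sketch of the zero-sized-shard convention when there are more ranks than elements along the sharded dimension (illustrative, not the actual DTensor helper):

```python
import torch

def shard_sizes(dim_size, num_ranks):
    # Even split with the remainder going to the first ranks; ranks past
    # dim_size receive zero-sized shards.
    base, rem = divmod(dim_size, num_ranks)
    return [base + (1 if r < rem else 0) for r in range(num_ranks)]

# Sharding a dimension of size 3 across 5 ranks:
sizes = shard_sizes(3, 5)                    # [1, 1, 1, 0, 0]
shards = torch.split(torch.arange(3), sizes)
# shards[3] and shards[4] are zero-sized tensors, so downstream local
# tensor code can treat every rank uniformly.
```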
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166081
Approved by: https://github.com/ezyang
Since we already share a flattened `_rank_map` tensor across all meshes from the same root mesh, we can just use a flattened list of it to replace the comparison of root_mesh and flattened_mesh_list (with the same _rank_map and layout, the mesh tensor is guaranteed to be the same). This also wins back the CPU overhead added in https://github.com/pytorch/pytorch/pull/164510 and further simplifies the code.
We do have a more ambitious universe-based change in https://github.com/pytorch/pytorch/pull/165680, but it needs more discussion and would be BC-breaking. We might eventually merge that PR, though probably not now; the present change is not BC-breaking and will help concatenate and its 2D integration.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166003
Approved by: https://github.com/Skylion007, https://github.com/fegin
Summary:
Part of an effort to extract some important error logs (e.g. [#157996](https://github.com/pytorch/pytorch/pull/157996)) that were `tee`'d to `stdout` and `stderr`.
The general idea is to:
- Duplicate the `tee`s on `stdout` and `stderr` to separate files, `filtered_stdout.log` and `filtered_stderr.log`, respectively.
- In these files, as their names suggest, only log lines matching a customizable filter.
- Later, in another PR, append the contents of these files to the reply file.
Outline of changes in this PR:
- Enhance `TailLog` so it can 1) stream to a file, and 2) write only the lines that match the passed filter.
- Add `filtered_stdout` and `filtered_stderr` to `LogsDest` and have `LogsSpecs` `reify` them.
- In `start_processes()` and `PContext`, add `duplicate_stdout_filters` and `duplicate_stderr_filters` params to filter and write the duplicated streams to the files above (a conceptual sketch follows this list). When no filters are passed in, no duplicated streams are created.
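A conceptual sketch of the filtered duplication; this is not the actual `TailLog` API, and the function name is illustrative:

```python
def tee_filtered(src_lines, filtered_path, filters):
    # Forward every line as before, but also duplicate the lines that
    # match any filter into the filtered log file.
    with open(filtered_path, "a") as filtered:
        for line in src_lines:
            print(line, end="")              # existing tee behavior
            if any(f in line for f in filters):
                filtered.write(line)         # filtered duplicate

# Usage sketch: capture only NCCL/OOM error lines from a worker's stdout.
tee_filtered(iter(["step 1 ok\n", "NCCL error: unhandled system error\n"]),
             "filtered_stdout.log", ["NCCL error", "CUDA out of memory"])
```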
Test Plan:
```
$ buck test 'fbcode//mode/opt' caffe2/test/distributed/elastic/multiprocessing:api_test
```
```
Buck UI: https://www.internalfb.com/buck2/f5c6b7da-217d-4a0b-872a-c7cd3d05587f
Test UI: https://www.internalfb.com/intern/testinfra/testrun/4222124951617688
Network: Up: 398B Down: 44MiB (reSessionID-a489a961-b602-45be-b851-3490ebb7a26a)
Analyzing targets. Remaining 0/200
Executing actions. Remaining 0/12856 0.1s exec time total
Command: test. Finished 1 local
Time elapsed: 17:37.9s
Tests finished: Pass 52. Fail 0. Fatal 0. Skip 0. Build failure 0
```
```
$ buck test 'fbcode//mode/opt' caffe2/test/distributed/elastic/multiprocessing:tail_log_test
```
```
Buck UI: https://www.internalfb.com/buck2/d6d5c1c1-db98-4d9c-b608-7ba6fbb5e3ee
Test UI: https://www.internalfb.com/intern/testinfra/testrun/13510798985149262
Network: Up: 94KiB Down: 417MiB (reSessionID-27b46fba-d31c-4c04-8ede-a506454e6922)
Analyzing targets. Remaining 0/3 536 actions, 555 artifacts declared
Executing actions. Remaining 0/186 1:05.5s exec time total
Command: test. Finished 7 local, 1 remote, 115 cache (93% hit) 37.0s exec time cached (56%)
Time elapsed: 1:11.5s
Tests finished: Pass 7. Fail 0. Fatal 0. Skip 0. Build failure 0
```
Rollback Plan:
Differential Revision: D80188995
Pull Request resolved: https://github.com/pytorch/pytorch/pull/160712
Approved by: https://github.com/fduwjj
Summary:
This PR contains three changes:
1. We were losing the non-blocking flag value and defaulting to False during the `deep_copy`. This introduced a CUDA synchronize after each tensor, which slowed down staging.
2. Add the capability to skip pinning for scalar tensors to reduce the initial staging-buffer creation cost. The threshold defaults to 65 so that small tensors are not pinned.
3. Tensors can share storage, but each storage needs to be processed only once in the `deep_copy` offloading logic, so we use a memoization table to cache storage ids (see the sketch after this list).
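An illustrative sketch combining the three fixes; the names, and reading the 65 threshold as bytes, are assumptions rather than the PR's actual code:

```python
import torch

def stage(tensors, pin_threshold=65):
    memo = {}   # storage ptr -> staged host tensor (fix 3)
    staged = []
    for t in tensors:
        key = t.untyped_storage().data_ptr()
        if key not in memo:
            nbytes = t.numel() * t.element_size()
            pin = nbytes > pin_threshold      # fix 2: skip pinning tiny tensors
            host = torch.empty(t.shape, dtype=t.dtype, device="cpu",
                               pin_memory=pin)
            host.copy_(t, non_blocking=True)  # fix 1: keep copies non-blocking
            memo[key] = host
        # Shared storage is staged once; real code would also account for
        # view offsets into a shared storage.
        staged.append(memo[key])
    return staged
```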
Test Plan:
1. Verified non-blocking copies via a kineto profile.
2. Ran A/B jobs with the old and the new staging, configured to crash after every 2 checkpoints and restart, for several hours; the loss curves are exactly identical.
3. Tests.
Differential Revision: D85180484
Pull Request resolved: https://github.com/pytorch/pytorch/pull/166025
Approved by: https://github.com/pradeepfn