Mirror of https://github.com/zebrajr/pytorch.git (synced 2025-12-07 12:21:27 +01:00)

28 Commits

b45cf9b81b  Revert D30117838: [WIP] Gate DistributedOptimizers on RPC availability

Test Plan: revert-hammer
Differential Revision: D30117838

3f09485d7e  [WIP] Gate DistributedOptimizers on RPC availability (#62774)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/62774
Gates DistributedOptimizer, which relies on RRef, on whether RPC is available. This should enable ZeRO to work on Windows, since Windows will not try to import DistributedOptimizer. If this works as expected, we can enable the Windows tests for the functional/local SGD optimizers as well.
ghstack-source-id: 135216642
Test Plan: CI
Reviewed By: pbelevich
Differential Revision: D30117838
fbshipit-source-id: e6365a910a3d1ca40d95fa6777a7019c561957db
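Below is a minimal sketch of the kind of gating this commit describes; it is an illustration rather than the actual `torch/distributed/optim` import logic, and it assumes `torch.distributed.rpc.is_available()` reports whether the RPC framework was built.
```
import torch.distributed.rpc as rpc

if rpc.is_available():
    # Only import the RRef-based optimizer when RPC support exists
    # (e.g. it does not on Windows builds without RPC).
    from torch.distributed.optim import DistributedOptimizer
else:
    DistributedOptimizer = None  # RRef-based optimizer unavailable on this build
```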

62a90c227f  Make _Join, _Joinable, _JoinHook public (#62605)

Summary:
**Overview:** This removes the preceding `_` from `_Join`, `_Joinable`, and `_JoinHook` in preparation for adding the generic join context manager tutorial (see [here](https://github.com/pytorch/tutorials/pull/1610)). This also adds a docs page, which can be linked from the tutorial. [Here](https://github.com/pytorch/pytorch/files/6919475/render.pdf) is a render of the docs page.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/62605
Test Plan:
`DistributedDataParallel.join()`:
```
touch /tmp/barrier && TEMP_DIR="/tmp" BACKEND="nccl" WORLD_SIZE="2" gpurun python test/distributed/test_distributed_fork.py -- TestDistBackendWithFork.test_ddp_uneven_inputs TestDistBackendWithFork.test_ddp_uneven_inputs_stop_iteration_sync_bn TestDistBackendWithFork.test_ddp_grad_div_uneven_inputs TestDistBackendWithFork.test_ddp_uneven_input_join_disable TestDistBackendWithFork.test_ddp_uneven_input_exception
```
`ZeroRedundancyOptimizer`:
```
gpurun4 python test/distributed/optim/test_zero_redundancy_optimizer.py
```
NOTE: DDP overlap tests are failing due to a landing race. See https://github.com/pytorch/pytorch/pull/62592. Once the fix is landed, I will rebase, and tests should be passing.
`Join`:
```
gpurun4 python test/distributed/algorithms/test_join.py
```
Reviewed By: mrshenli
Differential Revision: D30055544
Pulled By: andwgu
fbshipit-source-id: a5ce1f1d9f1904de3bdd4edd0b31b0a612d87026

43327cc197  Refactor commonalities between two approaches (#62624)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/62624
Test Plan: Imported from OSS
Reviewed By: mrshenli
Differential Revision: D30058543
Pulled By: andwgu
fbshipit-source-id: 73c794062b75e011868fae264f592549eed67482

e6a3967c2a  Add invariant check (bucket indices: 0, 1, ..., k-1) (#62623)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/62623
Test Plan: Imported from OSS
Reviewed By: mrshenli
Differential Revision: D30058544
Pulled By: andwgu
fbshipit-source-id: a56910f294c6a40118751eebe255b62700f42be9
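As a hypothetical illustration of the invariant named in the title (the actual check lives in the DDP/ZeRO overlap code), a sketch of verifying that the observed gradient-bucket indices are exactly 0, 1, ..., k-1:
```
def verify_bucket_indices(bucket_indices):
    # The k buckets produced by DDP should be indexed 0, 1, ..., k-1 exactly once.
    k = len(bucket_indices)
    assert set(bucket_indices) == set(range(k)), (
        f"expected bucket indices 0..{k - 1}, got {sorted(bucket_indices)}"
    )

verify_bucket_indices([2, 0, 1])  # passes; [0, 0, 2] would fail
```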

51f687fd4b  Add overlap with DDP to ZeRO (two approaches) (#62157)

Summary:
**Overview:** This adds two approaches to overlapping `DistributedDataParallel.backward()` with `ZeroRedundancyOptimizer.step()` by providing two hook constructors: `hook_with_zero_step()` and `hook_with_zero_step_interleaved()`. The former waits for all backward computation to finish before starting optimizer computation, while the latter launches a partial optimizer computation using the contents of a gradient bucket once that bucket's all-reduce completes. The two approaches each have their own weaknesses, and which one to use depends on the specific hardware configuration. Both approaches share the same changes to `ZeroRedundancyOptimizer`.
A user should pass `overlap_with_ddp=True` to `ZeroRedundancyOptimizer`, construct a DDP communication hook using either `hook_with_zero_step()` or `hook_with_zero_step_interleaved()`, and register that communication hook. `ZeroRedundancyOptimizer.step()` should still be called in the training loop, though the optimizer computation and communication are offloaded to originate from the communication hook. Currently, the first two iterations are vacuous, meaning they do not result in parameter updates and the inputs are ignored. This is required to finalize the DDP bucket strategy and then to initialize the `ZeroRedundancyOptimizer`'s local optimizer based on that bucketing.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/62157
Test Plan: The existing `ZeroRedundancyOptimizer` tests pass, and new unit tests for both hooks pass:
- ~~`test_ddp_with_zero_step_parity_cpu`~~ (removed for now due to flakiness in CI -- under investigation; possibly a Gloo issue similar to the one affecting `hook_with_zero_step_interleaved()`)
- `test_ddp_with_zero_step_parity_gpu`
- `test_ddp_with_zero_step_interleaved_parity_gpu`
These were tested on the AI AWS cluster. An analogous `test_ddp_with_zero_step_interleaved_parity_cpu` is missing due to existing bugs with Gloo. See https://github.com/pytorch/pytorch/pull/62302. Both approaches have been verified using an internal accuracy benchmark.
Reviewed By: mrshenli
Differential Revision: D29971046
Pulled By: andwgu
fbshipit-source-id: a7234c23c7ea253f144a698fd7e3c0fe039de5e8
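A hedged usage sketch of the overlap API described above. The import path for the hook constructor and the exact signatures are assumptions and may differ across PyTorch versions; the model and input sizes are placeholders.
```
import torch
import torch.distributed as dist
from torch.distributed.algorithms.ddp_comm_hooks.default_hooks import allreduce_hook
from torch.distributed.algorithms.ddp_comm_hooks.ddp_zero_hook import hook_with_zero_step  # assumed path
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP

# assumes dist.init_process_group("nccl", ...) has already been called
rank = dist.get_rank()
model = DDP(torch.nn.Linear(2000, 2000).to(rank), device_ids=[rank])
zero_optim = ZeroRedundancyOptimizer(
    model.parameters(),
    optimizer_class=torch.optim.Adam,
    overlap_with_ddp=True,   # step() work is driven from the DDP communication hook
    lr=1e-3,
)
# Wrap a standard all-reduce hook so ZeRO's partial optimizer step runs as
# gradient buckets finish reducing.
model.register_comm_hook(None, hook_with_zero_step(allreduce_hook, model, zero_optim))

for _ in range(5):
    model(torch.randn(32, 2000, device=rank)).sum().backward()
    zero_optim.step()        # still called; the first two iterations are vacuous
```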

3e3acf8a9a  Minor documentation fixes (#61785)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/61785
Test Plan: Imported from OSS
Reviewed By: mrshenli
Differential Revision: D29746648
Pulled By: andwgu
fbshipit-source-id: 435bbd8894f2ae5c814b9acd562673affea1daf6

a50a389ca6  Revert D29701479: [pytorch][PR] Remove _broadcast_object() from ZeroRedundancyOptimizer

Test Plan: revert-hammer
Differential Revision: D29701479

9b5d9b4049  Remove _broadcast_object() from ZeroRedundancyOptimizer (#61539)

Summary: Revised version of https://github.com/pytorch/pytorch/issues/60573.
**Overview:** This makes two changes:
- It introduces a `map_location` argument to `broadcast_object_list()`. The argument specifies the device onto which to load tensors contained in objects received from the broadcast. This change requires modifying the implementation of `_object_to_tensor()` and `_tensor_to_object()` to use `torch.save()` and `torch.load()` respectively.
- It removes all calls to `_broadcast_object()` in `ZeroRedundancyOptimizer` and the corresponding test file in favor of `broadcast_object_list()`.
The default value of `map_location` is `None`, in which case `_object_to_tensor()` and hence `broadcast_object_list()` preserve their original behavior: contained tensors are loaded to their original device.
In `consolidate_state_dict()`, I specify `map_location=torch.device("cpu")` instead of `self._default_device`. This slightly changes the behavior from before, when `_broadcast_object()` was used. The reason is that it saves one GPU-to-CPU data transfer, since the action immediately after receiving the broadcasted `local_state_dict` is to copy it to CPU. Explicitly, if `map_location=self._default_device`, then the data transfer path (assuming the NCCL backend) is as follows:
`source GPU --[before serialize]--> source CPU --[before broadcast]--> source GPU --[broadcast]--> destination GPU --[before deserialize]--> destination CPU --[deserialize]--> destination GPU --[copy]--> destination CPU`
Hence, by setting `map_location=torch.device("cpu")` instead, the suffix becomes:
`destination CPU --[deserialize]--> destination CPU --[copy]--> destination CPU`
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61539
Test Plan: I added a test `test_broadcast_object_list_map_location()` that checks, for `map_location` as both CPU and GPU, that (1) tensors contained in broadcasted objects are loaded onto the specified device and (2) the contents of the tensors are correct. The existing `ZeroRedundancyOptimizer` tests pass:
```
gpurun4 python test/distributed/optim/test_zero_redundancy_optimizer.py
```
The existing `broadcast_object_list()` test passes:
```
touch /tmp/barrier && TEMP_DIR="/tmp" BACKEND="nccl" WORLD_SIZE="2" gpurun python test/distributed/test_distributed_fork.py -- TestDistBackendWithFork.test_broadcast_object_list
```
Reviewed By: zou3519
Differential Revision: D29701479
Pulled By: andwgu
fbshipit-source-id: c8d5f9057b32e5e9f40e8edc5b2cc25fb21414a9
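A minimal sketch, under assumptions, of the serialization mechanism described above (the real torch.distributed helpers differ in detail): an object is written to a byte tensor with `torch.save()` and read back with `torch.load(map_location=...)` so that contained tensors land on the chosen device.
```
import io
import torch

def object_to_tensor(obj):
    buf = io.BytesIO()
    torch.save(obj, buf)    # preserves each contained tensor's device metadata
    return torch.frombuffer(bytearray(buf.getvalue()), dtype=torch.uint8)

def tensor_to_object(byte_tensor, map_location=None):
    buf = io.BytesIO(byte_tensor.numpy().tobytes())
    # map_location=None restores tensors to their original devices;
    # map_location=torch.device("cpu") loads them directly onto CPU.
    return torch.load(buf, map_location=map_location)

state = {"w": torch.randn(4)}
restored = tensor_to_object(object_to_tensor(state), map_location=torch.device("cpu"))
```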

57feb35474  Refactor non-joined process computation (#61555)
Summary:
**Overview:**
This refactors the computation on non-joined processes relating to the join context manager. The concept was inspired by a comment from pritamdamania.
**Changes:**
This introduces a `_Joinable` abstract base class, which requires a `_join_hook()` method and `_join_device()` and `_join_process_group()` property methods. Any class that we want to be compatible with the generic join context manager should inherit from `_Joinable` and implement `_join_hook()`, `_join_device()`, and `_join_process_group()`. (The `device` and `process_group` information has been moved from `_JoinHook` to `_Joinable`.)
The generic join context manager now takes in a `List[_Joinable]` instead of `List[_JoinHook]`. The motivation for this is that previously, by passing the `_JoinHook`s into the context manager, the class providing a `_JoinHook` can modify the context manager's behavior, but the context manager cannot modify the class's behavior. This is solved by giving the context manager a reference to the class's instance.
This implementation reserves the field `_join_config` in every `_Joinable` to store a `_JoinConfig` instance, which holds all dynamic fields needed from the `_Joinable` for the join context manager: `enable`, `throw_on_early_termination`, and `is_first_joinable`. ("dynamic" here means that for a given `_Joinable` instance, the values for those fields may change across different join context usages.) In particular, these fields are needed to implement a method `notify_join_context()`, which encapsulates the computation performed on non-joined processes relating to the join context manager: (1) the all-reduce to indicate that the process has not yet joined and (2) the all-reduce to check whether to throw an exception if `throw_on_early_termination=True`. The idea is that every `_Joinable` class only needs to make a call to `notify_join_context()` before its per-iteration collective communications; it is a simple one-line addition.
Only the first `_Joinable` instance passed into the context manager actually performs the collective communications in `notify_join_context()`. In that case, the method returns an async work handle for the initial all-reduce indicating that the process has not yet joined. Otherwise, the method returns `None`. This conditional logic is handled internally without additional input from the user.
**New API:**
Now, the example usage would look like:
```
ddp_model = DistributedDataParallel(...)
zero_optim = ZeroRedundancyOptimizer(ddp_model.parameters(), ...)
with _Join([ddp_model, zero_optim]):
    ...
```
Any arguments meant for a join hook (e.g. `divide_by_initial_world_size`) must be specified as keyword arguments. For example:
```
with _Join([ddp_model, zero_optim], divide_by_initial_world_size=False):
    ...
```
They will be forwarded to every `_join_hook()` function via `**kwargs`. This creates a clear separation between the variables needed by the context manager (`enable` and `throw_on_early_termination`) and those needed by the `_Joinable` class (e.g. `divide_by_initial_world_size`).
**Recap:**
After this change, the relevant information to use the generic join context manager looks like the following (omitting prefix `_` from names):
- Suppose we have a class `C` (e.g. `DistributedDataParallel`) that we want to be able to use the `Join` context.
- We make `C` inherit from `Joinable` and implement `join_hook() -> JoinHook`, `join_device()`, and `join_process_group()`.
- To implement `join_hook()`, we define a `CJoinHook` class inheriting from `JoinHook` and implement `main_hook()` and `post_hook()` as needed.
- We locate a place before `C`'s per-iteration collective communications and add a call to `Join.notify_join_context()`.
- We call `Joinable.__init__(self)` in `C`'s constructor.
- The `C.join_config` field will be used internally by the context manager. This does not affect `C`'s serializability.
- Run time arguments for `C`'s join hook can be passed in as keyword arguments to the context manager: `with Join([C()], arg1=..., arg2=...):`.
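Following that recap, here is a minimal sketch of a `Joinable` class, written with the later public names (`Join`, `Joinable`, `JoinHook` from `torch.distributed.algorithms.join`); exact signatures are assumptions and may differ by version.
```
import torch
import torch.distributed as dist
from torch.distributed.algorithms.join import Join, Joinable, JoinHook

class CounterJoinHook(JoinHook):
    def __init__(self, counter):
        self.counter = counter

    def main_hook(self):
        # Shadow the collective that non-joined processes run each iteration.
        t = torch.zeros(1, device=self.counter.device)
        dist.all_reduce(t)

class Counter(Joinable):
    def __init__(self, device, process_group):
        super().__init__()                    # initializes the internal join config field
        self.device = device
        self.group = process_group
        self.total = torch.zeros(1, device=device)

    def __call__(self):
        Join.notify_join_context(self)        # one-line call before per-iteration collectives
        t = torch.ones(1, device=self.device)
        dist.all_reduce(t)
        self.total += t

    def join_hook(self, **kwargs) -> JoinHook:
        return CounterJoinHook(self)

    @property
    def join_device(self) -> torch.device:
        return self.device

    @property
    def join_process_group(self):
        return self.group
```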
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61555
Test Plan:
I ran the existing DDP join tests:
```
touch /tmp/barrier && TEMP_DIR="/tmp" BACKEND="nccl" WORLD_SIZE="2" gpurun python test/distributed/test_distributed_fork.py -- TestDistBackendWithFork.test_ddp_uneven_inputs TestDistBackendWithFork.test_ddp_uneven_inputs_stop_iteration_sync_bn TestDistBackendWithFork.test_ddp_grad_div_uneven_inputs TestDistBackendWithFork.test_ddp_uneven_input_join_disable TestDistBackendWithFork.test_ddp_uneven_input_exception
```
I ran the ZeRO join tests:
```
gpurun4 python test/distributed/optim/test_zero_redundancy_optimizer.py TestZeroRedundancyOptimizerDistributed.test_zero_join_gpu TestZeroRedundancyOptimizerDistributed.test_zero_join_cpu
```
Reviewed By: zou3519
Differential Revision: D29690359
Pulled By: andwgu
fbshipit-source-id: 2950f78de755eb5fb13b95b803dd7c705879a9c7

4f4beb8286  Add Model Parallel Support to ZeRO (#61370)

Summary:
**Overview:** The existing `ZeroRedundancyOptimizer` implementation assumes that all model parameters are stored on the same device (due to the recent [refactor](https://github.com/pytorch/pytorch/pull/59834)). This change allows model parameters to be sharded across multiple devices, as in the DDP with Model Parallelism example [here](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html). The only logic affected is the bucketing strategy used when `parameters_as_bucket_view=True`.
Let `n` denote the world size and `k` denote the number of devices per process.
- Previously, `k = 1`, and `self._buckets` was a `List[torch.Tensor]`, where `self._buckets[j]` is a tensor (i.e. bucket) containing the parameters assigned to rank `j` for `j = 0, ..., n - 1`.
- Now, `self._buckets` is a `List[List[torch.Tensor]]`, where `self._buckets[i][j]` is a tensor containing the parameters stored on device `i` assigned to rank `j` for `i = 0, ..., k - 1` and `j = 0, ..., n - 1`.
This bucket construction uses an auxiliary data structure `self._device_to_per_rank_params`, which is a `Dict[torch.device, List[List[torch.Tensor]]]`. It maps:
- `dev_0` to `[rank 0's assigned parameters on dev_0, rank 1's assigned parameters on dev_0, ...]`,
- ...
- `dev_{k-1}` to `[rank 0's assigned parameters on dev_{k-1}, rank 1's assigned parameters on dev_{k-1}, ...]`.
I removed the invariant checker `_verify_same_param_device()` and its corresponding test since it is no longer an invariant.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/61370
Test Plan: I added a new test `test_zero_model_parallel()` that checks for parity between a DDP model with model parallelism using `ZeroRedundancyOptimizer` and a local model with the same architecture using a local optimizer. I also verified that the existing tests still pass.
Reviewed By: soulitzer
Differential Revision: D29637132
Pulled By: andwgu
fbshipit-source-id: 07112959fa4e94a3f40e67e88cbb58ce3cd1e033
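A hedged sketch (not the actual `ZeroRedundancyOptimizer` code) of the per-device, per-rank grouping described above; `params_per_rank[j]` is assumed to hold the parameters assigned to rank `j`.
```
from collections import defaultdict
from typing import Dict, List
import torch

def device_to_per_rank_params(
    params_per_rank: List[List[torch.Tensor]],
) -> Dict[torch.device, List[List[torch.Tensor]]]:
    world_size = len(params_per_rank)
    # For each device, keep one list of parameters per rank.
    mapping: Dict[torch.device, List[List[torch.Tensor]]] = defaultdict(
        lambda: [[] for _ in range(world_size)]
    )
    for rank, params in enumerate(params_per_rank):
        for p in params:
            mapping[p.device][rank].append(p)
    return dict(mapping)

# With parameters_as_bucket_view=True, each bucket would then be one flat tensor
# per (device, rank) pair, e.g. buckets[i][j] built from mapping[dev_i][j].
```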

179249084b  Refactor DDP join() API, adding hooks (#60757)

Summary: Targets https://github.com/pytorch/pytorch/issues/54318.
**Overview:** DDP offers a `join()` context manager to accommodate training on uneven inputs. This creates a new generic `_Join()` API permitting custom hooks, refactors DDP `join()` to call this generic `_Join()`, and implements a hook for ZeRO. (For now, the generic `_Join()` is implemented as private, but this may change after design discussions are cleared.)
There are two classes introduced: `_JoinHook`, the class defining the customizable join hook, and `_Join`, the generic join context manager. `_JoinHook` provides two entry points: `main_hook()`, which is called repeatedly while there exists a non-joined process, and `post_hook()`, which is called once all processes have joined, with the additional `bool` argument `is_last_joiner`. The class also requires `process_group` and `device` information by defining corresponding abstract property methods. Thus, to implement a join hook, (1) inherit from `_JoinHook`, (2) override `main_hook()` and `post_hook()` as appropriate, and (3) override `process_group()` and `device()` to provide the process group and device information to be used by the join context manager implementation for collective communications.
The `_Join` constructor requires `join_hooks: List[_JoinHook]` and optionally `enable: bool = True` and `throw_on_early_termination: bool = False`. A training loop only needs to be wrapped with `with _Join(join_hooks):` (using the appropriate `join_hooks`) to be able to train on uneven inputs without hanging/erroring. The context manager requires a `dist.all_reduce(torch.ones(1))` to be called on every non-joined process each time before it performs its collective communications in order to indicate that the process has not yet joined. It also requires that all `process_group` attributes in the `_JoinHook` objects are the same.
**Notes:**
- The argument `is_last_joiner` to `post_hook()` may be useful for finding an authoritative rank when synchronizing.
- `enable` is a flag that can be set to `False` if the user knows the current training loop will not have uneven inputs. This may be used to disable join-related computation in the classes providing join hooks.
- `throw_on_early_termination` is a flag that can be set to `True` to notify processes to terminate upon detecting uneven inputs (i.e. upon the first process joining when there exists a non-joined process). Notably, the notification requires an all-reduce, so to prevent hanging/erroring, non-joined processes must participate in the all-reduce. The first-joining process raises a `RuntimeError`, and the other processes are expected (but not required) to do the same. This may be used to implement training on uneven inputs in cases that do not conform to the generic join context manager (e.g. `SyncBatchNorm`).
- Classes providing a join hook should do so via a `_join_hook()` method that returns a `_JoinHook` instance with the methods appropriately overridden.
- If there are multiple join hooks, the device specified by the first is used by the join context manager implementation to perform its collective communications.
- If there are multiple join hooks, both the main hooks and the post-hooks are iterated in the order in which the `_JoinHook` objects are passed into the context manager constructor.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/60757
Test Plan: The current implementation preserves backward compatibility by not changing the existing DDP `join()` API at all. To check this, I ran the uneven-input tests (`test_ddp_grad_div_uneven_inputs`, `test_ddp_uneven_inputs_stop_iteration_sync_bn`, `test_ddp_uneven_inputs`, `test_ddp_uneven_input_join_disable`, `test_ddp_uneven_input_exception`) on the AI AWS cluster:
```
touch /tmp/barrier && TEMP_DIR="/tmp" BACKEND="nccl" WORLD_SIZE="2" gpurun python test/distributed/test_distributed_fork.py --
```
Because the existing DDP join logic does not provide correct gradients to the joined processes if `gradient_as_bucket_view=False`, and a joined process requires those gradients to correctly update its shard of the parameters in `ZeroRedundancyOptimizer.step()`, DDP and ZeRO are not fully compatible at the moment. To work around this and to test ZeRO's join hook separately, I added a test `_test_zero_join()` (with `test_zero_join_gpu()` and `test_zero_join_cpu()` flavors), which compares DDP with a local optimizer on uneven inputs against ZeRO on uneven inputs with the gradients set manually.
Reviewed By: iramazanli, mrshenli
Differential Revision: D29624636
Pulled By: andwgu
fbshipit-source-id: ec70a290e02518b0d8b683f9fed2126705b896c7

b770c4b61a  Fix ZeRO sort to be by numel (#60556)

Summary:
**Overview:** This is a follow-up to [this PR](https://github.com/pytorch/pytorch/pull/59586) and corrects the ZeRO partitioning algorithm to sort by the number of elements in the tensor rather than the size of the first dimension. As context, that PR was meant to migrate from using a _naive greedy_ algorithm to a _sorted-greedy_ algorithm when partitioning parameters in ZeRO.
**Updated Results:** The updated table for the partitions can be found [here](https://github.com/pytorch/pytorch/pull/59410#issuecomment-865203219). There, I also considered a third algorithm (sometimes known as multifit), which is more computationally expensive than the greedy and sorted-greedy algorithms but cannot perform worse. However, because of its increased complexity and lack of improved results, I chose to settle with the simpler sorted-greedy algorithm.
The `step()` latencies show slight improvements, but the improvements may be in the noise. The values below are in seconds and were generated using the NCCL backend (unlike in the previous PR, which used Gloo):

Two processes:

| Model | Max `optimizer.step()` Time - Greedy (Std.) | Max `optimizer.step()` Time - Sorted-Greedy (Std.) |
| --- | --- | --- |
| ResNet-50 | 0.047 (0.00142) | **0.044 (0.00025)** |
| ResNet-152 | 0.057 (0.00034) | **0.054 (0.00022)** |
| BERT | 0.021 (0.00008) | **0.020 (0.00008)** |

Four processes:

| Model | Max `optimizer.step()` Time - Greedy (Std.) | Max `optimizer.step()` Time - Sorted-Greedy (Std.) |
| --- | --- | --- |
| ResNet-50 | 0.019 (0.00065) | **0.013 (0.00040)** |
| ResNet-152 | 0.045 (0.00024) | 0.045 (0.00025) |
| BERT | 0.019 (0.00022) | **0.018 (0.00016)** |

Pull Request resolved: https://github.com/pytorch/pytorch/pull/60556
Test Plan: I verified that the ZeRO tests pass (via the AI AWS cluster):
```
srun -p $DEV_QUEUE --cpus-per-task=16 -t 5:00:00 --gpus-per-node=4 python test/distributed/optim/test_zero_redundancy_optimizer.py
```
Reviewed By: VitalyFedyunin
Differential Revision: D29335260
Pulled By: andwgu
fbshipit-source-id: 469d1c6e029b77c1b300a94cd1fd94b633cd28dd
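A small illustration of why the sort key matters (the shapes are hypothetical): ranking parameters by their first dimension can badly misorder them relative to their true element counts.
```
import torch

w = torch.empty(10, 4096)    # first dim 10, but 40,960 elements
b = torch.empty(1024)        # first dim 1024, but only 1,024 elements

print(w.shape[0] < b.shape[0])   # True  -> the old key ranks w as "smaller"
print(w.numel() > b.numel())     # True  -> by element count, w is far larger
```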

f0e4e4be72  Clean Up ZeRO (#60285)

Summary:
**Overview:** Being relatively new to PyTorch and ZeRO, I found parts of the code slightly hard to follow. This change strives to clean up the `ZeroRedundancyOptimizer` code in `zero_redundancy_optimizer.py` by reorganizing some computations, making variable names more explicit and consistent, and unifying terminology in the documentation. The goal is for the code to be easier to extend afterwards.
**Changes:**
1) `state_dict()`: The [logic](

c6cdb4f113  Refactor ZeroRedundancyOptimizer Assuming SPSD (#59834)
Summary:
**Overview:**
This refactors the `ZeroRedundancyOptimizer` implementation to assume single-process single-device (SPSD) instead of accommodating single-process multiple-device (SPMD). `DistributedDataParallel` [retired SPMD recently](https://github.com/pytorch/pytorch/issues/47012), so this change follows the same spirit.
**Changes:**
The parent-class `Optimizer` constructor permits the input argument `params` to be both an `iterable` of `torch.Tensor` and an `iterable` of `dict`. The latter usage is for initializing the optimizer with multiple `param_group`s to start. However, currently, `ZeroRedundancyOptimizer` only supports the former usage, requiring explicit calls to `add_param_group()` for multiple `param_group`s. Given the existing implementation, the type error would be silent and not manifest until much later (e.g. since `super().__init__()` would have no issue). Hence, I added a series of checks to begin the `__init__()` function (encapsulated in `_verify_and_init_params()`). A postcondition of this validation is that `self._all_params` is a non-empty list of all model parameters.
Additionally, I added a check for SPSD usage assuming that all model parameters exist on the same device. This logic is included in `_verify_same_param_device()` and is called immediately after the `params` type-checking. Support for SPSD with model parameters sharded across devices may be added in the future.
Related to that aforementioned post-condition on `self._all_params`, previously there was undefined behavior resulting from different typing of the passed in `params` input argument. If `params` was a `List`, then the usage of `self._reference_is_trainable_mask` was as expected. However, if `params` was a generator (e.g. as in the canonical usage of passing `model.parameters()`), then the ensuing behavior was divergent. This is because after a generator is iterated over, it is empty. As a result, when we set `self._all_params = params` [in the old code](
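A small illustration of the generator behavior described above (not PyTorch internals): `model.parameters()` returns a generator, which is empty after one pass, so it must be materialized into a list before being stored and reused.
```
import torch

model = torch.nn.Linear(4, 2)

params_gen = model.parameters()          # a generator
first_pass = list(params_gen)            # 2 tensors (weight, bias)
second_pass = list(params_gen)           # [] -- the generator is now exhausted

params_list = list(model.parameters())   # materialize once and reuse the list
assert len(params_list) == 2 and len(second_pass) == 0
```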

a4e0368c99  Comment on tests reliance on ZeRO's partitioning algo (#59713)
Summary:
Addresses https://github.com/pytorch/pytorch/issues/59548
**Overview:**
Recently, we changed ZeRO's partitioning algorithm to first sort the parameters by decreasing size and then greedily allocate to shards. See [here](

ea1de87f4b  Sort params by size (decreasing)
Summary:
Pull Request: https://github.com/pytorch/pytorch/pull/59586
Task: https://www.internalfb.com/tasks/?t=90847711
**Overview:** Suppose we have `n` items with positive integer sizes and `k` buckets. We want to assign items to buckets with the goal of uniformity. The precise criteria for uniformity can vary: e.g. minimize the maximum size, maximize the minimum size, etc. This is known as [multiway number partitioning](https://en.wikipedia.org/wiki/Multiway_number_partitioning). ZeRO's partitioning task reduces to solving this problem. In particular, this is the subproblem to be solved for each `param_group` in `self.param_groups`, where the parameters are the items and the ranks give the buckets.
The existing implementation uses the linear-time [greedy number partitioning algorithm](https://en.wikipedia.org/wiki/Greedy_number_partitioning#Linear-time_algorithm), which assigns the next tensor-parameter to the process with the smallest total parameter size so far. In this task, I explore the [extension](https://en.wikipedia.org/wiki/Greedy_number_partitioning#Improved_algorithm) where each parameter group is sorted by decreasing size before applying the greedy algorithm, requiring linearithmic time (as dominated by the sort).
**Experiments:**
The mean number of parameters represents a perfectly uniform allocation and hence the ideal allocation (which may be even better than the optimal partition). In the following tables, I present the maximum number of parameters for any one process and the difference from the mean in parentheses for ResNet-50, ResNet-152, and BERT (the bare BERT model). The best-performing partitioning strategy for each model is bolded.

Two processes:

| Model | Max Num Params - Greedy (Diff) | Max Num Params - Greedy-Sorted (Diff) | Mean Num Params |
| --- | --- | --- | --- |
| ResNet-50 | 13,249,600 (471,084) | **12,794,816 (16,300)** | 12,778,516 |
| ResNet-152 | 30,567,488 (471,084) | **30,111,424 (15,020)** | 30,096,404 |
| BERT | **54,749,184 (8,064)** | 55,327,488 (586,368) | 54,741,120 |

Four processes:

| Model | Max Num Params - Greedy (Diff) | Max Num Params - Greedy-Sorted (Diff) | Mean Num Params |
| --- | --- | --- | --- |
| ResNet-50 | 7,524,864 (1,135,606) | **6,436,864 (47,606)** | 6,389,258 |
| ResNet-152 | 16,232,192 (1,183,990) | **15,090,152 (41,950)** | 15,048,202 |
| BERT | **28,151,040 (780,480)** | 28,352,256 (981,696) | 27,370,560 |

I also investigated the latency of `optimizer.step()` for the different partitioning algorithms. I measured the latency for 30 iterations and took the mean latency per process (excluding the first iteration due to cache coldness). In the following tables, I present the maximum of those mean latencies over all processes and the standard deviation of the latencies contributing to that maximum. Again, the best-performing partitioning strategy for each model is bolded. All entries are in seconds and used the `gloo` backend.

Two processes:

| Model | Max `optimizer.step()` Time - Greedy (Std.) | Max `optimizer.step()` Time - Greedy-Sorted (Std.) |
| --- | --- | --- |
| ResNet-50 | **0.060 (0.002)** | 0.061 (0.002) |
| ResNet-152 | 0.166 (0.003) | **0.160 (0.004)** |
| BERT | 0.220 (0.009) | **0.199 (0.006)** |

Four processes:

| Model | Max `optimizer.step()` Time - Greedy (Std.) | Max `optimizer.step()` Time - Greedy-Sorted (Std.) |
| --- | --- | --- |
| ResNet-50 | 0.094 (0.004) | **0.093 (0.004)** |
| ResNet-152 | **0.228 (0.011)** | 0.231 (0.009) |
| BERT | **0.328 (0.015)** | 0.329 (0.021) |

Based on the standard deviations, the differences in the latency measurements across the different algorithms appear to be within the uncertainty of the measurement itself. Hence, it is difficult to argue that one algorithm is clearly the fastest.
`zero.py` is my experiment script, and I use the AI AWS cluster. The run command looks like:
```
srun -p $DEV_QUEUE --cpus-per-task=16 -t 5:00:00 --gpus-per-node=4 python zero.py -b nccl greedy 2 4
```
This runs the experiment script on an instance with 4 GPUs using the `nccl` backend, outputting to a directory named `greedy/`, and using world sizes of 2 and 4. An analogous command can be used after modifying `partition_parameters()`, e.g. replacing `greedy` with `greedy_sorted` as the output directory name. Then, to run the analysis script:
```
python analyze.py greedy greedy_sorted
```
For more details on the experiment code, refer to: https://www.internalfb.com/diff/D28946756
**Notes:** There exists an optimal solution to this partitioning problem. An algorithm that finds such a solution is the [complete greedy algorithm (CGA)](https://en.wikipedia.org/wiki/Greedy_number_partitioning#An_exact_algorithm), which reduces to the brute-force combinatorial search in the worst case. There exist heuristics to improve the `k = 2` case (i.e. when there are two processes); however, given that `n` in typical use cases is very large, any algorithm that is quadratic or slower is unrealistic. Other exact algorithms are similarly exponential in the worst case, rendering them intractable. Given this, I do not currently see a need for future-proofing the partitioning algorithm against the introduction of algorithms beyond the naive greedy and the sorted-greedy algorithms.
In the current ZeRO implementation, the core `partition_parameters()` computation happens twice upon initialization (i.e. call to `__init__()`): first from a call to `_param_to_rank()` (i.e. an access to `_param_to_rank`) and then from a call to `_update_trainable()`. `_update_trainable()` sees that no optimizer has been constructed yet, so it clears the cache, eliminating the first `partition_parameters()` computation and performing a redundant re-computation. Here is a typical trace:
- [The ZeRO optimizer object is initialized, calling `__init__()`.](
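A hedged sketch of the sorted-greedy strategy explored in this commit; this is an illustration rather than the actual `partition_parameters()` implementation.
```
from typing import List
import torch

def partition_params(params: List[torch.Tensor], world_size: int) -> List[List[torch.Tensor]]:
    partitions: List[List[torch.Tensor]] = [[] for _ in range(world_size)]
    sizes = [0] * world_size
    # Sort by decreasing element count first (linearithmic), then greedily assign
    # each parameter to the rank with the smallest total size so far.
    for p in sorted(params, key=lambda t: t.numel(), reverse=True):
        rank = sizes.index(min(sizes))   # least-loaded rank
        partitions[rank].append(p)
        sizes[rank] += p.numel()
    return partitions

shards = partition_params([torch.empty(1000), torch.empty(10, 10), torch.empty(500)], world_size=2)
```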

75024e228c  Add lint for unqualified type: ignore (#56290)

Summary: The other half of https://github.com/pytorch/pytorch/issues/56272.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/56290
Test Plan: CI should pass on the tip of this PR, and we know that the lint works because the following CI runs (before this PR was finished) failed:
- https://github.com/pytorch/pytorch/runs/2384511062
- https://github.com/pytorch/pytorch/actions/runs/765036024
Reviewed By: seemethere
Differential Revision: D27867219
Pulled By: samestep
fbshipit-source-id: e648f07b6822867e70833e23ddafe7fb7eaca235

29034b9487  [Reland] Update and expose ZeroRedundancyOptimizer docs (#53112)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/53112
Test Plan: Imported from OSS
Reviewed By: blefaudeux
Differential Revision: D26752289
Pulled By: mrshenli
fbshipit-source-id: 897257417b530e6e18788cb40c44e5cb7ac688d5

931100f829  Revert D26696938: Update and expose ZeroRedundancyOptimizer docs

Test Plan: revert-hammer
Differential Revision: D26696938

a586c02962  Update and expose ZeroRedundancyOptimizer docs (#52937)

Summary: Pull Request resolved: https://github.com/pytorch/pytorch/pull/52937
Test Plan: Imported from OSS
Reviewed By: blefaudeux
Differential Revision: D26696938
Pulled By: mrshenli
fbshipit-source-id: dafb00e5c9f0c0c602f471fdcb6416bde74f806b

812339ca3d  [ZeroRedundancyOptimizer] Buckets as tensor view + minimize public interface (#52987)

Summary: Updated version following https://github.com/pytorch/pytorch/issues/52764 (including comments from Shen), but this one I expect to be able to land.
ZeroRedundancyOptimizer:
- buckets as tensor views, optional
- make a lot of attributes private
- minor unit test refactor
- add coverage in the unit tests for with and without bucket views
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52987
Reviewed By: mrshenli
Differential Revision: D26728851
Pulled By: blefaudeux
fbshipit-source-id: f8c745966719c9076c20a554ef56198fb838856c
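A minimal sketch, under assumptions, of the "buckets as tensor views" idea: pack one rank's parameters into a single flat tensor and re-point each parameter at a view into it, so a single broadcast of the bucket updates all of them. The helper below is hypothetical, not the actual implementation.
```
import torch

def build_bucket(params):
    total = sum(p.numel() for p in params)
    bucket = torch.empty(total, dtype=params[0].dtype, device=params[0].device)
    offset = 0
    for p in params:
        n = p.numel()
        bucket[offset:offset + n].copy_(p.data.view(-1))
        p.data = bucket[offset:offset + n].view_as(p)   # parameter now views the bucket
        offset += n
    return bucket

params = [torch.randn(3, 4), torch.randn(5)]
bucket = build_bucket(params)
assert params[0].data_ptr() == bucket.data_ptr()   # first param aliases the bucket start
```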

249c213462  [ZeroRedundancyOptimizer] Pytorch compliant state (#52960)

Summary: Same as https://github.com/pytorch/pytorch/issues/52760, which I could not get to land. I just could not live with ghstack/ghimport/randomly broken things (I break enough of them myself), so this is a fresh copy without ghstack shenanigans. I'm hopeful that this can land relatively bug-free, and am sorry for the duplication.
What this does:
- call the common_utils test runner instead of unittest, because it seems that is how it should be done
- change the state returned from ZeroRedundancyOptimizer to be PyTorch compliant, which has the added benefit of being elastic (world-size independent)
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52960
Reviewed By: mrshenli
Differential Revision: D26710932
Pulled By: blefaudeux
fbshipit-source-id: 1d914bc9221442ba1bb2b48f5df10c313e674ece

7ae7768617  [ZeroRedundancyOptimizer] Remove pseudo futures handling, not needed (#52698)

Summary: This was mostly needed for ShardedDDP and is not used here; dead code removal.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/52698
Reviewed By: mrshenli
Differential Revision: D26617893
Pulled By: blefaudeux
fbshipit-source-id: 9bcfca5135bf332ebc1240300978c138d2041146

58eb23378f  Clean up usage of torch._six partially (#49785)

Summary: See https://github.com/pytorch/pytorch/issues/42919
Pull Request resolved: https://github.com/pytorch/pytorch/pull/49785
Reviewed By: mruberry
Differential Revision: D25963833
Pulled By: bugra
fbshipit-source-id: 11c90d6b8d3f206c9d0a4d8621b773beb10c6ba2

c311b8961a  Revert D26113953: [pytorch][PR] [ZeroRedundancyOptimizer] Elastic and pytorch compatible checkpoints

Test Plan: revert-hammer
Differential Revision: D26113953

bbe18e3527  [ZeroRedundancyOptimizer] Elastic and pytorch compatible checkpoints (#50956)

Summary:
- Makes it possible to use non-sharded optimizer checkpoints (as long as the model/param groups are the same, of course)
- Makes it possible to save with a given world size and load with another world size
- Uses Torch Distributed's built-in broadcast object list instead of an ad hoc version
Pull Request resolved: https://github.com/pytorch/pytorch/pull/50956
Reviewed By: malfet
Differential Revision: D26113953
Pulled By: blefaudeux
fbshipit-source-id: 030bfeee2c34c2d987590d45dc8efe05515f2e5c
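A hedged usage sketch of the elastic (world-size independent) checkpointing described above, written against today's `ZeroRedundancyOptimizer` API; argument names may differ in the version this commit targets, and the model below is a placeholder.
```
import torch
import torch.distributed as dist
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP

# assumes dist.init_process_group(...) has already been called
rank = dist.get_rank()
model = DDP(torch.nn.Linear(100, 100).to(rank), device_ids=[rank])
zero_optim = ZeroRedundancyOptimizer(model.parameters(), optimizer_class=torch.optim.Adam)

# Save: gather all shards onto rank 0, then write one non-sharded checkpoint.
zero_optim.consolidate_state_dict(to=0)
if rank == 0:
    torch.save(zero_optim.state_dict(), "optim.pt")

# Load (possibly with a different world size): every rank loads the full state
# dict, and each keeps only the state for its own shard.
zero_optim.load_state_dict(torch.load("optim.pt"))
```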

87fb3707d9  ZeroRedundancyOptimizer: an implementation of a standalone sharded optimizer wrapper (#46750)

Summary: Implement the first stage of ZeRO, sharding of the optimizer state, as described in [this blog post](https://www.microsoft.com/en-us/research/blog/zero-2-deepspeed-shattering-barriers-of-deep-learning-speed-scale/) and [this paper](https://arxiv.org/abs/1910.02054). This implementation is completely independent from the [DeepSpeed](https://github.com/microsoft/DeepSpeed) framework and aims at providing ZeRO-compliant building blocks within the PyTorch scheme of things.
This works by:
- acting as a wrapper to a PyTorch optimizer. ZeROptimizer does not optimize anything by itself; it only shards optimizers for distributed jobs
- each rank distributes parameters according to a given partitioning scheme (which could be updated) and owns the update of a given shard only
- `.step()` is called on each rank as expected; the fact that the optimizer actually works on a shard of the model is not visible from the outside
- when the update is completed, each rank broadcasts the updated model shard to all the other ranks
This can be used with DDP, although some communications are wasted in that case (gradients are all-reduced to all ranks). This implementation was initially developed in [Fairscale](https://github.com/facebookresearch/fairscale) and can also be used with an optimized DDP which only reduces to the relevant ranks. More context on ZeRO and PyTorch can be found in [this RFC](https://github.com/pytorch/pytorch/issues/42849).
The API with respect to loading and saving the state is a known pain point and should probably be discussed and updated. Other possible follow-ups include integrating more closely with a [modularized DDP](https://github.com/pytorch/pytorch/issues/37002), [making the checkpoints partition-agnostic](https://github.com/facebookresearch/fairscale/issues/164), [exposing a gradient clipping option](https://github.com/facebookresearch/fairscale/issues/98), and making sure that mixed-precision states are properly handled.
Original authors include msbaines, min-xu-ai, and myself.
Pull Request resolved: https://github.com/pytorch/pytorch/pull/46750
Reviewed By: mruberry
Differential Revision: D25958918
Pulled By: blefaudeux
fbshipit-source-id: 14280f2fd90cf251eee8ef9ac0f1fa6025ae9c50
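A hedged sketch of the basic wrapper usage described above, using today's `torch.distributed.optim.ZeroRedundancyOptimizer` API (the class is referred to as ZeROptimizer in parts of this description); the model and input sizes are placeholders and the process group is assumed to be initialized.
```
import torch
import torch.distributed as dist
from torch.distributed.optim import ZeroRedundancyOptimizer
from torch.nn.parallel import DistributedDataParallel as DDP

# assumes dist.init_process_group(...) has already been called
rank = dist.get_rank()
model = DDP(torch.nn.Linear(1024, 1024).to(rank), device_ids=[rank])
optimizer = ZeroRedundancyOptimizer(
    model.parameters(),
    optimizer_class=torch.optim.Adam,   # each rank runs Adam over its own shard only
    lr=1e-3,
)

for _ in range(10):
    model(torch.randn(32, 1024, device=rank)).sum().backward()  # DDP all-reduces grads
    optimizer.step()        # updates this rank's shard, then broadcasts the updated params
    optimizer.zero_grad()
```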