[Gradient Compression] Check start_PowerSGD_iter > 1 and add guidance on tuning PowerSGD configs. (#51427)

Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/51427

A user reported that PowerSGD failed when `start_powerSGD_iter` was set to 1. This is because allocating memory for the error tensors can conflict with the bucket rebuilding process at iteration 1.

Check `start_PowerSGD_iter > 1` instead of `start_PowerSGD_iter >= 1`.

Also add a unit test, `test_invalid_powerSGD_state`, and some guidance on tuning PowerSGD configs.
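
For reference, a minimal sketch of a config that satisfies the new check (the model, device, and step count below are hypothetical, and the DDP comm hook registration API was still a prototype at the time of this commit):

import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

# Hypothetical DDP-wrapped model; assumes the default process group and a GPU are already set up.
model = DDP(nn.Linear(1024, 1024).cuda(), device_ids=[0])

state = powerSGD.PowerSGDState(
    process_group=None,           # None falls back to the default process group.
    matrix_approximation_rank=1,  # Start low; increase by factors of 2 if accuracy is lacking.
    start_powerSGD_iter=1000,     # Must be > 1 when error feedback or warm start is enabled.
    use_error_feedback=True,
    warm_start=True,
)
model.register_comm_hook(state, powerSGD.powerSGD_hook)

With this check in place, constructing the state with `start_powerSGD_iter <= 1` while `use_error_feedback` or `warm_start` is enabled raises the `ValueError` exercised by `test_invalid_powerSGD_state`.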

Original PR issue: Investigate Applying PowerSGD to Communication Hook for Gradient Compression #47202
ghstack-source-id: 120834126

Test Plan: buck test mode/dev-nosan caffe2/test/distributed:c10d -- test_invalid_powerSGD_state

Reviewed By: rohan-varma

Differential Revision: D26166897

fbshipit-source-id: 34d5b64bb3dd43acb61d792626c70e6c8bb44a5d
Author: Yi Wang (committed by Facebook GitHub Bot), 2021-02-02 04:28:19 -08:00
Commit: 79e7544cb4 (parent: d555768e8f)
2 changed files with 64 additions and 11 deletions

@@ -3826,6 +3826,22 @@ class DistributedDataParallelTest(MultiProcessTestCase):
def test_ddp_comm_hook_allreduce_hook_nccl_grad_is_view(self):
self._test_ddp_comm_hook_allreduce_hook_nccl(gradient_as_bucket_view=True)
def test_invalid_powerSGD_state(self):
for start_powerSGD_iter, use_error_feedback, warm_start in product([0, 1], [True, False], [True, False]):
if not use_error_feedback and not warm_start:
continue
with self.assertRaisesRegex(
ValueError,
"Expect `start_powerSGD_iter` > 1 if `use_error_feedback` or `warm_start` is enabled, "
"because PowerSGD can only be applied after the first two iterations in DDP."):
state = powerSGD.PowerSGDState(
process_group=None,
matrix_approximation_rank=1,
start_powerSGD_iter=start_powerSGD_iter,
use_error_feedback=use_error_feedback,
warm_start=warm_start,
)
@requires_nccl()
@skip_if_lt_x_gpu(2)
def test_default_ddp_comm_hooks_nccl_is_view(self):

@@ -33,6 +33,30 @@ def _orthogonalize(matrix, epsilon=1e-8):
class PowerSGDState(object):
"""
Stores both the gradient compression configs and the internal states for all the gradients during the training.
In particular, `matrix_approximation_rank` and `start_powerSGD_iter` are the main configs that need to be tuned by the user.
Although `use_error_feedback` and `warm_start` can also be tuned by the user,
they are typically turned on for performance.
Note [Guidance to Tune `matrix_approximation_rank` And `start_powerSGD_iter`]
~~~~~~~~~~~~~~~~~~~~~~~~~~
1) To tune `matrix_approximation_rank`, the user can increase it from 1 by factors of 2,
until a satisfactory accuracy is reached.
Increasing `matrix_approximation_rank` can substantially increase the computation cost of the compression.
However, the accuracy may not be further improved beyond a certain `matrix_approximation_rank` value.
2) To tune `start_powerSGD_iter`, the user can typically start with 10% of the total training steps,
and increase it until a satisfactory accuracy is reached.
Deferring PowerSGD can effectively improve the accuracy,
even if a relatively small `matrix_approximation_rank` is used.
This is because the beginning of the training phase is usually very sensitive to inaccurate gradients,
and compressing gradients too early may put the training on a suboptimal trajectory,
which can have an irrecoverable impact on the accuracy.
The minimum value allowed in DDP is 2 if error feedback or warm-up is enabled.
This is because there is another internal optimization that rebuilds buckets at iteration 1 in DDP,
and this can conflict with any tensors memoized before the rebuild takes place.
"""
__slots__ = [
"process_group",
# The two fields below are the configs that usually need to be tuned by the user.
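
As a rough illustration of the tuning note above, a hypothetical sweep helper (the function name, step count, and rank cap are illustrative and not part of this commit):

from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

def candidate_powerSGD_states(total_training_steps, max_rank=8):
    # Follow the guidance: defer compression to ~10% of total steps (at least 2),
    # and grow matrix_approximation_rank by factors of 2 until accuracy is satisfactory.
    start_iter = max(2, total_training_steps // 10)
    rank = 1
    while rank <= max_rank:
        yield powerSGD.PowerSGDState(
            process_group=None,
            matrix_approximation_rank=rank,
            start_powerSGD_iter=start_iter,
            use_error_feedback=True,
            warm_start=True,
        )
        rank *= 2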
@@ -58,6 +82,16 @@ class PowerSGDState(object):
warm_start=True,
random_seed=0,
):
logging.info(
"PowerSGD config: matrix_approximation_rank = {}; "
"start_powerSGD_iter = {}; use_error_feedback = {}; warm_start = {}.".format(
matrix_approximation_rank,
start_powerSGD_iter,
use_error_feedback,
warm_start,
)
)
self.process_group = process_group
# The low rank for matrix approximation controls the size of compressed low-rank tensors,
# which determines the computation ratio.
@@ -80,7 +114,11 @@ class PowerSGDState(object):
# However, this means that the shape of input bucketized tensors is subject to change,
# which will complicate the implementations of error feedback and warm-up.
# Running vanilla allreduce in the first few iterations can avoid this complexity.
assert start_powerSGD_iter >= 1
if (use_error_feedback or warm_start) and start_powerSGD_iter <= 1:
raise ValueError(
"Expect `start_powerSGD_iter` > 1 if `use_error_feedback` or `warm_start` is enabled, "
"because PowerSGD can only be applied after the first two iterations in DDP."
)
self.start_powerSGD_iter = start_powerSGD_iter
# Error feedback is usually crucial for both convergence and generalization,
# because PowerSGD is a biased compressor,
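
A quick sketch of the validation behavior this hunk adds (it mirrors the new unit test; the import alias is assumed):

from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

try:
    # Invalid: error feedback is on, but PowerSGD would start at iteration 1,
    # which collides with DDP's bucket rebuilding.
    powerSGD.PowerSGDState(
        process_group=None,
        matrix_approximation_rank=1,
        start_powerSGD_iter=1,
        use_error_feedback=True,
        warm_start=False,
    )
except ValueError as err:
    print(err)  # Expect `start_powerSGD_iter` > 1 if `use_error_feedback` or `warm_start` is enabled, ...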
@@ -111,16 +149,6 @@ class PowerSGDState(object):
# Iteration/step in the training loop.
self.iter = 0
logging.info(
"PowerSGD config: matrix_approximation_rank = {}; "
"start_powerSGD_iter = {}; use_error_feedback = {}; warm_start = {}.".format(
self.matrix_approximation_rank,
self.start_powerSGD_iter,
self.use_error_feedback,
self.warm_start,
)
)
def maybe_increase_iter(self, bucket):
# Since bucket 0 is the last bucket to allreduce in an iteration,
# only increase `iter` when bucket 0 is processed.
@@ -165,6 +193,7 @@ def powerSGD_hook(state: PowerSGDState, bucket) -> torch.futures.Future:
Args:
state (PowerSGDState): State information to configure the compression rate and support error feedback, warm start, etc.
To tune the compression configs, see Note [Guidance to Tune `matrix_approximation_rank` And `start_powerSGD_iter`].
bucket (dist._GradBucket): Bucket that stores a 1D flattened gradient tensor that batches multiple per-variable tensors.
Note that since the DDP comm hook only supports single-process single-device mode at this time,
exactly one tensor is stored in this bucket.
@@ -399,6 +428,13 @@ def batched_powerSGD_hook(state: PowerSGDState, bucket) -> torch.futures.Future:
7) Computes M, which is approximately equal to PQ^T.
8) Truncates the input tensor to the original length.
This variant is faster than `powerSGD_hook` that runs layer-wise gradient compression,
but it usually results in a much lower accuracy, unless `matrix_approximation_rank` in the state is 1.
Increasing `matrix_approximation_rank` may not necessarily increase the accuracy,
because batching per-parameter tensors without column/row alignment can destroy low-rank structure.
Therefore, the user should always consider `powerSGD_hook` first,
and only consider this variant when a satisfactory accuracy can be achieved with `matrix_approximation_rank` equal to 1.
Note that this communication hook enforces vanilla allreduce for the first `state.start_powerSGD_iter` iterations.
This not only gives the user finer control over the tradeoff between speedup and accuracy,
but also helps abstract away some of the complexity of DDP's internal optimizations for future communication hook developers.
@@ -409,6 +445,7 @@ def batched_powerSGD_hook(state: PowerSGDState, bucket) -> torch.futures.Future:
Args:
state (PowerSGDState): State information to configure the compression rate and support error feedback, warm start, etc.
To tune the compression configs, see Note [Guidance to Tune `matrix_approximation_rank` And `start_powerSGD_iter`].
bucket (dist._GradBucket): Bucket that stores a 1D flattened gradient tensor that batches multiple per-variable tensors.
Note that since the DDP comm hook only supports single-process single-device mode at this time,
exactly one tensor is stored in this bucket.
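
Finally, a hedged sketch of when the batched variant makes sense, per the docstring above (the model, device, and step count are hypothetical):

import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

# Hypothetical DDP-wrapped model; assumes the default process group and a GPU are already set up.
ddp_model = DDP(nn.Linear(1024, 1024).cuda(), device_ids=[0])

# Only fall back to the batched hook if rank-1 compression already gives acceptable accuracy,
# since batching flattened gradients can destroy per-layer low-rank structure.
state = powerSGD.PowerSGDState(
    process_group=None,
    matrix_approximation_rank=1,  # The batched variant is mainly useful at rank 1.
    start_powerSGD_iter=1000,
    use_error_feedback=True,
    warm_start=True,
)
ddp_model.register_comm_hook(state, powerSGD.batched_powerSGD_hook)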