Fix sharding algo + test it (#53942)

Summary:
This PR:
1. moves the sharding algorithm from run_test.py to framework_utils.py (let me know if you have a better place for it)
2. adds tests for the algorithm in test_testing.py
3. fixes the algorithm so that it no longer tacks all the unknown jobs onto the shard with the minimum time, but instead distributes them round-robin across the shards (see the toy sketch below).
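
To make point 3 concrete, here is a toy sketch of the new round-robin step (shard contents and test names are invented for illustration; only the loop mirrors the patch below):

# Tests with no timing history used to all land on the single minimum-time
# shard; the fix deals them out round-robin, starting with the smallest shard.
unknown = ['u1', 'u2', 'u3']
shards = [(12.0, ['a']), (10.0, ['b'])]  # (total_time, tests) per shard

index = min(range(len(shards)), key=lambda i: shards[i][0])  # smallest shard first
for job in unknown:
    shards[index][1].append(job)
    index = (index + 1) % len(shards)

print(shards)  # [(12.0, ['a', 'u2']), (10.0, ['b', 'u1', 'u3'])]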

Pull Request resolved: https://github.com/pytorch/pytorch/pull/53942

Test Plan: python test/test_testing.py -k TestFrameworkUtils

Reviewed By: samestep

Differential Revision: D27047223

Pulled By: janeyx99

fbshipit-source-id: 824b20009c0bb707aa5361de445cdec795d5e3f1
Jane Xu 2021-03-15 16:31:47 -07:00 committed by Facebook GitHub Bot
parent e91aeb0470
commit ee35060888
4 changed files with 126 additions and 18 deletions

mypy-strict.ini

@@ -38,6 +38,7 @@ files = tools/codegen/gen.py,
     tools/autograd/*.py,
     tools/pyi/*.py,
     tools/test_history.py,
+    torch/testing/_internal/framework_utils.py,
     torch/testing/_internal/mypy_wrapper.py,
     torch/testing/_internal/print_test_stats.py,
     torch/utils/benchmark/utils/common.py,

test/run_test.py

@@ -16,6 +16,7 @@ import tempfile
 import torch
 from torch.utils import cpp_extension
 from torch.testing._internal.common_utils import TEST_WITH_ROCM, shell, set_cwd, FILE_SCHEMA
+from torch.testing._internal.framework_utils import calculate_shards
 import torch.distributed as dist

 from typing import Dict, Optional, Tuple, List, Any
@@ -421,25 +422,7 @@ def calculate_job_times(reports: List[Dict[str, Any]]) -> Dict[str, Tuple[float, int]]:
     return jobs_to_times

-def calculate_shards(num_shards: int, tests: List[str], job_times: Dict[str, Tuple[float, int]]) -> List[Tuple[float, List[str]]]:
-    filtered_job_times: Dict[str, float] = dict()
-    for test in tests:
-        if test in job_times:
-            avg_time, _ = job_times[test]
-            filtered_job_times[test] = avg_time
-        else:
-            filtered_job_times[test] = 0.0
-
-    # The following attempts to implement a partition approximation greedy algorithm
-    # See more at https://en.wikipedia.org/wiki/Greedy_number_partitioning
-    sorted_jobs = sorted(filtered_job_times, key=lambda j: filtered_job_times[j], reverse=True)
-    sharded_jobs: List[Tuple[float, List[str]]] = [(0.0, []) for _ in range(num_shards)]
-    for job in sorted_jobs:
-        min_shard_index = sorted(range(num_shards), key=lambda i: sharded_jobs[i][0])[0]
-        curr_shard_time, curr_shard_jobs = sharded_jobs[min_shard_index]
-        curr_shard_jobs.append(job)
-        sharded_jobs[min_shard_index] = (curr_shard_time + filtered_job_times[job], curr_shard_jobs)
-    return sharded_jobs

 def pull_job_times_from_S3() -> Dict[str, Tuple[float, int]]:
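
To see why the removed version misbehaved: unknown tests were assigned a time of 0.0, so appending one never changed any shard total, and the same minimum-time shard kept winning the argmin and absorbing every unknown test. A standalone toy sketch of that failure mode (names and times made up, not PyTorch code):

times = {'a': 5.0, 'b': 3.0, 'u1': 0.0, 'u2': 0.0, 'u3': 0.0}  # u* are "unknown"

# Same greedy loop as the removed code: always append to the smallest shard.
shards = [(0.0, []), (0.0, [])]
for job in sorted(times, key=times.get, reverse=True):
    i = min(range(len(shards)), key=lambda k: shards[k][0])
    shards[i][1].append(job)
    shards[i] = (shards[i][0] + times[job], shards[i][1])

print(shards)  # [(5.0, ['a']), (3.0, ['b', 'u1', 'u2', 'u3'])] -- all unknowns on one shard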

test/test_testing.py

@@ -2,9 +2,11 @@ import torch
 import math
 from pathlib import PurePosixPath
+import random

 from torch.testing._internal.common_utils import \
     (TestCase, make_tensor, run_tests, slowTest)
+from torch.testing._internal.framework_utils import calculate_shards
 from torch.testing._internal.common_device_type import \
     (instantiate_device_type_tests, onlyCUDA, onlyOnCPUAndCUDA, dtypes)
 from torch.testing._internal import mypy_wrapper
@@ -1305,5 +1307,99 @@ Added (across 1 suite) 2 tests, totaling + 3.02s
 )

+
+class TestFrameworkUtils(TestCase):
+
+    tests = [
+        'super_long_test',
+        'long_test1',
+        'long_test2',
+        'normal_test1',
+        'normal_test2',
+        'normal_test3',
+        'short_test1',
+        'short_test2',
+        'short_test3',
+        'short_test4',
+        'short_test5',
+    ]
+
+    test_times = {
+        'super_long_test': (55, 1),
+        'long_test1': (22, 2),
+        'long_test2': (18, 2),
+        'normal_test1': (9, 2),
+        'normal_test2': (7, 2),
+        'normal_test3': (5, 2),
+        'short_test1': (1, 2),
+        'short_test2': (0.6, 3),
+        'short_test3': (0.4, 5),
+        'short_test4': (0.3, 1),
+        'short_test5': (0.01, 2),
+    }
+
+    def test_calculate_2_shards_with_complete_test_times(self):
+        expected_shards = [
+            (60, ['super_long_test', 'normal_test3']),
+            (58.31, ['long_test1', 'long_test2', 'normal_test1', 'normal_test2', 'short_test1', 'short_test2',
+                     'short_test3', 'short_test4', 'short_test5'])
+        ]
+        self.assertEqual(expected_shards, calculate_shards(2, self.tests, self.test_times))
+
+    def test_calculate_5_shards_with_complete_test_times(self):
+        expected_shards = [
+            (55, ['super_long_test']),
+            (22, ['long_test1', ]),
+            (18, ['long_test2', ]),
+            (11.31, ['normal_test1', 'short_test1', 'short_test2', 'short_test3', 'short_test4', 'short_test5']),
+            (12, ['normal_test2', 'normal_test3']),
+        ]
+        self.assertEqual(expected_shards, calculate_shards(5, self.tests, self.test_times))
+
+    def test_calculate_2_shards_with_incomplete_test_times(self):
+        incomplete_test_times = {k: v for k, v in self.test_times.items() if 'test1' in k}
+        expected_shards = [
+            (22, ['long_test1', 'long_test2', 'normal_test3', 'short_test3', 'short_test5']),
+            (10, ['normal_test1', 'short_test1', 'super_long_test', 'normal_test2', 'short_test2', 'short_test4']),
+        ]
+        self.assertEqual(expected_shards, calculate_shards(2, self.tests, incomplete_test_times))
+
+    def test_calculate_5_shards_with_incomplete_test_times(self):
+        incomplete_test_times = {k: v for k, v in self.test_times.items() if 'test1' in k}
+        expected_shards = [
+            (22, ['long_test1', 'normal_test2', 'short_test5']),
+            (9, ['normal_test1', 'normal_test3']),
+            (1, ['short_test1', 'short_test2']),
+            (0, ['super_long_test', 'short_test3']),
+            (0, ['long_test2', 'short_test4']),
+        ]
+        self.assertEqual(expected_shards, calculate_shards(5, self.tests, incomplete_test_times))
+
+    def test_calculate_2_shards_against_optimal_shards(self):
+        for _ in range(100):
+            random.seed(120)
+            random_times = {k: (random.random() * 10, 1) for k in self.tests}
+            # all test times except first two
+            rest_of_tests = [i for (k, (i, _)) in random_times.items() if k != 'super_long_test' and k != 'long_test1']
+            sum_of_rest = sum(rest_of_tests)
+            random_times['super_long_test'] = (max(sum_of_rest / 2, max(rest_of_tests)), 1)
+            random_times['long_test1'] = (sum_of_rest - random_times['super_long_test'][0], 1)
+            # An optimal sharding would look like the below, but we don't need to compute this for the test:
+            # optimal_shards = [
+            #     (sum_of_rest, ['super_long_test', 'long_test1']),
+            #     (sum_of_rest, [i for i in self.tests if i != 'super_long_test' and i != 'long_test1']),
+            # ]
+            calculated_shards = calculate_shards(2, self.tests, random_times)
+            max_shard_time = max(calculated_shards[0][0], calculated_shards[1][0])
+            if sum_of_rest != 0:
+                # The calculated shard should not have a ratio worse than 7/6 for num_shards = 2
+                self.assertGreaterEqual(7.0 / 6.0, max_shard_time / sum_of_rest)
+            sorted_tests = sorted(self.tests)
+            sorted_shard_tests = sorted(calculated_shards[0][1] + calculated_shards[1][1])
+            # All the tests should be represented by some shard
+            self.assertEqual(sorted_tests, sorted_shard_tests)
+
 if __name__ == '__main__':
     run_tests()

torch/testing/_internal/framework_utils.py

@@ -0,0 +1,28 @@
+from typing import Dict, Tuple, List
+
+def calculate_shards(num_shards: int, tests: List[str], job_times: Dict[str, Tuple[float, int]]) -> List[Tuple[float, List[str]]]:
+    filtered_job_times: Dict[str, float] = dict()
+    unknown_jobs: List[str] = []
+    for test in tests:
+        if test in job_times:
+            avg_time, _ = job_times[test]
+            filtered_job_times[test] = avg_time
+        else:
+            unknown_jobs.append(test)
+
+    # The following attempts to implement a partition approximation greedy algorithm
+    # See more at https://en.wikipedia.org/wiki/Greedy_number_partitioning
+    sorted_jobs = sorted(filtered_job_times, key=lambda j: filtered_job_times[j], reverse=True)
+    sharded_jobs: List[Tuple[float, List[str]]] = [(0.0, []) for _ in range(num_shards)]
+    for job in sorted_jobs:
+        min_shard_index = sorted(range(num_shards), key=lambda i: sharded_jobs[i][0])[0]
+        curr_shard_time, curr_shard_jobs = sharded_jobs[min_shard_index]
+        curr_shard_jobs.append(job)
+        sharded_jobs[min_shard_index] = (curr_shard_time + filtered_job_times[job], curr_shard_jobs)
+
+    # Round robin the unknown jobs starting with the smallest shard
+    index = sorted(range(num_shards), key=lambda i: sharded_jobs[i][0])[0]
+    for job in unknown_jobs:
+        sharded_jobs[index][1].append(job)
+        index = (index + 1) % num_shards
+    return sharded_jobs
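
For a quick end-to-end check of the new module, a usage sketch (toy test names and times made up; the import path matches this commit):

from torch.testing._internal.framework_utils import calculate_shards

# (average_seconds, num_reports) per test; 'new_test' has no timing history.
job_times = {'test_a': (30.0, 2), 'test_b': (20.0, 2), 'test_c': (10.0, 2)}
tests = ['test_a', 'test_b', 'test_c', 'new_test']

for total_time, shard_tests in calculate_shards(2, tests, job_times):
    print(total_time, shard_tests)
# Greedy partitioning gives 30.0 vs 30.0 (test_a alone; test_b + test_c together),
# then the unknown 'new_test' is round-robined on without changing the recorded times:
# 30.0 ['test_a', 'new_test']
# 30.0 ['test_b', 'test_c']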