Remove NO_MULTIPROCESSING_SPAWN checks (#146705)

Python 3.9, the minimum version PyTorch supports, always provides the multiprocessing `spawn` start method, so the `NO_MULTIPROCESSING_SPAWN` flag can never be true and the checks guarded by it are dead code.
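
For context, a minimal standard-library sketch (not part of this commit) of why the flag can never trigger on a supported interpreter:

import multiprocessing as mp

# On Python 3.9+ the "spawn" start method is registered on every platform
# (Linux, macOS, Windows), so a guard equivalent to NO_MULTIPROCESSING_SPAWN
# always evaluates to False.
assert "spawn" in mp.get_all_start_methods()

# Requesting a spawn context therefore always succeeds; get_context() raises
# ValueError only for start methods the platform does not provide.
ctx = mp.get_context("spawn")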

Pull Request resolved: https://github.com/pytorch/pytorch/pull/146705
Approved by: https://github.com/colesbury
cyyever 2025-02-28 00:15:32 +00:00 committed by PyTorch MergeBot
parent 2978771c9d
commit 40ad5e01df
11 changed files with 24 additions and 157 deletions

View File

@@ -17,7 +17,6 @@ from torch.testing._internal.common_distributed import (
     skip_if_rocm_multiprocess,
 )
 from torch.testing._internal.common_utils import (
-    NO_MULTIPROCESSING_SPAWN,
     run_tests,
     skip_but_pass_in_sandcastle_if,
     TEST_WITH_DEV_DBG_ASAN,
@@ -47,10 +46,6 @@ if TEST_WITH_DEV_DBG_ASAN:
     )
     sys.exit(0)
-if NO_MULTIPROCESSING_SPAWN:
-    print("Spawn not available, skipping tests.", file=sys.stderr)
-    sys.exit(0)
 BACKEND = os.environ["BACKEND"]
 if BACKEND == "gloo" or BACKEND == "nccl":

View File

@@ -37,7 +37,6 @@ from torch.testing._internal.common_utils import (
     IS_CI,
     IS_MACOS,
     IS_WINDOWS,
-    NO_MULTIPROCESSING_SPAWN,
     run_tests,
     skip_but_pass_in_sandcastle_if,
     skip_if_pytest,
@@ -509,11 +508,6 @@ if not (TEST_WITH_DEV_DBG_ASAN or IS_WINDOWS or IS_MACOS):
             mpc._poll()
             self.assertEqual(4, mock_join.call_count)
-    @skip_but_pass_in_sandcastle_if(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     def test_multiprocessing_context_poll_raises_exception(self):
         mp_context = MultiprocessContext(
             name="test_mp",

View File

@@ -8,7 +8,7 @@ import torch
 import torch.distributed as c10d
 import torch.multiprocessing as mp
 from torch.testing._internal.common_distributed import MultiProcessTestCase
-from torch.testing._internal.common_utils import load_tests, NO_MULTIPROCESSING_SPAWN
+from torch.testing._internal.common_utils import load_tests
 # Torch distributed.nn is not available in windows
@@ -27,10 +27,6 @@ if not c10d.is_available():
     print("c10d not available, skipping tests", file=sys.stderr)
     sys.exit(0)
-if NO_MULTIPROCESSING_SPAWN:
-    print("spawn not available, skipping tests", file=sys.stderr)
-    sys.exit(0)
 class AbstractProcessGroupShareTensorTest:
     world_size = 2

View File

@@ -13,11 +13,7 @@ if not dist.is_available():
     print("Distributed not available, skipping tests", file=sys.stderr)
     sys.exit(0)
-from torch.testing._internal.common_utils import (
-    NO_MULTIPROCESSING_SPAWN,
-    run_tests,
-    TEST_WITH_DEV_DBG_ASAN,
-)
+from torch.testing._internal.common_utils import run_tests, TEST_WITH_DEV_DBG_ASAN
 from torch.testing._internal.distributed.distributed_test import (
     DistributedTest,
     TestDistBackend,
@@ -31,10 +27,6 @@ if TEST_WITH_DEV_DBG_ASAN:
     )
     sys.exit(0)
-if NO_MULTIPROCESSING_SPAWN:
-    print("Spawn not available, skipping tests.", file=sys.stderr)
-    sys.exit(0)
 _allowed_backends = ("gloo", "nccl", "ucc")
 if (
     "BACKEND" not in os.environ

View File

@@ -59,8 +59,7 @@ from torch.testing._internal.common_utils import (
     IS_LINUX,
     IS_SANDCASTLE,
     IS_WINDOWS,
     load_tests,
-    NO_MULTIPROCESSING_SPAWN,
     parametrize,
     run_tests,
     serialTest,
@@ -1088,11 +1087,6 @@ except RuntimeError as e:
     @slowTest
     @unittest.skipIf(TEST_WITH_ROCM, "ROCm doesn't support device side asserts")
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     def test_multinomial_invalid_probs_cuda(self):
         self._spawn_test_multinomial_invalid_probs_cuda([1.0, -1.0, 1.0])
         self._spawn_test_multinomial_invalid_probs_cuda([1.0, inf, 1.0])
@@ -1121,11 +1115,6 @@ except RuntimeError as e:
         return err
     @slowTest
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @skipIfRocm
     def test_index_out_of_bounds_exception_cuda(self):
         test_method = TestCuda._test_index_bounds_cuda

View File

@@ -28,7 +28,6 @@ from torch.testing._internal.common_utils import (
     IS_SANDCASTLE,
     IS_WINDOWS,
     load_tests,
-    NO_MULTIPROCESSING_SPAWN,
     parametrize,
     run_tests,
     skipIfNoDill,
@@ -101,21 +100,20 @@ TEST_CUDA_IPC = (
 TEST_MULTIGPU = TEST_CUDA_IPC and torch.cuda.device_count() > 1
-if not NO_MULTIPROCESSING_SPAWN:
-    # We want to use `spawn` if able because some of our tests check that the
-    # data loader terminiates gracefully. To prevent hanging in the testing
-    # process, such data loaders are run in a separate subprocess.
-    #
-    # We also want to test the `pin_memory=True` configuration, thus `spawn` is
-    # required to launch such processes and they initialize the CUDA context.
-    #
-    # Mixing different start method is a recipe for disaster (e.g., using a fork
-    # `mp.Event` with a spawn `mp.Process` segfaults). So we set this globally
-    # to avoid bugs.
-    #
-    # Get a multiprocessing context because some test / third party library will
-    # set start_method when imported, and setting again triggers `RuntimeError`.
-    mp = mp.get_context(method="spawn")
+# We want to use `spawn` if able because some of our tests check that the
+# data loader terminates gracefully. To prevent hanging in the testing
+# process, such data loaders are run in a separate subprocess.
+#
+# We also want to test the `pin_memory=True` configuration, thus `spawn` is
+# required to launch such processes and they initialize the CUDA context.
+#
+# Mixing different start method is a recipe for disaster (e.g., using a fork
+# `mp.Event` with a spawn `mp.Process` segfaults). So we set this globally
+# to avoid bugs.
+#
+# Get a multiprocessing context because some test / third party library will
+# set start_method when imported, and setting again triggers `RuntimeError`.
+mp = mp.get_context(method="spawn")
 # 60s of timeout?
@@ -1430,7 +1428,7 @@ except RuntimeError as e:
             p.terminate()
     def test_timeout(self):
-        if TEST_CUDA and not NO_MULTIPROCESSING_SPAWN:
+        if TEST_CUDA:
             # This test runs in a subprocess, which can only initialize CUDA with spawn.
             # _test_timeout_pin_memory with pin_memory=True initializes CUDA when the iterator is
             # constructed.
@@ -2313,8 +2311,7 @@ except RuntimeError as e:
     def test_sampler(self):
         self._test_sampler()
         self._test_sampler(num_workers=4)
-        if not NO_MULTIPROCESSING_SPAWN:
-            self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
+        self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
     def _test_batch_sampler(self, **kwargs):
         # [(0, 1), (2, 3, 4), (5, 6), (7, 8, 9), ...]
@@ -2338,8 +2335,7 @@ except RuntimeError as e:
     def test_batch_sampler(self):
         self._test_batch_sampler()
         self._test_batch_sampler(num_workers=4)
-        if not NO_MULTIPROCESSING_SPAWN:
-            self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
+        self._test_batch_sampler(num_workers=4, multiprocessing_context="spawn")
     @unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
     def test_shuffle_pin_memory(self):
@@ -2496,7 +2492,7 @@ except RuntimeError as e:
         # not be called before process end. It is important to see that the
         # processes still exit in both cases.
-        if pin_memory and (not TEST_CUDA or NO_MULTIPROCESSING_SPAWN or IS_WINDOWS):
+        if pin_memory and (not TEST_CUDA or IS_WINDOWS):
             # This test runs in a subprocess, which can only initialize CUDA with spawn.
             # DataLoader with pin_memory=True initializes CUDA when its iterator is constructed.
             # For windows, pin_memory sometimes causes CUDA oom.
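
The dedented comment block above describes the pattern this test file relies on; a small self-contained sketch of it follows (the `_worker` function is hypothetical, purely for illustration):

import torch.multiprocessing as mp

# Rebinding the module-level name to a spawn context, rather than calling
# mp.set_start_method("spawn") (which raises RuntimeError if a test or
# third-party import already set a start method), makes every Process and
# Queue created through `mp` use spawn, so start methods are never mixed.
mp = mp.get_context(method="spawn")


def _worker(q):
    q.put("hello from a spawned worker")


if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=_worker, args=(q,))
    p.start()
    print(q.get())  # prints: hello from a spawned worker
    p.join()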

View File

@@ -19,7 +19,6 @@ from torch.testing._internal.common_utils import (
     IS_MACOS,
     IS_WINDOWS,
     load_tests,
-    NO_MULTIPROCESSING_SPAWN,
     run_tests,
     slowTest,
     TEST_WITH_ASAN,
@@ -471,30 +470,17 @@ class TestMultiprocessing(TestCase):
         with ctx.Pool(3) as pool:
             pool.map(simple_autograd_function, [1, 2, 3])
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN, "Test needs to use spawn multiprocessing"
-    )
     def test_autograd_fine_with_spawn(self):
         ctx = mp.get_context("spawn")
         simple_autograd_function()
         with ctx.Pool(3) as pool:
             pool.map(simple_autograd_function, [1, 2, 3])
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_simple(self):
         torch.cuda.FloatTensor([1])  # initialize CUDA outside of leak checker
         self._test_sharing(mp.get_context("spawn"), "cuda", torch.float)
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_memory_allocation(self):
         ctx = mp.get_context("spawn")
@@ -512,11 +498,6 @@ class TestMultiprocessing(TestCase):
         e.set()
         p.join(1)
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_ipc_deadlock(self):
         ctx = mp.get_context("spawn")
@@ -536,11 +517,6 @@ class TestMultiprocessing(TestCase):
         self.assertFalse(p.is_alive())
     @slowTest
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_send_many(self, name=None, size=5, count=100000):
         ctx = mp.get_context("spawn")
@@ -572,11 +548,6 @@ class TestMultiprocessing(TestCase):
         p2.join(1)
         p3.join(1)
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     @unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
     def test_cuda_small_tensors(self):
@@ -659,11 +630,6 @@ if __name__ == "__main__":
         )
         self.assertRegex(stderr, "Cannot re-initialize CUDA in forked subprocess.")
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_event(self):
         ctx = mp.get_context("spawn")
@@ -695,11 +661,6 @@ if __name__ == "__main__":
         event.synchronize()
         c2p.put(1)  # notify parent synchronization is done
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_event_multiprocess(self):
         event = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -724,11 +685,6 @@ if __name__ == "__main__":
         self.assertTrue(event.query())
         p.join()
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     @unittest.skipIf(not TEST_MULTIGPU, "found only 1 GPU")
     def test_event_handle_multi_gpu(self):
@@ -760,11 +716,6 @@ if __name__ == "__main__":
         c2p.put(1)  # notify synchronization is done in child
         p2c.get()  # wait for parent to finish before destructing child event
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_event_handle_importer(self):
         e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -802,11 +753,6 @@ if __name__ == "__main__":
         # destructing e1
         p2c.get()
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_event_handle_exporter(self):
         e0 = torch.cuda.Event(enable_timing=False, interprocess=True)
@@ -933,7 +879,7 @@ if __name__ == "__main__":
     @unittest.skipIf(TEST_WITH_ASAN, "non-deterministically hangs with ASAN")
     def test_leaf_variable_sharing(self):
         devices = ["cpu"]
-        if torch.cuda.is_available() and not NO_MULTIPROCESSING_SPAWN and TEST_CUDA_IPC:
+        if torch.cuda.is_available() and TEST_CUDA_IPC:
             devices.append("cuda")
         for device in devices:
             for requires_grad in [True, False]:
@@ -968,11 +914,6 @@ if __name__ == "__main__":
             RuntimeError, r"requires_grad", lambda: queue.put(var)
         )
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_variable_sharing(self):
         for requires_grad in [True, False]:
@@ -983,11 +924,6 @@ if __name__ == "__main__":
         )
         self._test_autograd_sharing(var, mp.get_context("spawn"))
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_mixed_types_cuda_sharing(self):
         self._test_mixed_types_cuda_sharing(mp.get_context("spawn"))
@@ -996,29 +932,14 @@ if __name__ == "__main__":
         param = Parameter(torch.arange(1.0, 26).view(5, 5))
         self._test_autograd_sharing(param, is_parameter=True)
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_cuda_parameter_sharing(self):
         param = Parameter(torch.arange(1.0, 26, device="cuda").view(5, 5))
         self._test_autograd_sharing(param, mp.get_context("spawn"), is_parameter=True)
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     def test_integer_parameter_serialization_cpu(self):
         self._test_integer_parameter_serialization(device="cpu")
-    @unittest.skipIf(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @unittest.skipIf(not TEST_CUDA_IPC, "CUDA IPC not available")
     def test_integer_parameter_serialization_cuda(self):
         self._test_integer_parameter_serialization(device="cuda")

View File

@@ -12,7 +12,6 @@ import torch.multiprocessing as mp
 from torch.testing._internal.common_utils import (
     IS_WINDOWS,
-    NO_MULTIPROCESSING_SPAWN,
     run_tests,
     TestCase,
     parametrize,
@@ -212,9 +211,6 @@ class _TestMultiProcessing:
             self.assertLess(time.time() - start, nested_child_sleep / 2)
             time.sleep(0.1)
-@unittest.skipIf(
-    NO_MULTIPROCESSING_SPAWN,
-    "Disabled for environments that don't support the spawn start method")
 class SpawnTest(TestCase, _TestMultiProcessing):
     start_method = 'spawn'

View File

@@ -36,7 +36,7 @@ from torch.testing._internal.common_optimizers import (
 from torch.testing._internal.common_utils import (  # type: ignore[attr-defined]
     MI300_ARCH, TEST_WITH_TORCHINDUCTOR, TEST_WITH_ROCM, run_tests, IS_JETSON,
-    IS_FILESYSTEM_UTF8_ENCODING, NO_MULTIPROCESSING_SPAWN,
+    IS_FILESYSTEM_UTF8_ENCODING,
     IS_SANDCASTLE, IS_FBCODE, IS_REMOTE_GPU, skipIfRocmArch, skipIfTorchInductor, load_tests, slowTest, slowTestIf,
     skipIfCrossRef, TEST_WITH_CROSSREF, skipIfTorchDynamo, skipRocmIfTorchInductor, set_default_dtype,
     skipCUDAMemoryLeakCheckIf, BytesIOContext,
@@ -9603,8 +9603,6 @@ tensor([[[1.+1.j, 1.+1.j, 1.+1.j, ..., 1.+1.j, 1.+1.j, 1.+1.j],
     # FIXME: port to a distributed test suite
     @slowTest
-    @unittest.skipIf(NO_MULTIPROCESSING_SPAWN, "Disabled for environments that \
-don't support multiprocessing with spawn start method")
     def test_multinomial_invalid_probs(self):
         def _spawn_method(self, method, arg):
             try:

View File

@@ -1437,11 +1437,7 @@ NOTEST_CPU = "cpu" in split_if_not_empty(os.getenv('PYTORCH_TESTING_DEVICE_EXCEP
 skipIfNoDill = unittest.skipIf(not TEST_DILL, "no dill")
-# Python 2.7 doesn't have spawn
-NO_MULTIPROCESSING_SPAWN: bool = TestEnvironment.def_flag(
-    "NO_MULTIPROCESSING_SPAWN",
-    env_var="NO_MULTIPROCESSING_SPAWN",
-)
+NO_MULTIPROCESSING_SPAWN: bool = False
 TEST_WITH_ASAN: bool = TestEnvironment.def_flag(
     "TEST_WITH_ASAN",
     env_var="PYTORCH_TEST_WITH_ASAN",

View File

@@ -82,7 +82,6 @@ from torch.testing._internal.common_utils import (
     IS_WINDOWS,
     FILE_SCHEMA,
     IS_FBCODE,
-    NO_MULTIPROCESSING_SPAWN,
     IS_SANDCASTLE,
     skip_but_pass_in_sandcastle,
     skip_but_pass_in_sandcastle_if,
@@ -5119,11 +5118,6 @@ class DistributedTest:
         BACKEND not in DistTestCases.backend_feature["cuda"],
         f"The {BACKEND} backend does not support DDP communication hook on CUDA devices",
     )
-    @skip_but_pass_in_sandcastle_if(
-        NO_MULTIPROCESSING_SPAWN,
-        "Disabled for environments that \
-don't support multiprocessing with spawn start method",
-    )
     @skip_if_lt_x_gpu(int(os.environ["WORLD_SIZE"]))
     def test_ddp_hook_parity_post_localSGD(self):
         # Although we start run local SGD at iteration 10, since we still use the global process group to run it,