mirror of
https://github.com/zebrajr/pytorch.git
synced 2025-12-07 00:21:07 +01:00
This PR only adds abstract class registration logic without touching existing tests so they still trace with real script object. The added tests are only for registration APIs and test error messages. Our design is that the abstract implementation should be in Python. This is much better in terms of usability. But this also has implications for custom op that takes script object as input, which is detailed later in this stack. Pull Request resolved: https://github.com/pytorch/pytorch/pull/122622 Approved by: https://github.com/zou3519 ghstack dependencies: #122619, #122620, #122621
628 lines
26 KiB
Python
628 lines
26 KiB
Python
# Owner(s): ["oncall: export"]
|
|
|
|
import unittest
|
|
|
|
import torch
|
|
import torch.utils._pytree as pytree
|
|
from torch._higher_order_ops.torchbind import enable_torchbind_tracing
|
|
from torch._library.fake_class_registry import FakeScriptObject
|
|
from torch.export import export
|
|
from torch.export._trace import _export
|
|
from torch.fx.experimental.proxy_tensor import make_fx
|
|
from torch.testing._internal.common_utils import (
|
|
find_library_location,
|
|
instantiate_parametrized_tests,
|
|
IS_FBCODE,
|
|
IS_MACOS,
|
|
IS_SANDCASTLE,
|
|
IS_WINDOWS,
|
|
parametrize,
|
|
run_tests,
|
|
skipIfTorchDynamo,
|
|
TestCase,
|
|
)
|
|
from torch.testing._internal.torchbind_impls import register_fake_operators
|
|
|
|
|
|
def load_torchbind_test_lib():
|
|
if IS_SANDCASTLE or IS_FBCODE:
|
|
torch.ops.load_library("//caffe2/test/cpp/jit:test_custom_class_registrations")
|
|
elif IS_MACOS:
|
|
raise unittest.SkipTest("non-portable load_library call used in test")
|
|
else:
|
|
lib_file_path = find_library_location("libtorchbind_test.so")
|
|
if IS_WINDOWS:
|
|
lib_file_path = find_library_location("torchbind_test.dll")
|
|
torch.ops.load_library(str(lib_file_path))
|
|
|
|
register_fake_operators()
|
|
|
|
|
|
@skipIfTorchDynamo("torchbind not supported with dynamo yet")
|
|
class TestExportTorchbind(TestCase):
|
|
def setUp(self):
|
|
load_torchbind_test_lib()
|
|
|
|
@torch._library.register_fake_class("_TorchScriptTesting::_Foo")
|
|
class FakeFoo:
|
|
def __init__(self, x: int, y: int):
|
|
self.x = x
|
|
self.y = y
|
|
|
|
@classmethod
|
|
def from_real(cls, foo):
|
|
(x, y), _ = foo.__getstate__()
|
|
return cls(x, y)
|
|
|
|
def add_tensor(self, z):
|
|
return (self.x + self.y) * z
|
|
|
|
test = self
|
|
test.tq_push_counter = 0
|
|
test.tq_pop_counter = 0
|
|
test.tq_size_counter = 0
|
|
|
|
@torch._library.register_fake_class("_TorchScriptTesting::_TensorQueue")
|
|
class FakeTensorQueue:
|
|
def __init__(self, q):
|
|
self.queue = q
|
|
|
|
@classmethod
|
|
def from_real(cls, real_tq):
|
|
ctx = torch.library.get_ctx()
|
|
fake_queue = [ctx.to_fake_tensor(t) for t in real_tq.get_raw_queue()]
|
|
return cls(fake_queue)
|
|
|
|
def push(self, x):
|
|
test.tq_push_counter += 1
|
|
self.queue.append(x)
|
|
|
|
def pop(self):
|
|
test.tq_pop_counter += 1
|
|
return self.queue.pop(0)
|
|
|
|
def size(self):
|
|
test.tq_size_counter += 1
|
|
return len(self.queue)
|
|
|
|
def tearDown(self):
|
|
torch._library.fake_class_registry.deregister_fake_class(
|
|
"_TorchScriptTesting::_Foo"
|
|
)
|
|
torch._library.fake_class_registry.deregister_fake_class(
|
|
"_TorchScriptTesting::_TensorQueue"
|
|
)
|
|
|
|
def _assertEqualSkipScriptObject(self, exp, actual):
|
|
flat_exp = pytree.tree_leaves(exp)
|
|
flat_actual = pytree.tree_leaves(actual)
|
|
self.assertEqual(len(flat_exp), len(flat_actual))
|
|
for a, b in zip(flat_exp, flat_actual):
|
|
if isinstance(a, torch.ScriptObject) and isinstance(b, torch.ScriptObject):
|
|
continue
|
|
self.assertEqual(a, b)
|
|
|
|
def _test_export_same_as_eager(
|
|
self, f, args, kwargs=None, strict=True, pre_dispatch=False
|
|
):
|
|
kwargs = kwargs or {}
|
|
|
|
def export_wrapper(f, args, kwargs, strcit, pre_dispatch):
|
|
with enable_torchbind_tracing():
|
|
if pre_dispatch:
|
|
exported_program = _export(
|
|
f, args, kwargs, strict=strict, pre_dispatch=True
|
|
)
|
|
else:
|
|
exported_program = export(f, args, kwargs, strict=strict)
|
|
return exported_program
|
|
|
|
exported_program = export_wrapper(f, args, kwargs, strict, pre_dispatch)
|
|
reversed_kwargs = {key: kwargs[key] for key in reversed(kwargs)}
|
|
unlifted = exported_program.module()
|
|
exp = f(*args, **kwargs)
|
|
self.assertEqual(unlifted(*args, **kwargs), exp)
|
|
self.assertEqual(
|
|
unlifted(*args, **reversed_kwargs),
|
|
exp,
|
|
)
|
|
|
|
# check re-tracing
|
|
retraced_ep = export_wrapper(unlifted, args, kwargs, strict, pre_dispatch)
|
|
self.assertEqual(retraced_ep.module()(*args, **kwargs), exp)
|
|
return exported_program
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_none(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x, n):
|
|
return x + self.attr.add_tensor(x)
|
|
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(),
|
|
(torch.ones(2, 3), None),
|
|
strict=False,
|
|
pre_dispatch=pre_dispatch,
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0, arg_1):
|
|
arg0_1, arg1_1, = fx_pytree.tree_flatten_spec(([arg_0, arg_1], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(attr_1, 'add_tensor', arg0_1); attr_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, attr, arg0_1, arg1_1):
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(attr, 'add_tensor', arg0_1); attr = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return (add,)""",
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_attribute(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x):
|
|
return x + self.attr.add_tensor(x)
|
|
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (torch.ones(2, 3),), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0):
|
|
arg0_1, = fx_pytree.tree_flatten_spec(([arg_0], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(attr_1, 'add_tensor', arg0_1); attr_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, attr, arg0_1):
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(attr, 'add_tensor', arg0_1); attr = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return (add,)""",
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_attribute_as_custom_op_argument(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x):
|
|
return x + torch.ops._TorchScriptTesting.takes_foo(self.attr, x)
|
|
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (torch.ones(2, 3),), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0):
|
|
arg1_1, = fx_pytree.tree_flatten_spec(([arg_0], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
takes_foo_default = torch.ops._TorchScriptTesting.takes_foo.default(attr_1, arg1_1); attr_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, takes_foo_default); arg1_1 = takes_foo_default = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, attr, arg1_1):
|
|
with_effects = torch._higher_order_ops.effects.with_effects(arg0_1, torch.ops._TorchScriptTesting.takes_foo.default, attr, arg1_1); arg0_1 = attr = None
|
|
getitem = with_effects[0]
|
|
getitem_1 = with_effects[1]; with_effects = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, getitem_1); arg1_1 = getitem_1 = None
|
|
return (getitem, add)""", # noqa: B950
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_input(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def forward(self, x, cc):
|
|
return x + cc.add_tensor(x)
|
|
|
|
cc = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (torch.ones(2, 3), cc), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0, arg_1):
|
|
arg0_1, arg1_1, = fx_pytree.tree_flatten_spec(([arg_0, arg_1], {}), self._in_spec)
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(arg1_1, 'add_tensor', arg0_1); arg1_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, arg1_1):
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(arg1_1, 'add_tensor', arg0_1); arg1_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg0_1, call_torchbind); arg0_1 = call_torchbind = None
|
|
return (add,)""",
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_input_as_custom_op_argument(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
def forward(self, x, cc):
|
|
return x + torch.ops._TorchScriptTesting.takes_foo(cc, x)
|
|
|
|
cc = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (torch.ones(2, 3), cc), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0, arg_1):
|
|
arg1_1, arg2_1, = fx_pytree.tree_flatten_spec(([arg_0, arg_1], {}), self._in_spec)
|
|
takes_foo_default = torch.ops._TorchScriptTesting.takes_foo.default(arg2_1, arg1_1); arg2_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, takes_foo_default); arg1_1 = takes_foo_default = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, arg1_1, arg2_1):
|
|
with_effects = torch._higher_order_ops.effects.with_effects(arg0_1, torch.ops._TorchScriptTesting.takes_foo.default, arg2_1, arg1_1); arg0_1 = arg2_1 = None
|
|
getitem = with_effects[0]
|
|
getitem_1 = with_effects[1]; with_effects = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, getitem_1); arg1_1 = getitem_1 = None
|
|
return (getitem, add)""", # noqa: B950
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_unlift_custom_obj(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x):
|
|
a = torch.ops._TorchScriptTesting.takes_foo(self.attr, x)
|
|
b = torch.ops._TorchScriptTesting.takes_foo(self.attr, a)
|
|
return x + b
|
|
|
|
input = torch.ones(2, 3)
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (input,), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0):
|
|
arg1_1, = fx_pytree.tree_flatten_spec(([arg_0], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
takes_foo_default_1 = torch.ops._TorchScriptTesting.takes_foo.default(attr_1, arg1_1)
|
|
takes_foo_default = torch.ops._TorchScriptTesting.takes_foo.default(attr_1, takes_foo_default_1); attr_1 = takes_foo_default_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, takes_foo_default); arg1_1 = takes_foo_default = None
|
|
return pytree.tree_unflatten((add,), self._out_spec)""", # noqa: B950
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, attr, arg1_1):
|
|
with_effects = torch._higher_order_ops.effects.with_effects(arg0_1, torch.ops._TorchScriptTesting.takes_foo.default, attr, arg1_1); arg0_1 = None
|
|
getitem = with_effects[0]
|
|
getitem_1 = with_effects[1]; with_effects = None
|
|
with_effects_1 = torch._higher_order_ops.effects.with_effects(getitem, torch.ops._TorchScriptTesting.takes_foo.default, attr, getitem_1); getitem = attr = getitem_1 = None
|
|
getitem_2 = with_effects_1[0]
|
|
getitem_3 = with_effects_1[1]; with_effects_1 = None
|
|
add = torch.ops.aten.add.Tensor(arg1_1, getitem_3); arg1_1 = getitem_3 = None
|
|
return (getitem_2, add)""", # noqa: B950
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_custom_obj_list_out(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x):
|
|
a = torch.ops._TorchScriptTesting.takes_foo_list_return(self.attr, x)
|
|
y = a[0] + a[1] + a[2]
|
|
b = torch.ops._TorchScriptTesting.takes_foo(self.attr, y)
|
|
return x + b
|
|
|
|
input = torch.ones(2, 3)
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (input,), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0):
|
|
arg1_1, = fx_pytree.tree_flatten_spec(([arg_0], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
takes_foo_list_return_default = torch.ops._TorchScriptTesting.takes_foo_list_return.default(attr_1, arg1_1)
|
|
getitem_2 = takes_foo_list_return_default[0]
|
|
getitem_3 = takes_foo_list_return_default[1]
|
|
getitem_4 = takes_foo_list_return_default[2]; takes_foo_list_return_default = None
|
|
add = torch.ops.aten.add.Tensor(getitem_2, getitem_3); getitem_2 = getitem_3 = None
|
|
add_1 = torch.ops.aten.add.Tensor(add, getitem_4); add = getitem_4 = None
|
|
takes_foo_default = torch.ops._TorchScriptTesting.takes_foo.default(attr_1, add_1); attr_1 = add_1 = None
|
|
add_2 = torch.ops.aten.add.Tensor(arg1_1, takes_foo_default); arg1_1 = takes_foo_default = None
|
|
return pytree.tree_unflatten((add_2,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, attr, arg1_1):
|
|
with_effects = torch._higher_order_ops.effects.with_effects(arg0_1, torch.ops._TorchScriptTesting.takes_foo_list_return.default, attr, arg1_1); arg0_1 = None
|
|
getitem = with_effects[0]
|
|
getitem_1 = with_effects[1]; with_effects = None
|
|
getitem_2 = getitem_1[0]
|
|
getitem_3 = getitem_1[1]
|
|
getitem_4 = getitem_1[2]; getitem_1 = None
|
|
add = torch.ops.aten.add.Tensor(getitem_2, getitem_3); getitem_2 = getitem_3 = None
|
|
add_1 = torch.ops.aten.add.Tensor(add, getitem_4); add = getitem_4 = None
|
|
with_effects_1 = torch._higher_order_ops.effects.with_effects(getitem, torch.ops._TorchScriptTesting.takes_foo.default, attr, add_1); getitem = attr = add_1 = None
|
|
getitem_5 = with_effects_1[0]
|
|
getitem_6 = with_effects_1[1]; with_effects_1 = None
|
|
add_2 = torch.ops.aten.add.Tensor(arg1_1, getitem_6); arg1_1 = getitem_6 = None
|
|
return (getitem_5, add_2)""", # noqa: B950
|
|
)
|
|
|
|
@parametrize("pre_dispatch", [True, False])
|
|
def test_custom_obj_tuple_out(self, pre_dispatch):
|
|
class MyModule(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.attr = torch.classes._TorchScriptTesting._Foo(10, 20)
|
|
|
|
def forward(self, x):
|
|
a = torch.ops._TorchScriptTesting.takes_foo_tuple_return(self.attr, x)
|
|
y = a[0] + a[1]
|
|
b = torch.ops._TorchScriptTesting.takes_foo(self.attr, y)
|
|
return x + b
|
|
|
|
input = torch.ones(2, 3)
|
|
ep = self._test_export_same_as_eager(
|
|
MyModule(), (input,), strict=False, pre_dispatch=pre_dispatch
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.module().code.strip(),
|
|
"""\
|
|
def forward(self, arg_0):
|
|
arg1_1, = fx_pytree.tree_flatten_spec(([arg_0], {}), self._in_spec)
|
|
attr_1 = self.attr
|
|
takes_foo_tuple_return_default = torch.ops._TorchScriptTesting.takes_foo_tuple_return.default(attr_1, arg1_1)
|
|
getitem_1 = takes_foo_tuple_return_default[0]
|
|
getitem_2 = takes_foo_tuple_return_default[1]; takes_foo_tuple_return_default = None
|
|
add = torch.ops.aten.add.Tensor(getitem_1, getitem_2); getitem_1 = getitem_2 = None
|
|
takes_foo_default = torch.ops._TorchScriptTesting.takes_foo.default(attr_1, add); attr_1 = add = None
|
|
add_1 = torch.ops.aten.add.Tensor(arg1_1, takes_foo_default); arg1_1 = takes_foo_default = None
|
|
return pytree.tree_unflatten((add_1,), self._out_spec)""",
|
|
)
|
|
self.assertExpectedInline(
|
|
ep.graph_module.code.strip(),
|
|
"""\
|
|
def forward(self, arg0_1, attr, arg1_1):
|
|
with_effects = torch._higher_order_ops.effects.with_effects(arg0_1, torch.ops._TorchScriptTesting.takes_foo_tuple_return.default, attr, arg1_1); arg0_1 = None
|
|
getitem = with_effects[0]
|
|
getitem_1 = with_effects[1]
|
|
getitem_2 = with_effects[2]; with_effects = None
|
|
add = torch.ops.aten.add.Tensor(getitem_1, getitem_2); getitem_1 = getitem_2 = None
|
|
with_effects_1 = torch._higher_order_ops.effects.with_effects(getitem, torch.ops._TorchScriptTesting.takes_foo.default, attr, add); getitem = attr = add = None
|
|
getitem_3 = with_effects_1[0]
|
|
getitem_4 = with_effects_1[1]; with_effects_1 = None
|
|
add_1 = torch.ops.aten.add.Tensor(arg1_1, getitem_4); arg1_1 = getitem_4 = None
|
|
return (getitem_3, add_1)""", # noqa: B950
|
|
)
|
|
|
|
@parametrize("make_fx_tracing_mode", ["fake", "symbolic"])
|
|
def test_make_fx_tensor_queue_methods(self, make_fx_tracing_mode):
|
|
test = self
|
|
|
|
class Model(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.linear = torch.nn.Linear(3, 2)
|
|
self.check_tq_is_fake = True
|
|
|
|
def forward(self, tq, x):
|
|
if self.check_tq_is_fake:
|
|
test.assertTrue(isinstance(tq, FakeScriptObject))
|
|
tq.push(x.cos())
|
|
tq.push(x.sin())
|
|
x_cos = tq.pop() + tq.size()
|
|
x_sin = tq.pop() - tq.size()
|
|
return x_sin, x_cos, tq
|
|
|
|
mod = Model()
|
|
tq = torch.classes._TorchScriptTesting._TensorQueue(
|
|
torch.empty(
|
|
0,
|
|
).fill_(-1)
|
|
)
|
|
tq1 = torch.classes._TorchScriptTesting._TensorQueue(
|
|
torch.empty(
|
|
0,
|
|
).fill_(-1)
|
|
)
|
|
x = torch.ones(2, 3)
|
|
gm = make_fx(mod, tracing_mode=make_fx_tracing_mode)(tq, x)
|
|
self.assertEqual(self.tq_push_counter, 2)
|
|
self.assertEqual(self.tq_pop_counter, 2)
|
|
self.assertEqual(self.tq_size_counter, 2)
|
|
self.assertEqual(tq.size(), 0)
|
|
self.assertExpectedInline(
|
|
gm.code.strip("\n"),
|
|
"""\
|
|
def forward(self, arg0_1, arg1_1):
|
|
cos = torch.ops.aten.cos.default(arg1_1)
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(arg0_1, 'push', cos); cos = None
|
|
sin = torch.ops.aten.sin.default(arg1_1); arg1_1 = None
|
|
call_torchbind_1 = torch.ops.higher_order.call_torchbind(arg0_1, 'push', sin); sin = None
|
|
call_torchbind_2 = torch.ops.higher_order.call_torchbind(arg0_1, 'pop')
|
|
call_torchbind_3 = torch.ops.higher_order.call_torchbind(arg0_1, 'size')
|
|
add = torch.ops.aten.add.Tensor(call_torchbind_2, 1); call_torchbind_2 = None
|
|
call_torchbind_4 = torch.ops.higher_order.call_torchbind(arg0_1, 'pop')
|
|
call_torchbind_5 = torch.ops.higher_order.call_torchbind(arg0_1, 'size')
|
|
sub = torch.ops.aten.sub.Tensor(call_torchbind_4, 0); call_torchbind_4 = None
|
|
return (sub, add, arg0_1)
|
|
""",
|
|
)
|
|
mod.check_tq_is_fake = False
|
|
self._assertEqualSkipScriptObject(gm(tq, x), mod(tq1, x))
|
|
|
|
@parametrize("make_fx_tracing_mode", ["fake", "symbolic"])
|
|
def test_make_fx_tensor_queue_methods_fakify_internal_states(
|
|
self, make_fx_tracing_mode
|
|
):
|
|
test = self
|
|
|
|
class Model(torch.nn.Module):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.linear = torch.nn.Linear(3, 2)
|
|
self.check_tq_is_fake = True
|
|
self.current_test = test
|
|
|
|
def forward(self, tq, x):
|
|
if self.check_tq_is_fake:
|
|
self.current_test.assertTrue(isinstance(tq, FakeScriptObject))
|
|
x_cos = tq.pop() + tq.size() + x
|
|
x_sin = tq.pop() - tq.size() + x
|
|
return x_sin, x_cos, tq
|
|
|
|
mod = Model()
|
|
tq = torch.classes._TorchScriptTesting._TensorQueue(
|
|
torch.empty(
|
|
0,
|
|
).fill_(-1)
|
|
)
|
|
tq1 = torch.classes._TorchScriptTesting._TensorQueue(
|
|
torch.empty(
|
|
0,
|
|
).fill_(-1)
|
|
)
|
|
for _ in range(2):
|
|
tq.push(torch.ones(2, 3))
|
|
tq1.push(torch.ones(2, 3))
|
|
x = torch.ones(2, 3)
|
|
prev_size = tq.size()
|
|
gm = make_fx(mod, tracing_mode=make_fx_tracing_mode)(tq, x)
|
|
self.assertEqual(self.tq_push_counter, 0)
|
|
self.assertEqual(self.tq_pop_counter, 2)
|
|
self.assertEqual(self.tq_size_counter, 2)
|
|
self.assertEqual(tq.size(), prev_size)
|
|
self.assertExpectedInline(
|
|
gm.code.strip("\n"),
|
|
"""\
|
|
def forward(self, arg0_1, arg1_1):
|
|
call_torchbind = torch.ops.higher_order.call_torchbind(arg0_1, 'pop')
|
|
call_torchbind_1 = torch.ops.higher_order.call_torchbind(arg0_1, 'size')
|
|
add = torch.ops.aten.add.Tensor(call_torchbind, 1); call_torchbind = None
|
|
add_1 = torch.ops.aten.add.Tensor(add, arg1_1); add = None
|
|
call_torchbind_2 = torch.ops.higher_order.call_torchbind(arg0_1, 'pop')
|
|
call_torchbind_3 = torch.ops.higher_order.call_torchbind(arg0_1, 'size')
|
|
sub = torch.ops.aten.sub.Tensor(call_torchbind_2, 0); call_torchbind_2 = None
|
|
add_2 = torch.ops.aten.add.Tensor(sub, arg1_1); sub = arg1_1 = None
|
|
return (add_2, add_1, arg0_1)
|
|
""",
|
|
)
|
|
# turn off tq type checking in eager execution
|
|
mod.check_tq_is_fake = False
|
|
self._assertEqualSkipScriptObject(gm(tq, x), mod(tq1, x))
|
|
self.assertEqual(tq.size(), 0)
|
|
self.assertEqual(tq1.size(), 0)
|
|
|
|
|
|
@skipIfTorchDynamo("torchbind not supported with dynamo yet")
|
|
class TestRegisterFakeClass(TestCase):
|
|
def setUp(self):
|
|
load_torchbind_test_lib()
|
|
|
|
def tearDown(self):
|
|
torch._library.fake_class_registry.global_fake_class_registry.clear()
|
|
|
|
def test_register_fake_class_no_torch_bind_class(self):
|
|
with self.assertRaisesRegex(RuntimeError, "Tried to instantiate class"):
|
|
|
|
@torch._library.register_fake_class("_TorchScriptTesting::NOT_A_VALID_NAME")
|
|
class Invalid:
|
|
pass
|
|
|
|
def test_register_fake_class_no_from_real(self):
|
|
with self.assertRaisesRegex(RuntimeError, "define a classmethod from_real"):
|
|
|
|
@torch._library.register_fake_class("_TorchScriptTesting::_Foo")
|
|
class InvalidFakeFoo:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def test_register_fake_class_from_real_not_classmethod(self):
|
|
with self.assertRaisesRegex(RuntimeError, "is not a classmethod"):
|
|
|
|
@torch._library.register_fake_class("_TorchScriptTesting::_Foo")
|
|
class FakeFoo:
|
|
def __init__(self, x, y):
|
|
self.x = x
|
|
self.y = y
|
|
|
|
def from_real(self, foo_obj):
|
|
x, y = foo_obj.__getstate__()
|
|
return FakeFoo(x, y)
|
|
|
|
def test_register_fake_class_valid(self):
|
|
class FakeFoo:
|
|
def __init__(self, x, y):
|
|
self.x = x
|
|
self.y = y
|
|
|
|
@classmethod
|
|
def from_real(cls, foo_obj):
|
|
x, y = foo_obj.__getstate__()
|
|
return cls(x, y)
|
|
|
|
torch._library.register_fake_class("_TorchScriptTesting::_Foo", FakeFoo)
|
|
|
|
def test_register_fake_class_duplicate_registration(self):
|
|
@torch._library.register_fake_class("_TorchScriptTesting::_Foo")
|
|
class FakeFoo:
|
|
def __init__(self, x, y):
|
|
self.x = x
|
|
self.y = y
|
|
|
|
@classmethod
|
|
def from_real(cls, foo_obj):
|
|
x, y = foo_obj.__getstate__()
|
|
return cls(x, y)
|
|
|
|
with self.assertWarnsRegex(UserWarning, "already registered"):
|
|
torch._library.register_fake_class("_TorchScriptTesting::_Foo", FakeFoo)
|
|
|
|
|
|
instantiate_parametrized_tests(TestExportTorchbind)
|
|
|
|
if __name__ == "__main__":
|
|
run_tests()
|