diff --git a/.lintrunner.toml b/.lintrunner.toml
index c3f7725f18c..dacb259c5f1 100644
--- a/.lintrunner.toml
+++ b/.lintrunner.toml
@@ -2563,6 +2563,7 @@ exclude_patterns = [
     'torch/utils/viz/__init__.py',
     'torch/utils/viz/_cycles.py',
     'torch/utils/weak.py',
+    'torch/xpu/_gpu_trace.py',
 ]
 init_command = [
     'python3',
diff --git a/aten/src/ATen/xpu/XPUEvent.h b/aten/src/ATen/xpu/XPUEvent.h
index f82c676daf8..2417ee5f6b7 100644
--- a/aten/src/ATen/xpu/XPUEvent.h
+++ b/aten/src/ATen/xpu/XPUEvent.h
@@ -22,7 +22,15 @@ struct TORCH_XPU_API XPUEvent {
   XPUEvent(bool enable_timing = false) noexcept
       : enable_timing_{enable_timing} {}
 
-  ~XPUEvent() = default;
+  ~XPUEvent() {
+    if (isCreated()) {
+      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+      if (C10_UNLIKELY(interp)) {
+        (*interp)->trace_gpu_event_deletion(
+            at::kXPU, reinterpret_cast<uintptr_t>(event_.get()));
+      }
+    }
+  }
 
   XPUEvent(const XPUEvent&) = delete;
   XPUEvent& operator=(const XPUEvent&) = delete;
@@ -77,6 +85,13 @@ struct TORCH_XPU_API XPUEvent {
   void record(const XPUStream& stream) {
     if (!isCreated()) {
       device_index_ = stream.device_index();
+      event_ = std::make_unique<sycl::event>(
+          stream.queue().ext_oneapi_submit_barrier());
+      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+      if (C10_UNLIKELY(interp)) {
+        (*interp)->trace_gpu_event_creation(
+            at::kXPU, reinterpret_cast<uintptr_t>(event_.get()));
+      }
     } else {
       TORCH_CHECK(
           device_index_ == stream.device_index(),
@@ -86,9 +101,16 @@ struct TORCH_XPU_API XPUEvent {
           stream.device_index(),
           ".");
       event_.reset();
+      event_ = std::make_unique<sycl::event>(
+          stream.queue().ext_oneapi_submit_barrier());
+    }
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_event_record(
+          at::kXPU,
+          reinterpret_cast<uintptr_t>(event_.get()),
+          reinterpret_cast<uintptr_t>(&stream.queue()));
     }
-    event_ = std::make_unique<sycl::event>(
-        stream.queue().ext_oneapi_submit_barrier());
   }
 
   void block(const XPUStream& stream) {
@@ -96,6 +118,13 @@ struct TORCH_XPU_API XPUEvent {
       std::vector<sycl::event> event_list{event()};
       // Make this stream wait until event_ is completed.
       stream.queue().ext_oneapi_submit_barrier(event_list);
+      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+      if (C10_UNLIKELY(interp)) {
+        (*interp)->trace_gpu_event_wait(
+            at::kXPU,
+            reinterpret_cast<uintptr_t>(event_.get()),
+            reinterpret_cast<uintptr_t>(&stream.queue()));
+      }
     }
   }
 
@@ -117,6 +146,11 @@ struct TORCH_XPU_API XPUEvent {
 
   void synchronize() const {
     if (isCreated()) {
+      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+      if (C10_UNLIKELY(interp)) {
+        (*interp)->trace_gpu_event_synchronization(
+            at::kXPU, reinterpret_cast<uintptr_t>(event_.get()));
+      }
       event().wait_and_throw();
     }
   }
diff --git a/c10/xpu/XPUCachingAllocator.cpp b/c10/xpu/XPUCachingAllocator.cpp
index 3b5c4b58593..da57191fa16 100644
--- a/c10/xpu/XPUCachingAllocator.cpp
+++ b/c10/xpu/XPUCachingAllocator.cpp
@@ -467,6 +467,11 @@ class XPUAllocator : public Allocator {
     Block* block = device_allocators[device]->malloc(device, size, queue);
     add_allocated_block(block);
     *devPtr = block->ptr;
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_memory_allocation(
+          c10::kXPU, reinterpret_cast<uintptr_t>(*devPtr));
+    }
   }
 
   void free(void* ptr) {
@@ -476,6 +481,11 @@ class XPUAllocator : public Allocator {
     Block* block = get_allocated_block(ptr, /* remove */ true);
     TORCH_CHECK(block, "invalid device pointer: ", ptr);
     device_allocators[block->device]->free(block);
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_memory_deallocation(
+          c10::kXPU, reinterpret_cast<uintptr_t>(block->ptr));
+    }
   }
 
   void emptyCache() {
diff --git a/c10/xpu/XPUStream.cpp b/c10/xpu/XPUStream.cpp
index 15702bd14e2..abf380f17b4 100644
--- a/c10/xpu/XPUStream.cpp
+++ b/c10/xpu/XPUStream.cpp
@@ -103,11 +103,17 @@ void initDeviceStreamState(DeviceIndex device) {
       {sycl::property::queue::in_order(), queue::priority_high()}};
   for (const auto p : c10::irange(max_compile_time_stream_priorities)) {
     for (const auto i : c10::irange(kStreamsPerPool)) {
-      streams[device][p][i] = std::make_unique<sycl::queue>(sycl::queue(
+      auto& stream = streams[device][p][i];
+      stream = std::make_unique<sycl::queue>(sycl::queue(
          c10::xpu::get_device_context(),
          c10::xpu::get_raw_device(device),
          c10::xpu::asyncHandler,
          properties[p]));
+      const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+      if (C10_UNLIKELY(interp)) {
+        (*interp)->trace_gpu_stream_creation(
+            c10::kXPU, reinterpret_cast<uintptr_t>(stream.get()));
+      }
     }
     priority_counters[device][p] = 0;
   }
@@ -280,6 +286,10 @@ void syncStreamsOnDevice(DeviceIndex device) {
       streams[device][p][i]->wait();
     }
   }
+  const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+  if (C10_UNLIKELY(interp)) {
+    (*interp)->trace_gpu_device_synchronization(c10::kXPU);
+  }
 }
 
 } // namespace c10::xpu
diff --git a/c10/xpu/XPUStream.h b/c10/xpu/XPUStream.h
index f135219dad5..9aa0838826a 100644
--- a/c10/xpu/XPUStream.h
+++ b/c10/xpu/XPUStream.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <c10/core/Stream.h>
+#include <c10/core/impl/GPUTrace.h>
 #include <c10/xpu/XPUFunctions.h>
 
 namespace c10::xpu {
@@ -96,6 +97,11 @@ class C10_XPU_API XPUStream {
   /// stream.
   void synchronize() const {
     queue().wait_and_throw();
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_stream_synchronization(
+          c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
+    }
   }
 
   /// Return the priority that this stream is associated with. Lower numbers
diff --git a/c10/xpu/impl/XPUGuardImpl.h b/c10/xpu/impl/XPUGuardImpl.h
index 6647e259177..5a71a41c0e0 100644
--- a/c10/xpu/impl/XPUGuardImpl.h
+++ b/c10/xpu/impl/XPUGuardImpl.h
@@ -2,6 +2,7 @@
 
 #include <c10/core/DeviceType.h>
 #include <c10/core/impl/DeviceGuardImplInterface.h>
+#include <c10/core/impl/GPUTrace.h>
 #include <c10/xpu/XPUCachingAllocator.h>
 #include <c10/xpu/XPUFunctions.h>
 #include <c10/xpu/XPUStream.h>
@@ -84,6 +85,13 @@ struct XPUGuardImpl final : public c10::impl::DeviceGuardImplInterface {
     auto* xpu_event = reinterpret_cast<sycl::event*>(*event);
     const XPUStream xpu_stream{stream};
     *xpu_event = xpu_stream.queue().ext_oneapi_submit_barrier();
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_event_record(
+          c10::kXPU,
+          reinterpret_cast<uintptr_t>(xpu_event),
+          reinterpret_cast<uintptr_t>(&xpu_stream.queue()));
+    }
   }
 
   void block(void* event, const Stream& stream) const override {
@@ -93,6 +101,13 @@ struct XPUGuardImpl final : public c10::impl::DeviceGuardImplInterface {
     std::vector<sycl::event> event_list{*xpu_event};
     const XPUStream xpu_stream(stream);
     xpu_stream.queue().ext_oneapi_submit_barrier(event_list);
+    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
+    if (C10_UNLIKELY(interp)) {
+      (*interp)->trace_gpu_event_wait(
+          c10::kXPU,
+          reinterpret_cast<uintptr_t>(xpu_event),
+          reinterpret_cast<uintptr_t>(&xpu_stream.queue()));
+    }
   }
 
   bool queryEvent(void* event) const override {
diff --git a/test/test_xpu.py b/test/test_xpu.py
index 88d27851cdd..7e616b39b9d 100644
--- a/test/test_xpu.py
+++ b/test/test_xpu.py
@@ -4,6 +4,7 @@ import sys
 import unittest
 
 import torch
+import torch.xpu._gpu_trace as gpu_trace
 from torch.testing._internal.common_device_type import (
     instantiate_device_type_tests,
     onlyXPU,
@@ -239,5 +240,67 @@ if __name__ == "__main__":
 instantiate_device_type_tests(TestXpu, globals(), only_for="xpu")
 
 
+class TestXpuTrace(TestCase):
+    def setUp(self):
+        torch._C._activate_gpu_trace()
+        self.mock = unittest.mock.MagicMock()
+
+    def test_event_creation_callback(self):
+        gpu_trace.register_callback_for_event_creation(self.mock)
+
+        event = torch.xpu.Event()
+        event.record()
+        self.mock.assert_called_once_with(event._as_parameter_.value)
+
+    def test_event_deletion_callback(self):
+        gpu_trace.register_callback_for_event_deletion(self.mock)
+
+        event = torch.xpu.Event()
+        event.record()
+        event_id = event._as_parameter_.value
+        del event
+        self.mock.assert_called_once_with(event_id)
+
+    def test_event_record_callback(self):
+        gpu_trace.register_callback_for_event_record(self.mock)
+
+        event = torch.xpu.Event()
+        event.record()
+        self.mock.assert_called_once_with(
+            event._as_parameter_.value, torch.xpu.current_stream().sycl_queue
+        )
+
+    def test_event_wait_callback(self):
+        gpu_trace.register_callback_for_event_wait(self.mock)
+
+        event = torch.xpu.Event()
+        event.record()
+        event.wait()
+        self.mock.assert_called_once_with(
+            event._as_parameter_.value, torch.xpu.current_stream().sycl_queue
+        )
+
+    def test_device_synchronization_callback(self):
+        gpu_trace.register_callback_for_device_synchronization(self.mock)
+
+        torch.xpu.synchronize()
+        self.mock.assert_called()
+
+    def test_stream_synchronization_callback(self):
+        gpu_trace.register_callback_for_stream_synchronization(self.mock)
+
+        stream = torch.xpu.Stream()
+        stream.synchronize()
+        self.mock.assert_called_once_with(stream.sycl_queue)
+
+    def test_event_synchronization_callback(self):
+        gpu_trace.register_callback_for_event_synchronization(self.mock)
+
+        event = torch.xpu.Event()
+        event.record()
+        event.synchronize()
+        self.mock.assert_called_once_with(event._as_parameter_.value)
+
+
 if __name__ == "__main__":
     run_tests()
diff --git a/torch/xpu/_gpu_trace.py b/torch/xpu/_gpu_trace.py
new file mode 100644
index 00000000000..0407abbf249
--- /dev/null
+++ b/torch/xpu/_gpu_trace.py
@@ -0,0 +1,75 @@
+from typing import Callable
+
+from torch._utils import CallbackRegistry
+
+
+EventCreationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU event creation"
+)
+EventDeletionCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU event deletion"
+)
+EventRecordCallbacks: "CallbackRegistry[int, int]" = CallbackRegistry(
+    "XPU event record"
+)
+EventWaitCallbacks: "CallbackRegistry[int, int]" = CallbackRegistry(
+    "XPU event wait"
+)
+MemoryAllocationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU memory allocation"
+)
+MemoryDeallocationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU memory deallocation"
+)
+StreamCreationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU stream creation"
+)
+DeviceSynchronizationCallbacks: "CallbackRegistry[[]]" = CallbackRegistry(
+    "XPU device synchronization"
+)
+StreamSynchronizationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU stream synchronization"
+)
+EventSynchronizationCallbacks: "CallbackRegistry[int]" = CallbackRegistry(
+    "XPU event synchronization"
+)
+
+
+def register_callback_for_event_creation(cb: Callable[[int], None]) -> None:
+    EventCreationCallbacks.add_callback(cb)
+
+
+def register_callback_for_event_deletion(cb: Callable[[int], None]) -> None:
+    EventDeletionCallbacks.add_callback(cb)
+
+
+def register_callback_for_event_record(cb: Callable[[int, int], None]) -> None:
+    EventRecordCallbacks.add_callback(cb)
+
+
+def register_callback_for_event_wait(cb: Callable[[int, int], None]) -> None:
+    EventWaitCallbacks.add_callback(cb)
+
+
+def register_callback_for_memory_allocation(cb: Callable[[int], None]) -> None:
+    MemoryAllocationCallbacks.add_callback(cb)
+
+
+def register_callback_for_memory_deallocation(cb: Callable[[int], None]) -> None:
+    MemoryDeallocationCallbacks.add_callback(cb)
+
+
+def register_callback_for_stream_creation(cb: Callable[[int], None]) -> None:
+    StreamCreationCallbacks.add_callback(cb)
+
+
+def register_callback_for_device_synchronization(cb: Callable[[], None]) -> None:
+    DeviceSynchronizationCallbacks.add_callback(cb)
+
+
+def register_callback_for_stream_synchronization(cb: Callable[[int], None]) -> None:
+    StreamSynchronizationCallbacks.add_callback(cb)
+
+
+def register_callback_for_event_synchronization(cb: Callable[[int], None]) -> None:
+    EventSynchronizationCallbacks.add_callback(cb)