pytorch/torch/multiprocessing/queue.py
Justin Chu 4cc1745b13 [BE] f-stringify torch/ and scripts (#105538)
This PR is a follow-up to the pyupgrade series; it converts more strings to f-strings using `flynt`.

- https://docs.python.org/3/reference/lexical_analysis.html#f-strings
- https://pypi.org/project/flynt/

Command used:

```
flynt torch/ -ll 120
flynt scripts/ -ll 120
flynt tools/ -ll 120
```

and excluded `collect_env.py`

Pull Request resolved: https://github.com/pytorch/pytorch/pull/105538
Approved by: https://github.com/ezyang, https://github.com/malfet
2023-07-21 19:35:24 +00:00

46 lines
1.4 KiB
Python

import io
import multiprocessing.queues
from multiprocessing.reduction import ForkingPickler
import pickle
class ConnectionWrapper:
    """Thin proxy around a connection object that pickles with ForkingPickler.

    ``send`` serializes with ``ForkingPickler`` at ``pickle.HIGHEST_PROTOCOL``
    and ships the bytes via ``send_bytes``; ``recv`` reads bytes via
    ``recv_bytes`` and unpickles them.  Any attribute not defined here is
    looked up on the wrapped ``conn`` object.
    """

    def __init__(self, conn):
        """Store *conn*, the object every other attribute lookup is forwarded to."""
        self.conn = conn

    def send(self, obj):
        """Pickle *obj* with ForkingPickler and pass the bytes to ``send_bytes``."""
        stream = io.BytesIO()
        pickler = ForkingPickler(stream, pickle.HIGHEST_PROTOCOL)
        pickler.dump(obj)
        payload = stream.getvalue()
        # send_bytes is not defined on this class, so it resolves through
        # __getattr__ to the wrapped connection.
        self.send_bytes(payload)

    def recv(self):
        """Return the object unpickled from the bytes yielded by ``recv_bytes``."""
        # NOTE: pickle.loads executes whatever the payload describes; this is
        # only appropriate when the peer end of the connection is trusted.
        return pickle.loads(self.recv_bytes())

    def __getattr__(self, name):
        """Forward lookups of missing attributes to the wrapped connection.

        Raises:
            AttributeError: if ``conn`` has not been stored on the instance.
        """
        # Inspect __dict__ directly: reading self.conn while it is absent
        # would re-enter __getattr__ and recurse without end.
        if "conn" not in self.__dict__:
            raise AttributeError(f"'{type(self).__name__}' object has no attribute 'conn'")
        return getattr(self.conn, name)
class Queue(multiprocessing.queues.Queue):
    """``multiprocessing.queues.Queue`` whose pipe ends are ConnectionWrappers."""

    def __init__(self, *args, **kwargs):
        """Run the parent initializer, then wrap ``_reader`` and ``_writer``.

        All positional and keyword arguments are passed to the parent class
        unchanged.
        """
        super().__init__(*args, **kwargs)

        # Replace each end with its wrapper, then point _send/_recv at the
        # wrappers' pickling methods.
        wrapped_reader = ConnectionWrapper(self._reader)
        self._reader: ConnectionWrapper = wrapped_reader
        wrapped_writer = ConnectionWrapper(self._writer)
        self._writer: ConnectionWrapper = wrapped_writer

        self._send = wrapped_writer.send
        self._recv = wrapped_reader.recv
class SimpleQueue(multiprocessing.queues.SimpleQueue):
    """``multiprocessing.queues.SimpleQueue`` that wraps its ends on demand."""

    def _make_methods(self):
        """Wrap ``_reader``/``_writer`` once, then defer to the parent hook.

        NOTE(review): the ``type: ignore`` on the super call suggests the
        parent class may not declare ``_make_methods`` for the type checker;
        confirm this hook is still invoked by the targeted Python versions.
        """
        needs_wrapping = not isinstance(self._reader, ConnectionWrapper)
        if needs_wrapping:
            # The reader's type is used as the marker for both ends, so a
            # repeated call does not wrap a wrapper.
            self._reader: ConnectionWrapper = ConnectionWrapper(self._reader)
            self._writer: ConnectionWrapper = ConnectionWrapper(self._writer)
        super()._make_methods()  # type: ignore[misc]