pytorch/caffe2/queue/rebatching_queue_ops.h
Jerry Zhang aebf3b47ae Remove template parameter from Tensor (#9939)
Summary:
Pull Request resolved: https://github.com/pytorch/pytorch/pull/9939

Pull Request resolved: https://github.com/facebookresearch/weakly-supervised-action-detection/pull/13

Pull Request resolved: https://github.com/pytorch/translate/pull/166

Pull Request resolved: https://github.com/pytorch/pytorch/pull/9125

Closes https://github.com/pytorch/pytorch/pull/9125

Use inheritance for polymorphism, and remove template parameter
This is to change the templating in call sites, the core implementations will change later

Before Caffe2 Tensor class was compile-time fixed to bind to a particular device/context. With this change, we're making it a runtime property (stored inside the tensor), but preserve the same semantics. For example, one has to specify device type in order to create a Tensor - there are no uninitialized tensors. More specifically the changes are:

1. We added an extra argument *DeviceType* to most of the constructors of the tensor, e.g. (Tensor(DeviceType type)),
2. Semantics of constructor Tensor(const Tensor<SrcContext>& src, ContextForCopy* context); is changed, in this constructor, the second context is passed in to enable us to call the templated Copy function, it could be in a different context as source and target previously, now we'll enforce that the context should have same device type as src, if it is provided.
3. To preserve 'get-or-construct' semantics of Blob, we added specialized getter Blob::GetMutableTensor that verifies both that Blob contains a Tensor and that it's of a correct type
4. Specifically, Tensor type is not default-constructible any more (as we don't have unknown device tensors) and thus some of the code handling STL containers needs to change

Note: Some changes are postponed just to keep this diff a bit smaller. Please see `TODO`s.

Reviewed By: ezyang, houseroad

Differential Revision: D9024330

fbshipit-source-id: e0b8295d2dc6ebe2963383ded5af799ad17164ba
2018-07-27 10:56:39 -07:00

84 lines
2.4 KiB
C++

#pragma once
#include "rebatching_queue.h"
namespace caffe2 {
using RebatchingQueuePtr = std::unique_ptr<RebatchingQueue>;
class CreateRebatchingQueueOp : public Operator<CPUContext> {
public:
CreateRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws) {}
bool RunOnDevice() override {
*OperatorBase::Output<RebatchingQueuePtr>(0) =
RebatchingQueuePtr(new RebatchingQueue(
OperatorBase::GetSingleArgument<int>("capacity", 1),
OperatorBase::GetSingleArgument<int>("num_blobs", 1)));
return true;
}
};
class EnqueueRebatchingQueueOp : public Operator<CPUContext> {
public:
EnqueueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws),
enqueueBatch_(
OperatorBase::GetSingleArgument<bool>("enqueue_batch", false)) {}
bool RunOnDevice() override {
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CHECK(queue);
CAFFE_ENFORCE_EQ(InputSize(), queue->numBlobs() + 1);
std::vector<const Tensor*> inputTensors;
inputTensors.reserve(InputSize() - 1);
for (int i = 1; i < InputSize(); ++i) {
inputTensors.push_back(&Input(i));
}
return enqueueBatch_ ? queue->enqueueMany(context_, inputTensors)
: queue->enqueueOne(context_, inputTensors);
}
private:
const bool enqueueBatch_;
};
class DequeueRebatchingQueueOp : public Operator<CPUContext> {
public:
DequeueRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws),
numElements_(OperatorBase::GetSingleArgument<int>("num_elements", 1)) {}
bool RunOnDevice() override {
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CHECK(queue);
std::vector<Tensor*> outputTensors;
outputTensors.reserve(OutputSize());
for (int i = 0; i < OutputSize(); ++i) {
outputTensors.push_back(Output(i));
}
return queue->dequeue(context_, numElements_, outputTensors);
}
private:
int numElements_;
};
class CloseRebatchingQueueOp : public Operator<CPUContext> {
public:
CloseRebatchingQueueOp(const OperatorDef& operator_def, Workspace* ws)
: Operator(operator_def, ws) {}
bool RunOnDevice() override {
CAFFE_ENFORCE_EQ(InputSize(), 1);
auto& queue = Inputs()[0]->template Get<RebatchingQueuePtr>();
CAFFE_ENFORCE(queue);
queue->close();
return true;
}
};
} // caffe2