pytorch/caffe2/python/data_workers_test.py
Aapo Kyrola 449f8997ab close blobs queues when stopping + test
Summary:
Mysterious deadlocks after an epoch has finished have occurred randomly, but quite frequently of late, for myself, vigneshr and others. Looking at a stack trace of vigneshr's job (P57129798), I noticed a couple of threads were calling BlobsQueue.blockingWrite (or something like that). That call blocks when the caffe2/C++ side queue is at capacity (we use a capacity of 4 with data workers). So when such a call was in flight just as the script was terminating, the thread never exited and the whole process did not exit either (not completely sure why that is, since the thread is a daemon thread, but this might be a flow-related issue since we run inside a flow container).
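
For illustration only, here is a toy reproduction of that kind of hang using Python 3's standard queue and threading modules (this is not the caffe2 queue itself): a daemon thread blocked inside a bounded queue's put() never exits, so joining it with a timeout fails.

import queue
import threading

# Small capacity, mirroring the capacity-4 blobs queue used by data workers.
q = queue.Queue(maxsize=4)

def enqueue_forever():
    i = 0
    while True:
        q.put(i)  # blocks once the queue holds 4 items and nothing dequeues
        i += 1

t = threading.Thread(target=enqueue_forever)
t.daemon = True
t.start()

# Nothing dequeues, so the writer ends up stuck inside q.put().
t.join(1.0)
print(t.is_alive())  # True: the thread is still blocked after the timeout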

This is quite easy to fix: just call CloseBlobsQueue() when terminating the process. I modified coordinator.stop() and wait_for_finish() to return a status code based on whether the joined threads actually finished within the 1.0 sec timeout. That made it possible to write a unit test for this issue; before my change, the unit test failed.
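
Below is a minimal, self-contained sketch of that shutdown pattern. The names ClosableQueue, worker and stop are hypothetical and not the actual data_workers implementation; the point is the ordering: close the queue first so any writer blocked at capacity wakes up, then join with a timeout and report whether every thread finished.

import threading
from collections import deque


class ClosableQueue(object):
    """Bounded queue whose blocking_write() returns once close() is called."""

    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._closed = False
        self._cond = threading.Condition()

    def blocking_write(self, item):
        with self._cond:
            while len(self._items) >= self._capacity and not self._closed:
                self._cond.wait()
            if self._closed:
                return False  # queue closed; caller should exit
            self._items.append(item)
            self._cond.notify_all()
            return True

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify_all()  # wake any writer stuck at capacity


def worker(q, stop_event):
    i = 0
    while not stop_event.is_set():
        if not q.blocking_write(i):
            break  # queue was closed while we were blocked
        i += 1


def stop(q, stop_event, threads, timeout=1.0):
    """Close the queue first, then join; return True only if all threads exited."""
    stop_event.set()
    q.close()
    success = True
    for t in threads:
        t.join(timeout)
        success = success and not t.is_alive()
    return success


if __name__ == "__main__":
    q = ClosableQueue(capacity=4)
    stop_event = threading.Event()
    threads = [threading.Thread(target=worker, args=(q, stop_event)) for _ in range(2)]
    for t in threads:
        t.daemon = True
        t.start()
    # Without q.close() the writers would stay blocked at capacity and
    # stop() would report failure; with it, shutdown succeeds.
    print(stop(q, stop_event, threads))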

Reviewed By: pietern

Differential Revision: D4619638

fbshipit-source-id: d96314ca783977517274fc7aadf8db4ee5636bdf
2017-02-27 10:07:57 -08:00


from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import numpy as np
import unittest
import time
from caffe2.python import workspace, cnn
from caffe2.python import timeout_guard
import caffe2.python.data_workers as data_workers


def dummy_fetcher(fetcher_id, batch_size):
    # Create random amount of values
    n = np.random.randint(64) + 1
    data = np.zeros((n, 3))
    labels = []
    for j in range(n):
        data[j, :] *= (j + fetcher_id)
        labels.append(data[j, 0])

    return [np.array(data), np.array(labels)]


class DataWorkersTest(unittest.TestCase):
    def testNonParallelModel(self):
        model = cnn.CNNModelHelper(name="test")
        coordinator = data_workers.init_data_input_workers(
            model,
            ["data", "label"],
            dummy_fetcher,
            32,
            2,
        )
        self.assertEqual(coordinator._fetcher_id_seq, 2)
        coordinator.start()

        workspace.RunNetOnce(model.param_init_net)
        workspace.CreateNet(model.net)

        for i in range(500):
            with timeout_guard.CompleteInTimeOrDie(5):
                workspace.RunNet(model.net.Proto().name)

            data = workspace.FetchBlob("data")
            labels = workspace.FetchBlob("label")

            self.assertEqual(data.shape[0], labels.shape[0])
            self.assertEqual(data.shape[0], 32)

            for j in range(32):
                self.assertEqual(labels[j], data[j, 0])
                self.assertEqual(labels[j], data[j, 1])
                self.assertEqual(labels[j], data[j, 2])

        coordinator.stop()

    def testGracefulShutdown(self):
        model = cnn.CNNModelHelper(name="test")
        coordinator = data_workers.init_data_input_workers(
            model,
            ["data", "label"],
            dummy_fetcher,
            32,
            2,
        )
        self.assertEqual(coordinator._fetcher_id_seq, 2)
        coordinator.start()

        workspace.RunNetOnce(model.param_init_net)
        workspace.CreateNet(model.net)

        while coordinator._coordinators[0]._inputs < 100:
            time.sleep(0.01)

        # Run a couple of rounds
        workspace.RunNet(model.net.Proto().name)
        workspace.RunNet(model.net.Proto().name)

        # Wait for the enqueue thread to get blocked
        time.sleep(0.2)

        # We don't dequeue on the caffe2 side (as we don't run the net),
        # so the enqueue thread should be blocked.
        # Let's now shut down and check that it succeeds.
        self.assertTrue(coordinator.stop())