Split up session_test.py -> session_clusterspec_prop_test.py
session_test.py has grown very large and has recently become flaky. To both (1) improve overall code health and (2) make the flakiness easier to root-cause, this CL begins splitting session_test into focused subsets. The new test files keep the session_ prefix and append the scope (e.g. session_clusterspec_prop_test) so that they stay grouped next to session_test in filesystem sort order. PiperOrigin-RevId: 157658981
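For orientation, every test in the new file follows the same ClusterSpec propagation pattern: describe the cluster in a cluster_pb2.ClusterDef, hand it to the session through config_pb2.ConfigProto(cluster_def=...), and run ops against one server's target. The snippet below is a minimal sketch distilled from testClusterSpecPropagationSimple in the diff; it adds nothing beyond what the change itself contains.

from tensorflow.core.protobuf import cluster_pb2
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.client import session
from tensorflow.python.framework import constant_op
from tensorflow.python.training import server_lib

# Two in-process servers stand in for remote workers.
server1 = server_lib.Server.create_local_server()
server2 = server_lib.Server.create_local_server()

# Describe the cluster in a ClusterDef proto; task addresses drop the
# 'grpc://' scheme prefix.
cluster_def = cluster_pb2.ClusterDef()
job = cluster_def.job.add()
job.name = 'worker'
job.tasks[0] = server1.target[len('grpc://'):]
job.tasks[1] = server2.target[len('grpc://'):]

# The ClusterSpec is propagated at session-creation time via the ConfigProto.
config = config_pb2.ConfigProto(cluster_def=cluster_def)
with session.Session(server1.target, config=config) as sess:
  print(sess.run(constant_op.constant(17)))  # prints 17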
This commit is contained in:
parent
b09932d749
commit
787381ca52
tensorflow/python/BUILD
@@ -2975,6 +2975,30 @@ tf_cuda_library(
# ],
# )

py_test(
    name = "session_clusterspec_prop_test",
    size = "small",
    srcs = ["client/session_clusterspec_prop_test.py"],
    srcs_version = "PY2AND3",
    tags = [
        "no_gpu",
    ],
    deps = [
        ":array_ops",
        ":client",
        ":framework",
        ":framework_for_generated_wrappers",
        ":framework_test_lib",
        ":math_ops",
        ":platform_test",
        ":state_ops",
        ":training",
        ":util",
        ":variables",
        "//third_party/py/numpy",
    ],
)

py_test(
    name = "session_list_devices_test",
    size = "small",
tensorflow/python/client/session_clusterspec_prop_test.py (new file, 326 lines)
@@ -0,0 +1,326 @@
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Tests for tensorflow.python.client.session.Session's ClusterSpec Propagation.

These tests exercise the ClusterSpec Propagation capabilities of distributed
Sessions.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from tensorflow.core.protobuf import cluster_pb2
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.client import session
from tensorflow.python.framework import common_shapes
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import math_ops
# Import resource_variable_ops for the variables-to-tensor implicit conversion.
from tensorflow.python.ops import resource_variable_ops  # pylint: disable=unused-import
from tensorflow.python.ops import state_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import googletest
from tensorflow.python.training import server_lib

ops._USE_C_API = True

# NOTE(mrry): Dummy shape registration for ops used in the tests, since they
# don't have C++ op registrations on which to attach C++ shape fns.
ops.RegisterShape('ConstructionFails')(common_shapes.unknown_shape)


class SessionClusterSpecPropagationTest(test_util.TensorFlowTestCase):

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationSimple(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config)
    output = sess.run(const)
    self.assertEqual(17, output)

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationWorker2Placement(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.Graph().as_default() as g, ops.device('/job:worker/task:1'):
      const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config, graph=g)
    run_options = config_pb2.RunOptions(
        trace_level=config_pb2.RunOptions.FULL_TRACE)
    run_metadata = config_pb2.RunMetadata()
    output = sess.run(const, options=run_options, run_metadata=run_metadata)
    self.assertEqual(17, output)
    self.assertEqual(1,
                     len([
                         node_stats
                         for dev_stats in run_metadata.step_stats.dev_stats
                         for node_stats in dev_stats.node_stats
                         if '/job:worker/replica:0/task:1/device:CPU:0' ==
                         dev_stats.device and 'Const' == node_stats.node_name
                     ]))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationWorker1Placement(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.Graph().as_default() as g, ops.device('/job:worker/task:0'):
      const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config, graph=g)
    output = sess.run(const)
    self.assertEqual(17, output)

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServers2Graphs(self):
    """Boots 3 servers, creates 2 sessions, ensures appropriate operations.

    We create 2 clusterspecs:
     1. server2 as the master, server1 as a worker
     2. server2 as the master, server3 as a worker

    We ensure that variables on the workers are independent.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def1 = cluster_pb2.ClusterDef()
    job1 = cluster_def1.job.add()
    job1.name = 'worker1'
    job1.tasks[0] = server2.target[len('grpc://'):]
    job1.tasks[1] = server1.target[len('grpc://'):]

    cluster_def2 = cluster_pb2.ClusterDef()
    job2 = cluster_def2.job.add()
    job2.name = 'worker2'
    job2.tasks[0] = server2.target[len('grpc://'):]
    job2.tasks[1] = server3.target[len('grpc://'):]

    config1 = config_pb2.ConfigProto(cluster_def=cluster_def1)
    config2 = config_pb2.ConfigProto(cluster_def=cluster_def2)

    with ops.Graph().as_default() as g1:
      with ops.device('/job:worker1/task:1'):
        var1 = variables.Variable(array_ops.zeros([2]), name='var1')
        update_op1 = state_ops.assign_add(
            var1, array_ops.ones([2]), name='var1_assign_add')
        init1 = variables.global_variables_initializer()

    with ops.Graph().as_default() as g2:
      with ops.device('/job:worker2/task:1'):
        var2 = variables.Variable(array_ops.zeros([2]), name='var2')
        update_op2 = state_ops.assign_add(
            var2, array_ops.ones([2]), name='var2_assign_add')
        init2 = variables.global_variables_initializer()

    sess1 = session.Session(server2.target, graph=g1, config=config1)
    sess2 = session.Session(server2.target, graph=g2, config=config2)

    init1.run(session=sess1)
    init2.run(session=sess2)

    expected_zeros = np.zeros([2])
    expected_ones = np.ones([2])

    self.assertAllEqual(expected_zeros, sess1.run(var1))
    self.assertAllEqual(expected_zeros, sess2.run(var2))

    self.assertAllEqual(expected_ones, sess1.run(update_op1))
    self.assertAllEqual(expected_ones, sess1.run(var1))
    self.assertAllEqual(expected_zeros, sess2.run(var2))
    self.assertAllEqual(expected_ones, sess2.run(update_op2))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(update_op1))
    self.assertAllEqual(expected_ones, sess2.run(var2))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(var1))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServers(self):
    """Boots 3 servers, creates 2 sessions, ensures appropriate operations.

    We create 2 clusterspecs:
     1. server2 as the master, server1 as a worker
     2. server2 as the master, server3 as a worker

    We ensure that variables on the workers are independent.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def1 = cluster_pb2.ClusterDef()
    job1 = cluster_def1.job.add()
    job1.name = 'worker'
    job1.tasks[0] = server2.target[len('grpc://'):]
    job1.tasks[1] = server1.target[len('grpc://'):]

    cluster_def2 = cluster_pb2.ClusterDef()
    job2 = cluster_def2.job.add()
    job2.name = 'worker'
    job2.tasks[0] = server2.target[len('grpc://'):]
    job2.tasks[1] = server3.target[len('grpc://'):]

    config1 = config_pb2.ConfigProto(cluster_def=cluster_def1)
    config2 = config_pb2.ConfigProto(cluster_def=cluster_def2)

    with ops.device('/job:worker/task:1'):
      var = variables.Variable(array_ops.zeros([2]), name='var')
      feed = array_ops.placeholder(dtypes.float32, shape=(2))
      update_op = var.assign_add(feed)

    sess1 = session.Session(server2.target, config=config1)
    sess2 = session.Session(server2.target, config=config2)

    variables.global_variables_initializer().run(session=sess1)
    variables.global_variables_initializer().run(session=sess2)

    expected_zeros = np.zeros([2])
    expected_ones = np.ones([2])

    self.assertAllEqual(expected_zeros, sess1.run(var))
    self.assertAllEqual(expected_zeros, sess2.run(var))
    self.assertAllEqual(expected_ones,
                        sess1.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones, sess1.run(var))
    self.assertAllEqual(expected_zeros, sess2.run(var))
    self.assertAllEqual(expected_ones,
                        sess2.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones + expected_ones,
                        sess1.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones, sess2.run(var))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(var))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServersOneCluster(self):
    """Boots 3 servers, ensures appropriate communication across workers.

    Additionally, in this cluster, we ensure the master is not the 0-th worker.

    Note: this test only uses one session.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server3.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    job.tasks[2] = server1.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    # Add ops to the devices in non-linear order.

    with ops.device('/job:worker/task:1'):
      feed1 = array_ops.placeholder(dtypes.float32, shape=(2))
      const1 = constant_op.constant(2.0)
      mul1 = const1 * feed1

    with ops.device('/job:worker/task:2'):
      feed2 = array_ops.placeholder(dtypes.float32, shape=(2))
      const2 = constant_op.constant(2.0)
      mul2 = const2 * feed2

    with ops.device('/job:worker/task:0'):
      feed0 = array_ops.placeholder(dtypes.float32, shape=(2))
      const0 = constant_op.constant(2.0)
      mul0 = const0 * feed0

    sum_op = mul0 + mul1 + mul2

    ones = np.ones([2])
    run_options = config_pb2.RunOptions(
        trace_level=config_pb2.RunOptions.FULL_TRACE)
    run_metadata = config_pb2.RunMetadata()

    # Run!
    with session.Session(server1.target, config=config) as sess:
      output = sess.run(
          sum_op,
          options=run_options,
          run_metadata=run_metadata,
          feed_dict={feed1: ones,
                     feed2: ones,
                     feed0: ones})
      self.assertAllEqual(6 * ones, output)

      self.assertEqual(
          3,
          len([
              dev_stats.device
              for dev_stats in run_metadata.step_stats.dev_stats
              for node_stats in dev_stats.node_stats
              if '/job:worker/replica:0/task:' in dev_stats.device and
              node_stats.node_name.startswith('Const')
          ]), run_metadata)

  @test_util.disable_c_api  # Partial runs don't work with C API
  def testClusterSpecPropagationPartialRun(self):
    """Test successful partial run with ClusterSpec propagation."""
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()

    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.device('/job:worker/task:0'):
      a = array_ops.placeholder(dtypes.float32, shape=[])
    with ops.device('/job:worker/task:1'):
      b = array_ops.placeholder(dtypes.float32, shape=[])
      c = array_ops.placeholder(dtypes.float32, shape=[])
      r1 = math_ops.add(a, b)
    with ops.device('/job:worker/task:0'):
      r2 = math_ops.multiply(r1, c)

    with session.Session(server1.target, config=config) as sess:
      h = sess.partial_run_setup([r1, r2], [a, b, c])
      res = sess.partial_run(h, r1, feed_dict={a: 1, b: 2})
      self.assertEqual(3, res)
      res = sess.partial_run(h, r2, feed_dict={c: 3})
      self.assertEqual(9, res)


if __name__ == '__main__':
  googletest.main()
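Aside: the new tests assemble cluster_pb2.ClusterDef protos field by field. An equivalent construction from the dictionary form, assuming the pre-existing server_lib.ClusterSpec helper and its as_cluster_def() method (which are not part of this change), would look roughly like this:

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.training import server_lib

server1 = server_lib.Server.create_local_server()
server2 = server_lib.Server.create_local_server()

# Map the 'worker' job to its task addresses (scheme prefix stripped), then
# convert the ClusterSpec to the ClusterDef proto that ConfigProto expects.
cluster_spec = server_lib.ClusterSpec({
    'worker': [
        server1.target[len('grpc://'):],
        server2.target[len('grpc://'):],
    ]
})
config = config_pb2.ConfigProto(cluster_def=cluster_spec.as_cluster_def())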
tensorflow/python/client/session_test.py
@@ -29,7 +29,6 @@ import six
from six.moves import xrange  # pylint: disable=redefined-builtin

from tensorflow.core.lib.core import error_codes_pb2
from tensorflow.core.protobuf import cluster_pb2
from tensorflow.core.protobuf import config_pb2
from tensorflow.core.protobuf import rewriter_config_pb2
from tensorflow.python.client import session
@@ -1644,277 +1643,6 @@ class SessionTest(test_util.TensorFlowTestCase):
    server = server_lib.Server.create_local_server()
    self.runTestBuildGraphError(session.Session(server.target))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationSimple(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config)
    output = sess.run(const)
    self.assertEqual(17, output)

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationWorker2Placement(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.Graph().as_default() as g, ops.device('/job:worker/task:1'):
      const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config, graph=g)
    run_options = config_pb2.RunOptions(
        trace_level=config_pb2.RunOptions.FULL_TRACE)
    run_metadata = config_pb2.RunMetadata()
    output = sess.run(const, options=run_options, run_metadata=run_metadata)
    self.assertEqual(17, output)
    self.assertEqual(1,
                     len([
                         node_stats
                         for dev_stats in run_metadata.step_stats.dev_stats
                         for node_stats in dev_stats.node_stats
                         if '/job:worker/replica:0/task:1/device:CPU:0' ==
                         dev_stats.device and 'Const' == node_stats.node_name
                     ]))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationWorker1Placement(self):
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.Graph().as_default() as g, ops.device('/job:worker/task:0'):
      const = constant_op.constant(17)
    sess = session.Session(server1.target, config=config, graph=g)
    output = sess.run(const)
    self.assertEqual(17, output)

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServers2Graphs(self):
    """Boots 3 servers, creates 2 sessions, ensures appropriate operations.

    We create 2 clusterspecs:
     1. server2 as the master, server1 as a worker
     2. server2 as the master, server3 as a worker

    We ensure that variables on the workers are independent.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def1 = cluster_pb2.ClusterDef()
    job1 = cluster_def1.job.add()
    job1.name = 'worker1'
    job1.tasks[0] = server2.target[len('grpc://'):]
    job1.tasks[1] = server1.target[len('grpc://'):]

    cluster_def2 = cluster_pb2.ClusterDef()
    job2 = cluster_def2.job.add()
    job2.name = 'worker2'
    job2.tasks[0] = server2.target[len('grpc://'):]
    job2.tasks[1] = server3.target[len('grpc://'):]

    config1 = config_pb2.ConfigProto(cluster_def=cluster_def1)
    config2 = config_pb2.ConfigProto(cluster_def=cluster_def2)

    with ops.Graph().as_default() as g1:
      with ops.device('/job:worker1/task:1'):
        var1 = variables.Variable(array_ops.zeros([2]), name='var1')
        update_op1 = state_ops.assign_add(
            var1, array_ops.ones([2]), name='var1_assign_add')
        init1 = variables.global_variables_initializer()

    with ops.Graph().as_default() as g2:
      with ops.device('/job:worker2/task:1'):
        var2 = variables.Variable(array_ops.zeros([2]), name='var2')
        update_op2 = state_ops.assign_add(
            var2, array_ops.ones([2]), name='var2_assign_add')
        init2 = variables.global_variables_initializer()

    sess1 = session.Session(server2.target, graph=g1, config=config1)
    sess2 = session.Session(server2.target, graph=g2, config=config2)

    init1.run(session=sess1)
    init2.run(session=sess2)

    expected_zeros = np.zeros([2])
    expected_ones = np.ones([2])

    self.assertAllEqual(expected_zeros, sess1.run(var1))
    self.assertAllEqual(expected_zeros, sess2.run(var2))

    self.assertAllEqual(expected_ones, sess1.run(update_op1))
    self.assertAllEqual(expected_ones, sess1.run(var1))
    self.assertAllEqual(expected_zeros, sess2.run(var2))
    self.assertAllEqual(expected_ones, sess2.run(update_op2))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(update_op1))
    self.assertAllEqual(expected_ones, sess2.run(var2))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(var1))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServers(self):
    """Boots 3 servers, creates 2 sessions, ensures appropriate operations.

    We create 2 clusterspecs:
     1. server2 as the master, server1 as a worker
     2. server2 as the master, server3 as a worker

    We ensure that variables on the workers are independent.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def1 = cluster_pb2.ClusterDef()
    job1 = cluster_def1.job.add()
    job1.name = 'worker'
    job1.tasks[0] = server2.target[len('grpc://'):]
    job1.tasks[1] = server1.target[len('grpc://'):]

    cluster_def2 = cluster_pb2.ClusterDef()
    job2 = cluster_def2.job.add()
    job2.name = 'worker'
    job2.tasks[0] = server2.target[len('grpc://'):]
    job2.tasks[1] = server3.target[len('grpc://'):]

    config1 = config_pb2.ConfigProto(cluster_def=cluster_def1)
    config2 = config_pb2.ConfigProto(cluster_def=cluster_def2)

    with ops.device('/job:worker/task:1'):
      var = variables.Variable(array_ops.zeros([2]), name='var')
      feed = array_ops.placeholder(dtypes.float32, shape=(2))
      update_op = var.assign_add(feed)

    sess1 = session.Session(server2.target, config=config1)
    sess2 = session.Session(server2.target, config=config2)

    variables.global_variables_initializer().run(session=sess1)
    variables.global_variables_initializer().run(session=sess2)

    expected_zeros = np.zeros([2])
    expected_ones = np.ones([2])

    self.assertAllEqual(expected_zeros, sess1.run(var))
    self.assertAllEqual(expected_zeros, sess2.run(var))
    self.assertAllEqual(expected_ones,
                        sess1.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones, sess1.run(var))
    self.assertAllEqual(expected_zeros, sess2.run(var))
    self.assertAllEqual(expected_ones,
                        sess2.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones + expected_ones,
                        sess1.run(update_op, feed_dict={feed: expected_ones}))
    self.assertAllEqual(expected_ones, sess2.run(var))
    self.assertAllEqual(expected_ones + expected_ones, sess1.run(var))

  @test_util.disable_c_api  # Operation._set_device doesn't work with C API
  def testClusterSpecPropagationThreeServersOneCluster(self):
    """Boots 3 servers, ensures appropriate communication across workers.

    Additionally, in this cluster, we ensure the master is not the 0-th worker.

    Note: this test only uses one session.
    """
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()
    server3 = server_lib.Server.create_local_server()
    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server3.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    job.tasks[2] = server1.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    # Add ops to the devices in non-linear order.

    with ops.device('/job:worker/task:1'):
      feed1 = array_ops.placeholder(dtypes.float32, shape=(2))
      const1 = constant_op.constant(2.0)
      mul1 = const1 * feed1

    with ops.device('/job:worker/task:2'):
      feed2 = array_ops.placeholder(dtypes.float32, shape=(2))
      const2 = constant_op.constant(2.0)
      mul2 = const2 * feed2

    with ops.device('/job:worker/task:0'):
      feed0 = array_ops.placeholder(dtypes.float32, shape=(2))
      const0 = constant_op.constant(2.0)
      mul0 = const0 * feed0

    sum_op = mul0 + mul1 + mul2

    ones = np.ones([2])
    run_options = config_pb2.RunOptions(
        trace_level=config_pb2.RunOptions.FULL_TRACE)
    run_metadata = config_pb2.RunMetadata()

    # Run!
    with session.Session(server1.target, config=config) as sess:
      output = sess.run(
          sum_op,
          options=run_options,
          run_metadata=run_metadata,
          feed_dict={feed1: ones,
                     feed2: ones,
                     feed0: ones})
      self.assertAllEqual(6 * ones, output)

      self.assertEqual(
          3,
          len([
              dev_stats.device
              for dev_stats in run_metadata.step_stats.dev_stats
              for node_stats in dev_stats.node_stats
              if '/job:worker/replica:0/task:' in dev_stats.device and
              node_stats.node_name.startswith('Const')
          ]), run_metadata)

  @test_util.disable_c_api  # Partial runs don't work with C API
  def testClusterSpecPropagationPartialRun(self):
    """Test successful partial run with ClusterSpec propagation."""
    server1 = server_lib.Server.create_local_server()
    server2 = server_lib.Server.create_local_server()

    cluster_def = cluster_pb2.ClusterDef()
    job = cluster_def.job.add()
    job.name = 'worker'
    job.tasks[0] = server1.target[len('grpc://'):]
    job.tasks[1] = server2.target[len('grpc://'):]
    config = config_pb2.ConfigProto(cluster_def=cluster_def)

    with ops.device('/job:worker/task:0'):
      a = array_ops.placeholder(dtypes.float32, shape=[])
    with ops.device('/job:worker/task:1'):
      b = array_ops.placeholder(dtypes.float32, shape=[])
      c = array_ops.placeholder(dtypes.float32, shape=[])
      r1 = math_ops.add(a, b)
    with ops.device('/job:worker/task:0'):
      r2 = math_ops.multiply(r1, c)

    with session.Session(server1.target, config=config) as sess:
      h = sess.partial_run_setup([r1, r2], [a, b, c])
      res = sess.partial_run(h, r1, feed_dict={a: 1, b: 2})
      self.assertEqual(3, res)
      res = sess.partial_run(h, r2, feed_dict={c: 3})
      self.assertEqual(9, res)

  def testGraphOptimizer(self):
    rewrite_options = rewriter_config_pb2.RewriterConfig(
        disable_model_pruning=False, constant_folding=True)