import unittest from caffe2.python import core from hypothesis import given import hypothesis.strategies as st import caffe2.python.hypothesis_test_util as hu from caffe2.python import workspace from caffe2.python.functional import Functional import numpy as np @st.composite def _tensor_splits(draw, add_axis=False): """Generates (axis, split_info, tensor_splits) tuples.""" tensor = draw(hu.tensor(min_value=4)) # Each dim has at least 4 elements. axis = draw(st.integers(0, len(tensor.shape) - 1)) if add_axis: # Simple case: get individual slices along one axis, where each of them # is (N-1)-dimensional. The axis will be added back upon concatenation. return ( axis, np.ones(tensor.shape[axis], dtype=np.int32), [ np.array(tensor.take(i, axis=axis)) for i in range(tensor.shape[axis]) ] ) else: # General case: pick some (possibly consecutive, even non-unique) # indices at which we will split the tensor, along the given axis. splits = sorted( draw( st. lists(elements=st.integers(0, tensor.shape[axis]), max_size=4) ) + [0, tensor.shape[axis]] ) return ( axis, np.array(np.diff(splits), dtype=np.int32), [ tensor.take(range(splits[i], splits[i + 1]), axis=axis) for i in range(len(splits) - 1) ], ) class TestFunctional(hu.HypothesisTestCase): @given(X=hu.tensor(), engine=st.sampled_from(["", "CUDNN"]), **hu.gcs) def test_relu(self, X, engine, gc, dc): X += 0.02 * np.sign(X) X[X == 0.0] += 0.02 output = Functional.Relu(X, device_option=gc) Y_l = output[0] Y_d = output["output_0"] with workspace.WorkspaceGuard("tmp_workspace"): op = core.CreateOperator("Relu", ["X"], ["Y"], engine=engine) workspace.FeedBlob("X", X) workspace.RunOperatorOnce(op) Y_ref = workspace.FetchBlob("Y") np.testing.assert_array_equal( Y_l, Y_ref, err_msg='Functional Relu result mismatch' ) np.testing.assert_array_equal( Y_d, Y_ref, err_msg='Functional Relu result mismatch' ) @given(tensor_splits=_tensor_splits(), **hu.gcs) def test_concat(self, tensor_splits, gc, dc): # Input Size: 1 -> inf axis, _, splits = tensor_splits concat_result, split_info = Functional.Concat(*splits, axis=axis, device_option=gc) concat_result_ref = np.concatenate(splits, axis=axis) split_info_ref = np.array([a.shape[axis] for a in splits]) np.testing.assert_array_equal( concat_result, concat_result_ref, err_msg='Functional Concat result mismatch' ) np.testing.assert_array_equal( split_info, split_info_ref, err_msg='Functional Concat split info mismatch' ) @given(tensor_splits=_tensor_splits(), split_as_arg=st.booleans(), **hu.gcs) def test_split(self, tensor_splits, split_as_arg, gc, dc): # Output Size: 1 - inf axis, split_info, splits = tensor_splits split_as_arg = True if split_as_arg: input_tensors = [np.concatenate(splits, axis=axis)] kwargs = dict(axis=axis, split=split_info, num_output=len(splits)) else: input_tensors = [np.concatenate(splits, axis=axis), split_info] kwargs = dict(axis=axis, num_output=len(splits)) result = Functional.Split(*input_tensors, device_option=gc, **kwargs) def split_ref(input, split=split_info): s = np.cumsum([0] + list(split)) return [ np.array(input.take(np.arange(s[i], s[i + 1]), axis=axis)) for i in range(len(split)) ] result_ref = split_ref(*input_tensors) for i, ref in enumerate(result_ref): np.testing.assert_array_equal( result[i], ref, err_msg='Functional Relu result mismatch' ) if __name__ == '__main__': unittest.main()