import numpy as np
import unittest
from caffe2.python import core, workspace, muji, test_util


@unittest.skipIf(not workspace.has_gpu_support, "no gpu")
class TestMuji(test_util.TestCase):
    """Tests for the allreduce helpers exposed by ``caffe2.python.muji``.

    The whole class is skipped when ``workspace.has_gpu_support`` is falsy.
    Tests that additionally need GPU peer access are reported as skipped
    (rather than silently passing) when the access pattern does not allow it.
    """

    def RunningAllreduceWithGPUs(self, gpu_ids, allreduce_function):
        """A base function to test different scenarios.

        Builds a net that fills one blob per GPU with the constant
        ``gpu_id + 1``, applies ``allreduce_function`` to those blobs, runs
        the net once, and checks that every ``*_reduced`` blob equals the sum
        of all the fill values.

        Args:
            gpu_ids: list of integer GPU ids taking part in the allreduce.
            allreduce_function: callable invoked as
                ``allreduce_function(net, blob_names, "_reduced", gpu_ids)``.
        """
        net = core.Net("mujitest")
        for gpu_id in gpu_ids:
            net.ConstantFill(
                [],
                "testblob_gpu_" + str(gpu_id),
                shape=[1, 2, 3, 4],
                value=float(gpu_id + 1),
                device_option=muji.OnGPU(gpu_id)
            )
        allreduce_function(
            net,
            ["testblob_gpu_" + str(gpu_id) for gpu_id in gpu_ids],
            "_reduced",
            gpu_ids
        )
        workspace.RunNetOnce(net)

        # Each blob was filled with (gpu_id + 1), so the reduced value is
        # sum(gpu_id + 1) == sum(gpu_ids) + len(gpu_ids).
        target_value = sum(gpu_ids) + len(gpu_ids)

        # Dump every blob in the workspace, in name order, to help debugging.
        for blob_name in sorted(workspace.Blobs()):
            print('{} {}'.format(blob_name, workspace.FetchBlob(blob_name)))

        for gpu_id in gpu_ids:
            reduced = workspace.FetchBlob(
                "testblob_gpu_" + str(gpu_id) + "_reduced"
            )
            np.testing.assert_array_equal(
                reduced,
                target_value,
                err_msg="gpu id %d of %s" % (gpu_id, str(gpu_ids))
            )

    def _skip_unless_peer_access(self, num_gpus, groups, skip_message):
        """Skip the current test unless the required peer access is present.

        Args:
            num_gpus: minimum number of rows the peer access pattern must
                have.
            groups: iterable of ``(start, stop)`` index pairs; every entry of
                ``pattern[start:stop, start:stop]`` must be truthy.
            skip_message: reason passed to ``skipTest`` when the requirement
                is not met.
        """
        pattern = workspace.GetGpuPeerAccessPattern()
        # The size check comes first so the slices below are only evaluated
        # when the pattern is large enough to contain them.
        ready = pattern.shape[0] >= num_gpus and all(
            np.all(pattern[start:stop, start:stop]) for start, stop in groups
        )
        if not ready:
            self.skipTest(skip_message)

    def testAllreduceFallback(self):
        """Run ``muji.AllreduceFallback`` across every GPU device."""
        self.RunningAllreduceWithGPUs(
            list(range(workspace.NumGpuDevices())), muji.AllreduceFallback
        )

    def testAllreduceSingleGPU(self):
        """Run ``muji.Allreduce`` on each GPU individually."""
        for gpu_id in range(workspace.NumGpuDevices()):
            self.RunningAllreduceWithGPUs([gpu_id], muji.Allreduce)

    def testAllreduceWithTwoGPUs(self):
        """Run ``muji.Allreduce2`` on GPUs 0 and 1 if they have peer access."""
        self._skip_unless_peer_access(
            2,
            [(0, 2)],
            'Skipping allreduce with 2 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1], muji.Allreduce2)

    def testAllreduceWithFourGPUs(self):
        """Run ``muji.Allreduce4`` on GPUs 0-3 if they all have peer access."""
        self._skip_unless_peer_access(
            4,
            [(0, 4)],
            'Skipping allreduce with 4 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1, 2, 3], muji.Allreduce4)

    def testAllreduceWithFourGPUsAndTwoGroups(self):
        """Run ``muji.Allreduce4Group2`` on GPUs 0-3.

        Requires peer access within the pair (0, 1) and within the pair
        (2, 3); access across the two pairs is not checked.
        """
        self._skip_unless_peer_access(
            4,
            [(0, 2), (2, 4)],
            'Skipping allreduce with 4 gpus and 2 groups. '
            'Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs([0, 1, 2, 3], muji.Allreduce4Group2)

    def testAllreduceWithEightGPUs(self):
        """Run ``muji.Allreduce8`` on GPUs 0-7.

        Requires peer access within GPUs 0-3 and within GPUs 4-7. The second
        group is bounded at index 8 so that GPUs beyond the eight used by
        this test do not influence whether it runs.
        """
        self._skip_unless_peer_access(
            8,
            [(0, 4), (4, 8)],
            'Skipping allreduce with 8 gpus. Not peer access ready.'
        )
        self.RunningAllreduceWithGPUs(list(range(8)), muji.Allreduce8)


if __name__ == '__main__':
    unittest.main()