From e41875493da9f760aec4a9bbb8f05387b9d3c2ee Mon Sep 17 00:00:00 2001
From: zhttjd
Date: Wed, 10 Sep 2025 14:27:59 +0800
Subject: [PATCH] Update unit tests for patcher

---
 tests/README.md                      |   1 +
 tests/requirements.txt               |   5 +-
 tests/torch/test_patcher_mmcv.py     | 228 ++++-----
 tests/torch/test_patcher_mmdet.py    |  29 +-
 tests/torch/test_patcher_mmengine.py | 694 ++++++++++++++++++++++++++-
 5 files changed, 800 insertions(+), 157 deletions(-)

diff --git a/tests/README.md b/tests/README.md
index aae82d60..13f8a25d 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -17,6 +17,7 @@
 ```Bash
 git clone -b 1.x https://github.com/open-mmlab/mmcv.git
 cd mmcv
+MMCV_WITH_OP=1 FORCE_NPU=1 python setup.py build_ext
 MMCV_WITH_OP=1 FORCE_NPU=1 python setup.py install
 cd ../
 ```
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 4873e1df..45c10eb4 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -1,3 +1,6 @@
 torch_scatter==2.1.2
 torchvision==0.16.0
-hypothesis
\ No newline at end of file
+hypothesis
+expecttest
+pyyaml
+prettytable
\ No newline at end of file
diff --git a/tests/torch/test_patcher_mmcv.py b/tests/torch/test_patcher_mmcv.py
index d8df9460..4998177c 100644
--- a/tests/torch/test_patcher_mmcv.py
+++ b/tests/torch/test_patcher_mmcv.py
@@ -12,42 +12,40 @@ from torch_npu.testing.testcase import TestCase, run_tests
 from mx_driving.patcher import mmcv_patch, mmengine_patch
 
-
 def assertIsNotInstance(obj, cls):
     assert not isinstance(obj, cls), f"Expected {repr(obj)} to NOT be an instance of {cls.__name__}"
 
-class TestPatcherMMCV(TestCase):
-    def setUp(self):
-        pass
+class EmptyAttribute:
+    pass
+
+
+class TestMultiScaleDeformableAttnPatch(TestCase):
+    def test_monkey_patch(self):
+        """Test monkeypatching for multi_scale_deformable_attn"""
+        mock_mmcv = MagicMock()
+
+        # Call msda function
+        mmcv_patch.msda(mock_mmcv, {})
+
+        # Assert function replacements
+        assertIsNotInstance(mock_mmcv.ops.multi_scale_deform_attn.MultiScaleDeformableAttnFunction.forward, MagicMock)
+        assertIsNotInstance(mock_mmcv.ops.multi_scale_deform_attn.MultiScaleDeformableAttnFunction.backward, MagicMock)
 
-    def test_patch_mmcv_version_found(self):
-        """Test successful import of mmcv and version patching"""
-        with patch('importlib.import_module') as mock_import:
-            # Mock mmcv module
-            mock_mmcv = MagicMock()
-            mock_mmcv.__version__ = "1.7.2"
-            mock_import.return_value = mock_mmcv
-
-            # Call patching function
-            mmcv_patch.patch_mmcv_version("2.1.0")
-
-            # Assert version restoration
-            self.assertEqual(mock_mmcv.__version__, "1.7.2", "Version should be restored to original")
-
-            # Assert import attempts
-            mock_import.assert_any_call("mmdet")
-            mock_import.assert_any_call("mmdet3d")
+    def test_patch_failure(self):
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.msda(mock_mmcv, {})
+
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.msda(mock_mmcv, {})
 
-    def test_patch_mmcv_version_not_found(self):
-        """Test handling when mmcv cannot be imported"""
-        with patch('importlib.import_module') as mock_import:
-            mock_import.side_effect = ImportError
-            # Assert no exception raised
-            mmcv_patch.patch_mmcv_version("666.888.2333")
-            mock_import.assert_called_once_with("mmcv")
 
-    def test_dc(self):
+class TestDeformConv2dPatch(TestCase):
+    def test_monkey_patch(self):
         """Test monkeypatching for deform_conv2d"""
         mock_mmcv = MagicMock()
 
@@ -57,8 +55,21 @@ class TestPatcherMMCV(TestCase):
         # Assert function replacements
         assertIsNotInstance(mock_mmcv.ops.deform_conv.DeformConv2dFunction, MagicMock)
         assertIsNotInstance(mock_mmcv.ops.deform_conv.deform_conv2d, MagicMock)
+
+    def test_patch_failure(self):
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.dc(mock_mmcv, {})
+
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.dc(mock_mmcv, {})
 
-    def test_mdc(self):
+
+class TestModulatedDeformConv2dPatch(TestCase):
+    def test_monkey_patch(self):
         """Test monkeypatching for modulated_deform_conv2d"""
         mock_mmcv = MagicMock()
 
@@ -69,120 +80,57 @@ class TestPatcherMMCV(TestCase):
         assertIsNotInstance(mock_mmcv.ops.modulated_deform_conv.ModulatedDeformConv2dFunction, MagicMock)
         assertIsNotInstance(mock_mmcv.ops.modulated_deform_conv.modulated_deform_conv2d, MagicMock)
 
-    def test_msda(self):
-        """Test monkeypatching for multi_scale_deformable_attn"""
+    def test_patch_failure(self):
         mock_mmcv = MagicMock()
-
-        # Call msda function
-        mmcv_patch.msda(mock_mmcv, {})
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.mdc(mock_mmcv, {})
 
-        # Assert function replacements
-        assertIsNotInstance(mock_mmcv.ops.multi_scale_deform_attn.MultiScaleDeformableAttnFunction.forward, MagicMock)
-        assertIsNotInstance(mock_mmcv.ops.multi_scale_deform_attn.MultiScaleDeformableAttnFunction.backward, MagicMock)
-
-    def test_ddp_patch(self):
         mock_mmcv = MagicMock()
-        mock_mmcv.device.npu.NPUDistributedDataParallel = types.ModuleType('npuddp')
-        mmddp_b4replacement = mock_mmcv.parallel.distributed.MMDistributedDataParallel
-
-        # Apply monkey patch
-        mmengine_patch.ddp(mock_mmcv, {})
-        assertIsNotInstance(mmddp_b4replacement._run_ddp_forward, MagicMock)
-        assertIsNotInstance(mock_mmcv.parallel.distributed.MMDistributedDataParallel, MagicMock)
-
-
-class TestPatcherStream(TestCase):
-    def setUp(self):
-        # Create mock mmcvparallel module
-        self.mock_mmcv = types.ModuleType('mmcv')
-        self.mock_mmcv.parallel = types.ModuleType('mmcvparallel')
-        self.mock_mmcv.parallel._functions = types.ModuleType('_functions')
-        self.mock_mmcv.parallel._functions.Scatter = MagicMock()
-
-        # Add the missing attributes for torch
-        self.mock_mmcv.parallel._functions.torch = types.ModuleType('torch')
-        self.mock_mmcv.parallel._functions.torch.device = torch.device
-
-        # Set up necessary functions and types
-        self.mock_mmcv.parallel._functions.get_input_device = MagicMock()
-        self.mock_mmcv.parallel._functions.scatter = MagicMock()
-        self.mock_mmcv.parallel._functions.synchronize_stream = MagicMock()
-        self.mock_mmcv.parallel._functions._get_stream = MagicMock()
-        self.mock_mmcv.parallel._functions.Tensor = torch.Tensor
-
-        # Set default return values
-        self.mock_mmcv.parallel._functions.get_input_device.return_value = -1
-        self.mock_mmcv.parallel._functions.scatter.return_value = ["scatter_output"]
-
-        # Dynamically return target # of gpu
-        def scatter_mock(input_, target_gpus, streams=None):
-            return [f"output_{i}" for i in range(len(target_gpus))]
-
-        self.mock_mmcv.parallel._functions.scatter = MagicMock(side_effect=scatter_mock)
-
-    def test_monkeypatch(self):
-        """Verify forward method is correctly replaced"""
-        options = {}
-
-        # Apply monkeypatch using stream function
-        mmengine_patch.stream(self.mock_mmcv, options)
-
-        # Verify Scatter.forward has been replaced with new_forward
-        assertIsNotInstance(self.mock_mmcv.parallel._functions.Scatter.forward, MagicMock)
-
-    def test_new_forward_input_device_neg_one(self):
-        """Test stream behavior when input device is -1 and target GPUs are not [-1]"""
-        mmengine_patch.stream(self.mock_mmcv, {})
-
-        # Create mock input
-        test_input = MagicMock(spec=torch.Tensor)
-        target_gpus = [0, 1]
-
-        # Execute new forward method
-        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
-
-        # Verify stream handling logic
-        self.mock_mmcv.parallel._functions._get_stream.assert_called()
-        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
-        self.mock_mmcv.parallel._functions.synchronize_stream.assert_called_once()
-
-        # Verify output format
-        self.assertEqual(len(result), len(target_gpus))
-        self.assertIsInstance(result, tuple)
+        with self.assertRaises(AttributeError):
+            mock_mmcv.ops = EmptyAttribute
+            mmcv_patch.mdc(mock_mmcv, {})
 
-    def test_new_forward_non_neg_input_device(self):
-        """Test behavior when input device is not -1"""
-        mmengine_patch.stream(self.mock_mmcv, {})
-
-        # Set input device to non-negative value
-        self.mock_mmcv.parallel._functions.get_input_device.return_value = 0
-        test_input = MagicMock(spec=torch.Tensor)
-        target_gpus = [0, 1]
-
-        # Execute new forward method
-        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
-
-        # Verify no stream handling occurs
-        self.mock_mmcv.parallel._functions._get_stream.assert_not_called()
-        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
-        self.mock_mmcv.parallel._functions.synchronize_stream.assert_not_called()
-        self.assertIsInstance(result, tuple)
-
-    def test_new_forward_list_input(self):
-        """Test handling of list input"""
-        mmengine_patch.stream(self.mock_mmcv, {})
-
-        # Create list input
-        test_input = [torch.tensor([1]), torch.tensor([2])]
-        target_gpus = [0, 1]
-
-        # Execute new forward method
-        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
-
-        # Verify processing logic
-        self.mock_mmcv.parallel._functions.get_input_device.assert_called_once()
-        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
-        self.assertIsInstance(result, tuple)
+
+class TestResolveMMCVVersionConflict(TestCase):
+    def test_mmcv_version_found(self):
+        """Test successful import of mmcv and version patching"""
+        with patch('importlib.import_module') as mock_import:
+            # Mock mmcv module
+            mock_mmcv = MagicMock()
+            mock_mmcv.__version__ = "1.7.2"
+            mock_import.return_value = mock_mmcv
+
+            # Call patching function
+            mmcv_patch.patch_mmcv_version("2.1.0")
+
+            # Assert version restoration
+            self.assertEqual(mock_mmcv.__version__, "1.7.2", "Version should be restored to original")
+
+            # Assert import attempts
+            mock_import.assert_any_call("mmdet")
+            mock_import.assert_any_call("mmdet3d")
+
+    def test_mmcv_version_not_found(self):
+        """Test handling when mmcv cannot be imported"""
+        with patch('importlib.import_module') as mock_import:
+            mock_import.side_effect = ImportError
+            # Assert no exception raised
+            mmcv_patch.patch_mmcv_version("666.888.2333")
+            mock_import.assert_called_once_with("mmcv")
+
+    def test_no_version_conflict(self):
+        with patch('importlib.import_module') as mock_import:
+            # Mock mmcv module
+            mock_mmcv = MagicMock()
+            mock_mmcv.__version__ = "1.7.2"
+            mock_import.return_value = mock_mmcv
+
+            # Call patching function
+            mmcv_patch.patch_mmcv_version("1.7.2")
+
+            # Assert version restoration
+            self.assertEqual(mock_mmcv.__version__, "1.7.2", "Version should be restored to original")
 
 if __name__ == '__main__':
diff --git a/tests/torch/test_patcher_mmdet.py b/tests/torch/test_patcher_mmdet.py
index 8478528d..b8f91077 100644
--- a/tests/torch/test_patcher_mmdet.py
+++ b/tests/torch/test_patcher_mmdet.py
@@ -13,7 +13,11 @@ def assertIsNotInstance(obj, cls):
     assert not isinstance(obj, cls), f"Expected {repr(obj)} to NOT be an instance of {cls.__name__}"
 
-class TestPseudoSamplerPatcher(TestCase):
+class EmptyAttribute:
+    pass
+
+
+class TestPseudoSamplerPatch(TestCase):
     def setUp(self):
         # mmdet.core.bbox.samplers.pseudo_sampler
@@ -67,8 +71,14 @@ class TestPseudoSamplerPatcher(TestCase):
         self.assertRtolEqual(pos_inds, torch.tensor([True, False, True]))
         self.assertRtolEqual(neg_inds, torch.tensor([False, True, False]))
 
+    def test_patch_failure(self):
+        mock_mmdet = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmdet.core.bbox.samplers.pseudo_sampler.PseudoSampler = EmptyAttribute
+            pseudo_sampler(mock_mmdet, {})
+
-class TestResNetAddReLU(TestCase):
+class TestResNetAddReLUPatch(TestCase):
     def setUp(self):
         # mmdet.models.backbones.resnet
         self.mock_mmdet = types.ModuleType('mmdet')
@@ -179,8 +189,14 @@ class TestResNetAddReLU(TestCase):
         result = block(x)
         self.assertRtolEqual(result, torch.tensor([2.0]).npu())
 
+    def test_patch_failure(self):
+        mock_mmdet = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmdet.models.backbones.resnet.BasicBlock = EmptyAttribute
+            resnet_add_relu(mock_mmdet, {})
 
-class TestResNetMaxPool(TestCase):
+
+class TestResNetMaxPoolPatch(TestCase):
     def setUp(self):
         # mmdet.models.backbones.resnet
         self.mock_mmdet = types.ModuleType('mmdet')
@@ -245,5 +261,12 @@ class TestResNetMaxPool(TestCase):
         result = model(x)
         self.assertEqual(len(result), 1)  # verify only output 1 layer
 
+    def test_patch_failure(self):
+        mock_mmdet = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmdet.models.backbones.resnet.ResNet = EmptyAttribute
+            resnet_maxpool(mock_mmdet, {})
+
+
 if __name__ == "__main__":
     run_tests()
\ No newline at end of file
diff --git a/tests/torch/test_patcher_mmengine.py b/tests/torch/test_patcher_mmengine.py
index b6cf179b..e5eb05cd 100644
--- a/tests/torch/test_patcher_mmengine.py
+++ b/tests/torch/test_patcher_mmengine.py
@@ -9,8 +9,21 @@ import torch
 import torch_npu
 from torch_npu.testing.testcase import TestCase, run_tests
 from mx_driving.patcher import optimizer_hooks, optimizer_wrapper
+from mx_driving.patcher import mmcv_patch, mmengine_patch
 
-# MMCV 1.x modules = MMCV 2.x moudles + MMEngine modules, therefore, some module tested here correponds to mmcv instead of mmengine
+
+'''
+    MMCV 1.x modules = MMCV 2.x modules + MMEngine modules,
+    therefore, some modules tested here correspond to mmcv instead of mmengine
+'''
+
+
+def assertIsNotInstance(obj, cls):
+    assert not isinstance(obj, cls), f"Expected {repr(obj)} to NOT be an instance of {cls.__name__}"
+
+
+class EmptyAttribute:
+    pass
 
 # For mmcv 1.x
@@ -110,7 +123,7 @@ class TestOptimizerHooks(TestCase):
         runner.optimizer.zero_grad.assert_called_once()
         runner.outputs['loss'].backward.assert_called_once()
         runner.optimizer.step.assert_called_once()
-    
+
     def test_oh_anomaly_detection(self):
         hook = self.OptimizerHook(detect_anomalous_params=True)
         runner = MagicMock()
@@ -141,7 +154,7 @@ class TestOptimizerHooks(TestCase):
         # Cover dectection called by after_train_iter
         hook.after_train_iter(runner)
         runner.logger.log.assert_called()
-    
+
     # ------Test for GradientCumulativeOptimizerHook(OptimizerHook)--------#
     def test_gch_constructor_exceptions(self):
@@ -150,7 +163,7 @@ class TestOptimizerHooks(TestCase):
         with unittest.TestCase.assertRaises(self, ValueError):
             self.GradientCumulativeHook(cumulative_iters=-1)
-    
+
     def test_gch_has_batch(self):
         hook = self.GradientCumulativeHook(cumulative_iters=1)
 
@@ -172,7 +185,7 @@ class TestOptimizerHooks(TestCase):
         no_bn_module = MagicMock()
         no_bn_module.children.return_value = [MagicMock(), MagicMock()]
         self.assertFalse(hook.has_batch_norm(no_bn_module))
-    
+
     def test_gch_init(self):
         hook = self.GradientCumulativeHook(cumulative_iters=5)
 
@@ -186,7 +199,7 @@ class TestOptimizerHooks(TestCase):
         hook._init(runner)
         self.assertEqual(hook.divisible_iters, 5)
         self.assertEqual(hook.remainder_iters, 3)
-    
+
     def test_gch_loss_factor(self):
         hook = self.GradientCumulativeHook(cumulative_iters=5)
         runner = MagicMock()
@@ -207,7 +220,7 @@ class TestOptimizerHooks(TestCase):
         hook.remainder_iters = -1
         with unittest.TestCase.assertRaises(self, ValueError):
             hook._get_loss_factor(runner)
-    
+
     def test_gch_after_train_iter(self):
         hook = self.GradientCumulativeHook(cumulative_iters=4, grad_clip={'max_norm': 10})
         runner = MagicMock()
@@ -225,7 +238,7 @@ class TestOptimizerHooks(TestCase):
         runner.outputs['loss'].__truediv__.assert_called_once()
         runner.optimizer.step.assert_called_once()
         runner.optimizer.zero_grad.assert_called_once()
-    
+
     # ---------------Test for Fp16OptimizerHook(OptimizerHook)-----------------#
     def test_f16h_constructor(self):
         # Test dynamic loss scale
@@ -287,7 +300,7 @@ class TestOptimizerHooks(TestCase):
         self.assertIn('fp16', runner.meta)
         self.assertIn('loss_scaler', runner.meta['fp16'])
         hook.loss_scaler.state_dict.assert_called_once()
-    
+
     def test_f16h_grad_clipping(self):
         # Test gradient clipping in FP16 mix precision optimizier hook
         hook = self.Fp16Hook(grad_clip={'max_norm': 10})
@@ -301,7 +314,7 @@ class TestOptimizerHooks(TestCase):
         hook.loss_scaler.unscale_.assert_called_once()
 
         runner.optimizer.clip_grad_norm_fused_.assert_called_once()
-    
+
     def test_f16h_copy_grads_to_fp32(self, device="npu"):
         hook = self.Fp16Hook(loss_scale=512.0)
 
@@ -329,7 +342,7 @@ class TestOptimizerHooks(TestCase):
         hook.copy_params_to_fp16(fp16_net, [fp32_weight])
 
         self.assertRtolEqual(fp16_param, fp32_weight.half())
-    
+
     # ---Test for GradientCumulativeFp16OptimizerHook(GradientCumulativeOptimizerHook, Fp16OptimizerHook)---#
     def test_gcf16h_after_train_iter(self):
         hook = self.GradientCumulativeFp16Hook(cumulative_iters=3)
@@ -344,8 +357,8 @@ class TestOptimizerHooks(TestCase):
         hook.loss_scaler.step.assert_called_once()
         runner.model.zero_grad.assert_called_once()
         runner.optimizer.zero_grad.assert_called_once()
-    
-    
+
+
 class TestOptimizerWrapperPatch(TestCase):
     def test_optimizer_wrapper_patch(self):
         # Create mock
@@ -379,5 +392,660 @@ class TestOptimizerWrapperPatch(TestCase):
         wrapper.clip_grads(params='params', max_norm=10)
         mock_optimizer.clip_grad_norm_fused_.assert_called_once_with(max_norm=10)
 
+    def test_patch_failure(self):
+        mock_mmengine = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmengine.optim.optimizer.optimizer_wrapper = EmptyAttribute
+            mmengine_patch.optimizer_wrapper(mock_mmengine, {})
+
+
+class TestStreamPatch(TestCase):
+    def setUp(self):
+        # Create mock mmcvparallel module
+        self.mock_mmcv = types.ModuleType('mmcv')
+        self.mock_mmcv.parallel = types.ModuleType('mmcvparallel')
+        self.mock_mmcv.parallel._functions = types.ModuleType('_functions')
+        self.mock_mmcv.parallel._functions.Scatter = MagicMock()
+
+        # Add the missing attributes for torch
+        self.mock_mmcv.parallel._functions.torch = types.ModuleType('torch')
+        self.mock_mmcv.parallel._functions.torch.device = torch.device
+
+        # Set up necessary functions and types
+        self.mock_mmcv.parallel._functions.get_input_device = MagicMock()
+        self.mock_mmcv.parallel._functions.scatter = MagicMock()
+        self.mock_mmcv.parallel._functions.synchronize_stream = MagicMock()
+        self.mock_mmcv.parallel._functions._get_stream = MagicMock()
+        self.mock_mmcv.parallel._functions.Tensor = torch.Tensor
+
+        # Set default return values
+        self.mock_mmcv.parallel._functions.get_input_device.return_value = -1
+        self.mock_mmcv.parallel._functions.scatter.return_value = ["scatter_output"]
+
+        # Dynamically return one mock output per target GPU
+        def scatter_mock(input_, target_gpus, streams=None):
+            return [f"output_{i}" for i in range(len(target_gpus))]
+
+        self.mock_mmcv.parallel._functions.scatter = MagicMock(side_effect=scatter_mock)
+
+    def test_monkeypatch(self):
+        """Verify forward method is correctly replaced"""
+        options = {}
+
+        # Apply monkeypatch using stream function
+        mmengine_patch.stream(self.mock_mmcv, options)
+
+        # Verify Scatter.forward has been replaced with new_forward
+        assertIsNotInstance(self.mock_mmcv.parallel._functions.Scatter.forward, MagicMock)
+
+    def test_patch_failure(self):
+        _mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            _mock_mmcv.parallel._functions.Scatter = EmptyAttribute
+            mmengine_patch.stream(_mock_mmcv, {})
+
+    def test_new_forward_input_device_neg_one(self):
+        """Test stream behavior when input device is -1 and target GPUs are not [-1]"""
+        mmengine_patch.stream(self.mock_mmcv, {})
+
+        # Create mock input
+        test_input = MagicMock(spec=torch.Tensor)
+        target_gpus = [0, 1]
+
+        # Execute new forward method
+        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
+
+        # Verify stream handling logic
+        self.mock_mmcv.parallel._functions._get_stream.assert_called()
+        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
+        self.mock_mmcv.parallel._functions.synchronize_stream.assert_called_once()
+
+        # Verify output format
+        self.assertEqual(len(result), len(target_gpus))
+        self.assertIsInstance(result, tuple)
+
+    def test_new_forward_non_neg_input_device(self):
+        """Test behavior when input device is not -1"""
+        mmengine_patch.stream(self.mock_mmcv, {})
+
+        # Set input device to non-negative value
+        self.mock_mmcv.parallel._functions.get_input_device.return_value = 0
+        test_input = MagicMock(spec=torch.Tensor)
+        target_gpus = [0, 1]
+
+        # Execute new forward method
+        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
+
+        # Verify no stream handling occurs
+        self.mock_mmcv.parallel._functions._get_stream.assert_not_called()
+        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
+        self.mock_mmcv.parallel._functions.synchronize_stream.assert_not_called()
+        self.assertIsInstance(result, tuple)
+
+    def test_new_forward_list_input(self):
+        """Test handling of list input"""
+        mmengine_patch.stream(self.mock_mmcv, {})
+
+        # Create list input
+        test_input = [torch.tensor([1]), torch.tensor([2])]
+        target_gpus = [0, 1]
+
+        # Execute new forward method
+        result = self.mock_mmcv.parallel._functions.Scatter.forward.__func__(target_gpus, test_input)
+
+        # Verify processing logic
+        self.mock_mmcv.parallel._functions.get_input_device.assert_called_once()
+        self.mock_mmcv.parallel._functions.scatter.assert_called_once()
+        self.assertIsInstance(result, tuple)
+
+
+class TestDdpPatch(TestCase):
+    def test_monkey_patch(self):
+        mock_mmcv = MagicMock()
+        mock_mmcv.device.npu.NPUDistributedDataParallel = types.ModuleType('npuddp')
+        mmddp_b4replacement = mock_mmcv.parallel.distributed.MMDistributedDataParallel
+
+        # Apply monkey patch
+        mmengine_patch.ddp(mock_mmcv, {})
+        assertIsNotInstance(mmddp_b4replacement._run_ddp_forward, MagicMock)
+        assertIsNotInstance(mock_mmcv.parallel.distributed.MMDistributedDataParallel, MagicMock)
+
+    def test_patch_failure(self):
+        mock_mmcv = MagicMock()
+
+        with self.assertRaises(AttributeError):
+            mock_mmcv.parallel.distributed.MMDistributedDataParallel = EmptyAttribute
+            mmengine_patch.ddp(mock_mmcv, {})
+
+    def test_with_device_ids(self):
+        mock_self = MagicMock()
+
+        mock_self.device_ids = [0]
+
+        mock_inputs = [('input1', 'input2')]
+        mock_kwargs = [{'key1': 'value1'}]
+        mock_self.to_kwargs.return_value = (mock_inputs, mock_kwargs)
+
+        expected_result = "mock_result"
+        mock_self.module.return_value = expected_result
+
+        test_inputs = ('arg1', 'arg2')
+        test_kwargs = {'kwarg1': 'value1'}
+
+        mock_mmcv = MagicMock()
+
+        # In the actual MMCV library, NPUDistributedDataParallel extends MMDistributedDataParallel; set them to the same mock here
+        mock_mmcv.device.npu.NPUDistributedDataParallel = mock_mmcv.parallel.distributed.MMDistributedDataParallel
+
+        mmengine_patch.ddp(mock_mmcv, {})
+        result = mock_mmcv.parallel.distributed.MMDistributedDataParallel._run_ddp_forward(mock_self, *test_inputs, **test_kwargs)
+
+        mock_self.to_kwargs.assert_called_once_with(test_inputs, test_kwargs, 0)
+
+        self.assertEqual(result, expected_result)
+
+    def test_without_device_ids(self):
+        mock_self = MagicMock()
+
+        mock_self.device_ids = None
+
+        expected_result = "mock_result_no_devices"
+        mock_self.module.return_value = expected_result
+
+        test_inputs = ('arg1', 'arg2')
+        test_kwargs = {'kwarg1': 'value1'}
+
+        mock_mmcv = MagicMock()
+
+        # In the actual MMCV library, NPUDistributedDataParallel extends MMDistributedDataParallel; set them to the same mock here
+        mock_mmcv.device.npu.NPUDistributedDataParallel = mock_mmcv.parallel.distributed.MMDistributedDataParallel
+
+
+        mmengine_patch.ddp(mock_mmcv, {})
+        result = mock_mmcv.parallel.distributed.MMDistributedDataParallel._run_ddp_forward(mock_self, *test_inputs, **test_kwargs)
+
+        mock_self.to_kwargs.assert_not_called()
+
+        mock_self.module.assert_called_once_with(*test_inputs, **test_kwargs)
+
+        self.assertEqual(result, expected_result)
+
+
+class TestProfiler(TestCase):
+    def test_parse_profiler_options_level0(self):
+        options = {}
+        options['profiling_path'] = "/xxx/yyy/level0"
+        options['profiling_level'] = 0
+        path, level, activities, profiler_level = mmengine_patch._parse_profiler_options(options)
+        self.assertEqual(path, options['profiling_path'])
+        self.assertEqual(level, options['profiling_level'])
+        self.assertIn(torch_npu.profiler.ProfilerActivity.NPU, activities)
+        self.assertNotIn(torch_npu.profiler.ProfilerActivity.CPU, activities)
+
+    def test_parse_profiler_options_level1(self):
+        options = {}
+        options['profiling_path'] = "/xxx/yyy/level1"
+        options['profiling_level'] = 1
+        path, level, activities, profiler_level = mmengine_patch._parse_profiler_options(options)
+        self.assertEqual(path, options['profiling_path'])
+        self.assertEqual(level, options['profiling_level'])
+        self.assertIn(torch_npu.profiler.ProfilerActivity.NPU, activities)
+        self.assertIn(torch_npu.profiler.ProfilerActivity.CPU, activities)
+
+    def test_parse_profiler_options_level2(self):
+        options = {}
+        options['profiling_path'] = "/xxx/yyy/level2"
+        options['profiling_level'] = 2
+        path, level, activities, profiler_level = mmengine_patch._parse_profiler_options(options)
+        self.assertEqual(path, options['profiling_path'])
+        self.assertEqual(level, options['profiling_level'])
+        self.assertIn(torch_npu.profiler.ProfilerActivity.NPU, activities)
+        self.assertIn(torch_npu.profiler.ProfilerActivity.CPU, activities)
+
+    def _execute_mock_profiler(self, profiler_level, dataloader_len, max_iter, extra_args=None, initial_iter=0):
+        if extra_args is None:
+            val_begin = 0
+            val_interval = 1
+            workflow = [('train', 1)]
+            batch_size = 1
+        else:
+            val_begin, val_interval, workflow, batch_size = extra_args
+
+        mock_mmcv = MagicMock()
+        mock_mmcv.runner.EpochBasedRunner._iter = initial_iter
+        mmengine_patch.epoch_runner(mock_mmcv, {'enable_profiler': True,
+                                                'enable_brake': True,
+                                                'brake_step': 1000,
+                                                'profiling_path': "/.../profiling_path/",
+                                                'profiling_level': profiler_level})
+        with patch('torch_npu.profiler.profile') as mock_profiler, \
+                patch('torch_npu.profiler.schedule') as mock_schedule, \
+                patch('torch_npu.profiler._ExperimentalConfig') as mock_exp_config, \
+                patch('torch_npu.profiler.tensorboard_trace_handler') as mock_trace_handler:
+            # __enter__ invoked by "with torch_npu.profiler.profile(...) as prof" needs to return itself as "prof"
+            mock_profiler.return_value.__enter__.return_value = mock_profiler
+
+            mock_dataloader = MagicMock()
+            mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+            mock_dataloader.__len__.return_value = dataloader_len
+
+            # execute EpochBasedRunner.train
+            mock_mmcv.runner.EpochBasedRunner.train(mock_mmcv.runner.EpochBasedRunner, mock_dataloader)
+
+            epoch_runner_profiler = mock_profiler
+
+
+        mock_mmcv = MagicMock()
+        mock_mmcv.runner.EpochBasedTrainLoop._iter = initial_iter
+        mmengine_patch.epoch_train_loop(mock_mmcv, {'enable_profiler': True,
+                                                    'enable_brake': True,
+                                                    'brake_step': 1000,
+                                                    'profiling_path': "/.../profiling_path/",
+                                                    'profiling_level': profiler_level})
+        with patch('torch_npu.profiler.profile') as mock_profiler, \
+                patch('torch_npu.profiler.schedule') as mock_schedule, \
+                patch('torch_npu.profiler._ExperimentalConfig') as mock_exp_config, \
+                patch('torch_npu.profiler.tensorboard_trace_handler') as mock_trace_handler:
+            # __enter__ invoked by "with torch_npu.profiler.profile(...) as prof" needs to return itself as "prof"
+            mock_profiler.return_value.__enter__.return_value = mock_profiler
+
+            mock_dataloader = MagicMock()
+            mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+            mock_dataloader.__len__.return_value = dataloader_len
+
+            mock_mmcv.runner.EpochBasedTrainLoop.dataloader = mock_dataloader
+
+            # execute EpochBasedTrainLoop.run_epoch
+            mock_mmcv.runner.EpochBasedTrainLoop.run_epoch(mock_mmcv.runner.EpochBasedTrainLoop)
+
+            epoch_trainloop_profiler = mock_profiler
+
+
+        mock_mmcv = MagicMock()
+        mmengine_patch.iter_train_loop(mock_mmcv, {'enable_profiler': True,
+                                                   'enable_brake': True,
+                                                   'brake_step': 1001,
+                                                   'profiling_path': "/.../profiling_path/",
+                                                   'profiling_level': profiler_level})
+        with patch('torch_npu.profiler.profile') as mock_profiler, \
+                patch('torch_npu.profiler.schedule') as mock_schedule, \
+                patch('torch_npu.profiler._ExperimentalConfig') as mock_exp_config, \
+                patch('torch_npu.profiler.tensorboard_trace_handler') as mock_trace_handler:
as prof" needs to return itself as "prof" + mock_profiler.return_value.__enter__.return_value = mock_profiler + + # execute IterBasedTrainLoop.run (corresponds to "run" function in patcher's profiler.py) + mock_mmcv.runner.IterBasedTrainLoop._iter = initial_iter + mock_mmcv.runner.IterBasedTrainLoop._max_iters = max_iter + mock_mmcv.runner.IterBasedTrainLoop.stop_training = False + mock_mmcv.runner.IterBasedTrainLoop.val_loop = MagicMock() + mock_mmcv.runner.IterBasedTrainLoop.val_begin = max_iter + mock_mmcv.runner.IterBasedTrainLoop.val_interval = max_iter + + def mock_run_iter(self, data_batch): + self._iter += 1 + + mock_mmcv.runner.IterBasedTrainLoop.run_iter = types.MethodType(mock_run_iter, + mock_mmcv.runner.IterBasedTrainLoop) + mock_mmcv.runner.IterBasedTrainLoop.run(mock_mmcv.runner.IterBasedTrainLoop) + + iter_trainloop_profiler = mock_profiler + + + mock_mmcv = MagicMock() + mmengine_patch.iter_runner(mock_mmcv, {'enable_profiler': True, + 'enable_brake': True, + 'brake_step': 1001, + 'profiling_path': "/.../profiling_path/", + 'profiling_level': profiler_level}) + with patch('torch_npu.profiler.profile') as mock_profiler, \ + patch('torch_npu.profiler.schedule') as mock_schedule, \ + patch('torch_npu.profiler._ExperimentalConfig') as mock_exp_config, \ + patch('torch_npu.profiler.tensorboard_trace_handler') as mock_trace_handler: + # __enter__ invoked by "with torch_npu.profiler.profile(...) as prof" needs to return itself as "prof" + mock_profiler.return_value.__enter__.return_value = mock_profiler + + mock_dataloader = MagicMock() + mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len)) + mock_dataloader.__len__.return_value = dataloader_len + + # execute IterBasedRunner.run (corresponds to "run_iter" function in patcher's profiler.py) + mock_mmcv.runner.IterBasedRunner.iter = initial_iter + mock_mmcv.runner.IterBasedRunner._max_iters = max_iter + + # run actually calls iter_runner, which came from getattr of mode param input as either train or val + def mock_train(self, data_loader, **kwargs): + self.iter += 1 + + mock_mmcv.runner.IterBasedRunner.train = types.MethodType(mock_train, mock_mmcv.runner.IterBasedRunner) + + def mock_val(self, data_loader, **kwargs): + self.iter += 1 + + mock_mmcv.runner.IterBasedRunner.val = types.MethodType(mock_val, mock_mmcv.runner.IterBasedRunner) + mock_mmcv.runner.IterBasedRunner.run(mock_mmcv.runner.IterBasedRunner, + [mock_dataloader] * len(workflow), + workflow) + + iter_runner_profiler = mock_profiler + + return (epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler) + + def test_profiler_and_brake(self): + with unittest.TestCase.assertRaises(self, SystemExit) as se: + self._execute_mock_profiler(profiler_level=1, dataloader_len=1001, max_iter=1001, initial_iter=1000) + + def test_level0_configs(self): + level = 0 + golden_level0_configs = { + 'activities': [torch_npu.profiler.ProfilerActivity.NPU], + 'with_stack': False, + 'record_shapes': False, + 'profile_memory': False, + 'schedule': ANY, + 'experimental_config': ANY, + 'on_trace_ready': ANY + } + + epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler = \ + self._execute_mock_profiler(profiler_level=level, dataloader_len=1, max_iter=1) + + # check profiler configs + golden_configs = golden_level0_configs + epoch_runner_profiler.assert_called_with(**golden_configs) + epoch_trainloop_profiler.assert_called_with(**golden_configs) + 
+        iter_trainloop_profiler.assert_called_with(**golden_configs)
+        iter_runner_profiler.assert_called_with(**golden_configs)
+        # check that step() was called
+        epoch_runner_profiler.step.assert_called_once()
+        epoch_trainloop_profiler.step.assert_called_once()
+        iter_trainloop_profiler.step.assert_called_once()
+        iter_runner_profiler.step.assert_called_once()
+
+    def test_level1_configs(self):
+        level = 1
+        golden_level1_configs = {
+            'activities': [torch_npu.profiler.ProfilerActivity.NPU,
+                           torch_npu.profiler.ProfilerActivity.CPU],
+            'with_stack': False,
+            'record_shapes': True,
+            'profile_memory': False,
+            'schedule': ANY,
+            'experimental_config': ANY,
+            'on_trace_ready': ANY
+        }
+
+        epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler = \
+            self._execute_mock_profiler(profiler_level=level, dataloader_len=1, max_iter=1)
+
+        # check profiler configs
+        golden_configs = golden_level1_configs
+        epoch_runner_profiler.assert_called_with(**golden_configs)
+        epoch_trainloop_profiler.assert_called_with(**golden_configs)
+        iter_trainloop_profiler.assert_called_with(**golden_configs)
+        iter_runner_profiler.assert_called_with(**golden_configs)
+        # check that step() was called
+        epoch_runner_profiler.step.assert_called_once()
+        epoch_trainloop_profiler.step.assert_called_once()
+        iter_trainloop_profiler.step.assert_called_once()
+        iter_runner_profiler.step.assert_called_once()
+
+    def test_level2_configs(self):
+        level = 2
+        golden_level2_configs = {
+            'activities': [torch_npu.profiler.ProfilerActivity.NPU,
+                           torch_npu.profiler.ProfilerActivity.CPU],
+            'with_stack': True,
+            'record_shapes': True,
+            'profile_memory': True,
+            'schedule': ANY,
+            'experimental_config': ANY,
+            'on_trace_ready': ANY
+        }
+
+        epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler = \
+            self._execute_mock_profiler(profiler_level=level, dataloader_len=1, max_iter=1)
+
+        # check profiler configs
+        golden_configs = golden_level2_configs
+        epoch_runner_profiler.assert_called_with(**golden_configs)
+        epoch_trainloop_profiler.assert_called_with(**golden_configs)
+        iter_trainloop_profiler.assert_called_with(**golden_configs)
+        iter_runner_profiler.assert_called_with(**golden_configs)
+        # check that step() was called
+        epoch_runner_profiler.step.assert_called_once()
+        epoch_trainloop_profiler.step.assert_called_once()
+        iter_trainloop_profiler.step.assert_called_once()
+        iter_runner_profiler.step.assert_called_once()
+
+    def test_rand_len_profiling(self):
+        num_databatch = random.randint(1, 10)
+        num_iters = random.randint(1, 10)
+        level = random.randint(0, 2)
+
+        epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler = \
+            self._execute_mock_profiler(profiler_level=level, dataloader_len=num_databatch, max_iter=num_iters)
+
+        assert epoch_runner_profiler.step.call_count == num_databatch
+        assert epoch_trainloop_profiler.step.call_count == num_databatch
+        assert iter_trainloop_profiler.step.call_count == num_iters
+        assert iter_runner_profiler.step.call_count == num_iters
+
+    def test_iter_runner_val_loop(self):
+        num_databatch = 50
+        num_iters = 100
+        level = random.randint(0, 2)
+
+        val_size = 10
+        val_begin = 20
+        val_interval = 50
+        workflow = [('train', 50), ('val', val_size), ('train', 50)]
+        batch_size = 1
+        extra_args = (val_begin, val_interval, workflow, batch_size)
+
+        epoch_runner_profiler, epoch_trainloop_profiler, iter_trainloop_profiler, iter_runner_profiler = \
+            self._execute_mock_profiler(profiler_level=level, dataloader_len=num_databatch, max_iter=num_iters,
+                                        extra_args=extra_args)
+
+        assert iter_trainloop_profiler.step.call_count == num_iters
+        assert iter_runner_profiler.step.call_count == num_iters
+
+    def test_workflow_exception(self):
+        num_databatch = 1
+        num_iters = 10
+        level = random.randint(0, 2)
+
+        val_begin = num_iters
+        val_interval = num_iters
+        workflow = [('train', 5), (666, 5)]
+        batch_size = 1
+        extra_args = (val_begin, val_interval, workflow, batch_size)
+
+        with unittest.TestCase.assertRaises(self, ValueError):
+            self._execute_mock_profiler(profiler_level=level, dataloader_len=num_databatch, max_iter=num_iters,
+                                        extra_args=extra_args)
+
+    def test_max_iters_warning(self):
+        mock_mmcv = MagicMock()
+        mmengine_patch.iter_runner(mock_mmcv, {'enable_profiler': True, 'enable_brake': False,
+                                               'profiling_path': "/.../profiling_path/",
+                                               'profiling_level': random.randint(0, 2)})
+        with patch('torch_npu.profiler.profile') as mock_profiler, \
+                patch('torch_npu.profiler.schedule') as mock_schedule, \
+                patch('torch_npu.profiler._ExperimentalConfig') as mock_exp_config, \
+                patch('torch_npu.profiler.tensorboard_trace_handler') as mock_trace_handler:
+
+            mock_mmcv.runner.IterBasedRunner.iter = 0
+            mock_mmcv.runner.IterBasedRunner._max_iters = 0
+
+            def mock_train(self, data_loader, **kwargs):
+                self.iter += 1
+
+            mock_mmcv.runner.IterBasedRunner.train = types.MethodType(mock_train, mock_mmcv.runner.IterBasedRunner)
+
+            def mock_val(self, data_loader, **kwargs):
+                self.iter += 1
+
+            mock_mmcv.runner.IterBasedRunner.val = types.MethodType(mock_val, mock_mmcv.runner.IterBasedRunner)
+
+            dataloader_len = 5
+            mock_dataloader = MagicMock()
+            mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+            mock_dataloader.__len__.return_value = dataloader_len
+
+            with unittest.TestCase.assertWarns(self, DeprecationWarning):
+                mock_mmcv.runner.IterBasedRunner.run(mock_mmcv.runner.IterBasedRunner,
+                                                     [mock_dataloader], [('train', 5)], 666)
+
+    def test_patch_failure(self):
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.runner.EpochBasedRunner = EmptyAttribute
+            mmengine_patch.epoch_runner(mock_mmcv, {'enable_profiler': True, 'enable_brake': False,
+                                                    'profiling_path': "/.../profiling_path/",
+                                                    'profiling_level': random.randint(0, 2)})
+
+
+        mock_mmcv = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmcv.runner.IterBasedRunner = EmptyAttribute
+            mmengine_patch.iter_runner(mock_mmcv, {'enable_profiler': True, 'enable_brake': False,
+                                                   'profiling_path': "/.../profiling_path/",
+                                                   'profiling_level': random.randint(0, 2)})
+
+
+        mock_mmengine = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmengine.runner.EpochBasedTrainLoop = EmptyAttribute
+            mmengine_patch.epoch_train_loop(mock_mmengine, {'enable_profiler': True, 'enable_brake': False,
+                                                            'profiling_path': "/.../profiling_path/",
+                                                            'profiling_level': random.randint(0, 2)})
+
+
+        mock_mmengine = MagicMock()
+        with self.assertRaises(AttributeError):
+            mock_mmengine.runner.IterBasedTrainLoop = EmptyAttribute
+            mmengine_patch.iter_train_loop(mock_mmengine, {'enable_profiler': True, 'enable_brake': False,
+                                                           'profiling_path': "/.../profiling_path/",
+                                                           'profiling_level': random.randint(0, 2)})
+
+
+class TestBrake(TestCase):
+    def test_epoch_runner_brake(self):
+        mock_mmcv = MagicMock()
+
+        # apply monkeypatch, brake at 1000th step
+        mmengine_patch.epoch_runner(mock_mmcv, {'enable_profiler': False,
+                                                'enable_brake': True,
+                                                'brake_step': 1000})
+
+        dataloader_len = 2000
+        mock_dataloader = MagicMock()
+        mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+        mock_dataloader.__len__.return_value = dataloader_len
+
+        mock_mmcv.runner.EpochBasedRunner._iter = 0
+
+        # execute EpochBasedRunner.train and catch brake exit(0)
+        with unittest.TestCase.assertRaises(self, SystemExit) as se:
+            mock_mmcv.runner.EpochBasedRunner.train(mock_mmcv.runner.EpochBasedRunner, mock_dataloader)
+        self.assertEqual(se.exception.code, 0)
+        self.assertEqual(mock_mmcv.runner.EpochBasedRunner._iter, 1000)
+
+    def test_epoch_train_loop_brake(self):
+        mock_mmcv = MagicMock()
+
+        # apply monkeypatch, brake at 1000th step
+        mmengine_patch.epoch_train_loop(mock_mmcv, {'enable_profiler': False,
+                                                    'enable_brake': True,
+                                                    'brake_step': 1000})
+
+        dataloader_len = 2000
+        mock_dataloader = MagicMock()
+        mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+        mock_dataloader.__len__.return_value = dataloader_len
+
+        mock_mmcv.runner.EpochBasedTrainLoop.dataloader = mock_dataloader
+        mock_mmcv.runner.EpochBasedTrainLoop._iter = 0
+
+        def mock_run_iter(self, idx, data_batch):
+            self._iter += 1
+
+        mock_mmcv.runner.EpochBasedTrainLoop.run_iter = types.MethodType(mock_run_iter,
+                                                                         mock_mmcv.runner.EpochBasedTrainLoop)
+
+        # execute EpochBasedTrainLoop.run_epoch
+        with unittest.TestCase.assertRaises(self, SystemExit) as se:
+            mock_mmcv.runner.EpochBasedTrainLoop.run_epoch(mock_mmcv.runner.EpochBasedTrainLoop)
+        self.assertEqual(se.exception.code, 0)
+        self.assertEqual(mock_mmcv.runner.EpochBasedTrainLoop._iter, 1000)
+
+    def test_iter_train_loop_brake(self):
+        mock_engine = MagicMock()
+
+        # apply monkeypatch, brake at 1000th step
+        mmengine_patch.iter_train_loop(mock_engine, {'enable_profiler': False,
+                                                     'enable_brake': True,
+                                                     'brake_step': 1000})
+
+        mock_engine.runner.IterBasedTrainLoop._iter = 0
+        mock_engine.runner.IterBasedTrainLoop._max_iters = 2000
+        mock_engine.runner.IterBasedTrainLoop.stop_training = False
+        mock_engine.runner.IterBasedTrainLoop.val_loop = MagicMock()
+        mock_engine.runner.IterBasedTrainLoop.val_begin = 100
+        mock_engine.runner.IterBasedTrainLoop.val_interval = 500
+
+        def mock_run_iter(self, data_batch):
+            self._iter += 1
+
+        mock_engine.runner.IterBasedTrainLoop.run_iter = types.MethodType(mock_run_iter,
+                                                                          mock_engine.runner.IterBasedTrainLoop)
+
+
+        # execute IterBasedTrainLoop.run (corresponds to "run" function in patcher's brake.py)
+        with unittest.TestCase.assertRaises(self, SystemExit) as se:
+            mock_engine.runner.IterBasedTrainLoop.run(mock_engine.runner.IterBasedTrainLoop)
+        self.assertEqual(se.exception.code, 0)
+        self.assertEqual(mock_engine.runner.IterBasedTrainLoop._iter, 1000)
+
+    def test_iter_runner_brake(self):
+        mock_engine = MagicMock()
+
+        # apply monkeypatch, brake at 1000th step
+        mmengine_patch.iter_runner(mock_engine, {'enable_profiler': False,
+                                                 'enable_brake': True,
+                                                 'brake_step': 1000})
+
+        workflow = [('train', 1000), ('val', 10), ('train', 1000)]
+        dataloader_len = 1000
+        mock_dataloader = MagicMock()
+        mock_dataloader.__iter__.return_value = (torch.randn(3, 256, 256) for _ in range(dataloader_len))
+        mock_dataloader.__len__.return_value = dataloader_len
+
+        mock_engine.runner.IterBasedRunner.iter = 0
+        mock_engine.runner.IterBasedRunner._iter = 0
+        mock_engine.runner.IterBasedRunner._max_iters = 2010
+
+        def mock_train(self, data_loader, **kwargs):
+            self.iter += 1
+            self._iter += 1
+
+        mock_engine.runner.IterBasedRunner.train = types.MethodType(mock_train, mock_engine.runner.IterBasedRunner)
+
+        def mock_val(self, data_loader, **kwargs):
+            self.iter += 1
+            self._iter += 1
+
+        mock_engine.runner.IterBasedRunner.val = types.MethodType(mock_val, mock_engine.runner.IterBasedRunner)
+
+        # execute IterBasedRunner.run (corresponds to "run_iter" function in patcher's brake.py)
+        with unittest.TestCase.assertRaises(self, SystemExit) as se:
+            mock_engine.runner.IterBasedRunner.run(mock_engine.runner.IterBasedRunner,
+                                                   [mock_dataloader] * len(workflow),
+                                                   workflow)
+        self.assertEqual(se.exception.code, 0)
+        self.assertEqual(mock_engine.runner.IterBasedRunner._iter, 1000)
+
+
 if __name__ == '__main__':
     run_tests()
\ No newline at end of file
-- 
Gitee
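For quick reference, here is a minimal usage sketch of the patcher options these tests exercise. The option keys and the meaning of each profiling level come from the golden configs asserted above; the profiling path is a hypothetical placeholder, and the MagicMock stands in for the real imported module, exactly as the tests do:

```Python
# Minimal sketch, assuming the patch function is applied to a module object at startup.
# Option keys and level semantics mirror the golden configs in the tests above;
# "/path/to/profiling_output" is a hypothetical placeholder path.
from unittest.mock import MagicMock

from mx_driving.patcher import mmengine_patch

mmengine = MagicMock()  # stand-in for the real module, as in the tests
options = {
    'enable_profiler': True,   # wrap the training loop in torch_npu.profiler.profile
    'profiling_path': "/path/to/profiling_output",
    'profiling_level': 1,      # 0: NPU activity only; 1: adds CPU activity and record_shapes;
                               # 2: also enables with_stack and profile_memory
    'enable_brake': True,      # raise SystemExit(0) once brake_step iterations are reached
    'brake_step': 1000,
}
mmengine_patch.iter_train_loop(mmengine, options)
```

Level 0 keeps profiling overhead lowest (NPU activity only); levels 1 and 2 progressively enable CPU activity, shape recording, call stacks, and memory profiling, which is what the three `test_levelN_configs` cases pin down.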