#!/usr/bin/env python3 # # Functional test that check overcommit memlock options # # Copyright (c) Yandex Technologies LLC, 2025 # # Author: # Alexandr Moshkov # # SPDX-License-Identifier: GPL-2.0-or-later import re from typing import Dict from qemu_test import QemuSystemTest from qemu_test import skipLockedMemoryTest STATUS_VALUE_PATTERN = re.compile(r'^(\w+):\s+(\d+) kB', re.MULTILINE) @skipLockedMemoryTest(2_097_152) # 2GB class MemlockTest(QemuSystemTest): """ Runs a guest with memlock options. Then verify, that this options is working correctly by checking the status file of the QEMU process. """ def common_vm_setup_with_memlock(self, memlock): self.vm.add_args('-overcommit', f'mem-lock={memlock}') self.vm.launch() def test_memlock_off(self): self.common_vm_setup_with_memlock('off') status = self.get_process_status_values(self.vm.get_pid()) self.assertTrue(status['VmLck'] == 0) def test_memlock_on(self): self.common_vm_setup_with_memlock('on') status = self.get_process_status_values(self.vm.get_pid()) # VmLck > 0 kB and almost all memory is resident self.assertTrue(status['VmLck'] > 0) self.assertTrue(status['VmRSS'] >= status['VmSize'] * 0.70) def test_memlock_onfault(self): self.common_vm_setup_with_memlock('on-fault') status = self.get_process_status_values(self.vm.get_pid()) # VmLck > 0 kB and only few memory is resident self.assertTrue(status['VmLck'] > 0) self.assertTrue(status['VmRSS'] <= status['VmSize'] * 0.30) def get_process_status_values(self, pid: int) -> Dict[str, int]: result = {} raw_status = self._get_raw_process_status(pid) for line in raw_status.split('\n'): if m := STATUS_VALUE_PATTERN.match(line): result[m.group(1)] = int(m.group(2)) return result def _get_raw_process_status(self, pid: int) -> str: try: with open(f'/proc/{pid}/status', 'r') as f: return f.read() except FileNotFoundError: self.skipTest("Can't open status file of the process") if __name__ == '__main__': MemlockTest.main()