diff options
-rw-r--r-- | .github/workflows/pull_request.yml | 6 | ||||
-rw-r--r-- | Makefile | 6 | ||||
-rw-r--r-- | test/py/libvfio_user.py | 172 | ||||
-rw-r--r-- | test/py/test_device_get_info.py | 9 | ||||
-rw-r--r-- | test/py/test_device_get_irq_info.py | 6 | ||||
-rw-r--r-- | test/py/test_device_get_region_info.py | 16 | ||||
-rw-r--r-- | test/py/test_device_get_region_info_zero_size.py | 9 | ||||
-rw-r--r-- | test/py/test_device_get_region_io_fds.py | 124 | ||||
-rw-r--r-- | test/py/test_device_set_irqs.py | 31 | ||||
-rw-r--r-- | test/py/test_dirty_pages.py | 23 | ||||
-rw-r--r-- | test/py/test_dma_map.py | 9 | ||||
-rw-r--r-- | test/py/test_dma_unmap.py | 15 | ||||
-rw-r--r-- | test/py/test_irq_trigger.py | 9 | ||||
-rw-r--r-- | test/py/test_map_unmap_sg.py | 7 | ||||
-rw-r--r-- | test/py/test_migration.py | 3 | ||||
-rw-r--r-- | test/py/test_negotiate.py | 31 | ||||
-rw-r--r-- | test/py/test_pci_caps.py | 75 | ||||
-rw-r--r-- | test/py/test_pci_ext_caps.py | 28 | ||||
-rw-r--r-- | test/py/test_request_errors.py | 12 | ||||
-rw-r--r-- | test/py/test_setup_region.py | 38 | ||||
-rw-r--r-- | test/py/test_vfu_create_ctx.py | 12 | ||||
-rw-r--r-- | test/py/test_vfu_realize_ctx.py | 18 |
22 files changed, 454 insertions, 205 deletions
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index c066619..b7ed789 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -10,7 +10,7 @@ jobs: run: | sudo apt-get update sudo apt-get -y install libjson-c-dev libcmocka-dev clang valgrind \ - python3-pytest debianutils + python3-pytest debianutils flake8 make pre-push VERBOSE=1 ubuntu-18: timeout-minutes: 5 @@ -19,6 +19,7 @@ jobs: - uses: actions/checkout@v2 - name: pre-push run: | + # NB: no working flake8 sudo apt update sudo apt-get -y install libjson-c-dev libcmocka-dev clang valgrind \ python3-pytest debianutils @@ -31,6 +32,7 @@ jobs: - uses: actions/checkout@v2 - name: pre-push run: | + # NB: no working flake8 yum -y install make gcc-4.8.5 epel-release pciutils yum -y install clang cmake json-c-devel libcmocka-devel \ openssl-devel valgrind python36-pytest which @@ -45,5 +47,5 @@ jobs: run: | dnf -y install --releasever=34 \ gcc make clang cmake json-c-devel libcmocka-devel openssl-devel \ - pciutils diffutils valgrind python3-pytest which + pciutils diffutils valgrind python3-pytest python3-flake8 which make pre-push VERBOSE=1 @@ -54,7 +54,8 @@ endif GIT_SHA = $(shell git rev-parse --short HEAD) CMAKE = $(shell bash -c "command -v cmake3 cmake" | head -1) -RSTLINT= $(shell bash -c "command -v restructuredtext-lint /bin/true" | head -1) +RSTLINT = $(shell bash -c "command -v restructuredtext-lint /bin/true" | head -1) +FLAKE8 = $(shell bash -c "command -v flake8 /bin/true" | head -1) BUILD_DIR_BASE = $(CURDIR)/build BUILD_DIR = $(BUILD_DIR_BASE)/$(BUILD_TYPE) @@ -65,7 +66,6 @@ INSTALL_PREFIX ?= /usr/local .PHONY: pre-push clean realclean tags gcov all install: $(BUILD_DIR)/Makefile - $(RSTLINT) docs/vfio-user.rst +$(MAKE) -C $(BUILD_DIR) $@ # @@ -126,6 +126,8 @@ endif cd $(BUILD_DIR)/test; ctest --verbose pre-push: realclean + $(RSTLINT) docs/vfio-user.rst + $(FLAKE8) --extend-ignore=F405,F403,E128,E131,E127 $$(find . -name '*.py') make test WITH_ASAN=1 make realclean make test CC=clang BUILD_TYPE=rel diff --git a/test/py/libvfio_user.py b/test/py/libvfio_user.py index 5cba324..77b3b67 100644 --- a/test/py/libvfio_user.py +++ b/test/py/libvfio_user.py @@ -31,7 +31,6 @@ # Note that we don't use enum here, as class.value is a little verbose # -from collections import namedtuple from types import SimpleNamespace import ctypes as c import array @@ -39,7 +38,6 @@ import errno import json import mmap import os -import pathlib import socket import struct import syslog @@ -101,7 +99,7 @@ VFIO_IRQ_SET_ACTION_TRIGGER = (1 << 5) VFIO_DMA_UNMAP_FLAG_ALL = (1 << 1) VFIO_DEVICE_STATE_STOP = (0) -VFIO_DEVICE_STATE_RUNNING = (1 << 0) +VFIO_DEVICE_STATE_RUNNING = (1 << 0) VFIO_DEVICE_STATE_SAVING = (1 << 1) VFIO_DEVICE_STATE_RESUMING = (1 << 2) VFIO_DEVICE_STATE_MASK = ((1 << 3) - 1) @@ -130,21 +128,21 @@ MAX_DMA_REGIONS = 16 MAX_DMA_SIZE = (8 * ONE_TB) # enum vfio_user_command -VFIO_USER_VERSION = 1 -VFIO_USER_DMA_MAP = 2 -VFIO_USER_DMA_UNMAP = 3 -VFIO_USER_DEVICE_GET_INFO = 4 -VFIO_USER_DEVICE_GET_REGION_INFO = 5 -VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6 -VFIO_USER_DEVICE_GET_IRQ_INFO = 7 -VFIO_USER_DEVICE_SET_IRQS = 8 -VFIO_USER_REGION_READ = 9 -VFIO_USER_REGION_WRITE = 10 -VFIO_USER_DMA_READ = 11 -VFIO_USER_DMA_WRITE = 12 -VFIO_USER_DEVICE_RESET = 13 -VFIO_USER_DIRTY_PAGES = 14 -VFIO_USER_MAX = 15 +VFIO_USER_VERSION = 1 +VFIO_USER_DMA_MAP = 2 +VFIO_USER_DMA_UNMAP = 3 +VFIO_USER_DEVICE_GET_INFO = 4 +VFIO_USER_DEVICE_GET_REGION_INFO = 5 +VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6 +VFIO_USER_DEVICE_GET_IRQ_INFO = 7 +VFIO_USER_DEVICE_SET_IRQS = 8 +VFIO_USER_REGION_READ = 9 +VFIO_USER_REGION_WRITE = 10 +VFIO_USER_DMA_READ = 11 +VFIO_USER_DMA_WRITE = 12 +VFIO_USER_DEVICE_RESET = 13 +VFIO_USER_DIRTY_PAGES = 14 +VFIO_USER_MAX = 15 VFIO_USER_F_TYPE_COMMAND = 0 VFIO_USER_F_TYPE_REPLY = 1 @@ -157,16 +155,16 @@ VFU_PCI_DEV_BAR2_REGION_IDX = 2 VFU_PCI_DEV_BAR3_REGION_IDX = 3 VFU_PCI_DEV_BAR4_REGION_IDX = 4 VFU_PCI_DEV_BAR5_REGION_IDX = 5 -VFU_PCI_DEV_ROM_REGION_IDX = 6 -VFU_PCI_DEV_CFG_REGION_IDX = 7 -VFU_PCI_DEV_VGA_REGION_IDX = 8 +VFU_PCI_DEV_ROM_REGION_IDX = 6 +VFU_PCI_DEV_CFG_REGION_IDX = 7 +VFU_PCI_DEV_VGA_REGION_IDX = 8 VFU_PCI_DEV_MIGR_REGION_IDX = 9 -VFU_PCI_DEV_NUM_REGIONS = 10 +VFU_PCI_DEV_NUM_REGIONS = 10 -VFU_REGION_FLAG_READ = 1 +VFU_REGION_FLAG_READ = 1 VFU_REGION_FLAG_WRITE = 2 VFU_REGION_FLAG_RW = (VFU_REGION_FLAG_READ | VFU_REGION_FLAG_WRITE) -VFU_REGION_FLAG_MEM = 4 +VFU_REGION_FLAG_MEM = 4 VFU_REGION_FLAG_ALWAYS_CB = 8 VFIO_USER_F_DMA_REGION_READ = (1 << 0) @@ -184,10 +182,10 @@ VFIO_USER_IO_FD_TYPE_IOREGIONFD = 1 # enum vfu_dev_irq_type VFU_DEV_INTX_IRQ = 0 -VFU_DEV_MSI_IRQ = 1 +VFU_DEV_MSI_IRQ = 1 VFU_DEV_MSIX_IRQ = 2 -VFU_DEV_ERR_IRQ = 3 -VFU_DEV_REQ_IRQ = 4 +VFU_DEV_ERR_IRQ = 3 +VFU_DEV_REQ_IRQ = 4 VFU_DEV_NUM_IRQS = 5 # enum vfu_reset_type @@ -197,9 +195,9 @@ VFU_RESET_PCI_FLR = 2 # vfu_pci_type_t VFU_PCI_TYPE_CONVENTIONAL = 0 -VFU_PCI_TYPE_PCI_X_1 = 1 -VFU_PCI_TYPE_PCI_X_2 = 2 -VFU_PCI_TYPE_EXPRESS = 3 +VFU_PCI_TYPE_PCI_X_1 = 1 +VFU_PCI_TYPE_PCI_X_2 = 2 +VFU_PCI_TYPE_EXPRESS = 3 VFU_CAP_FLAG_EXTENDED = (1 << 0) VFU_CAP_FLAG_CALLBACK = (1 << 1) @@ -219,6 +217,7 @@ libc = c.CDLL("libc.so.6", use_errno=True) # Structures # + class Structure(c.Structure): def __len__(self): """Handy method to return length in bytes.""" @@ -230,6 +229,7 @@ class Structure(c.Structure): obj = cls.from_buffer_copy(buf) return obj, buf[c.sizeof(obj):] + class vfu_bar_t(c.Union): _pack_ = 1 _fields_ = [ @@ -237,6 +237,7 @@ class vfu_bar_t(c.Union): ("io", c.c_int32) ] + class vfu_pci_hdr_intr_t(Structure): _pack_ = 1 _fields_ = [ @@ -244,6 +245,7 @@ class vfu_pci_hdr_intr_t(Structure): ("ipin", c.c_byte) ] + class vfu_pci_hdr_t(Structure): _pack_ = 1 _fields_ = [ @@ -269,12 +271,14 @@ class vfu_pci_hdr_t(Structure): ("mlat", c.c_byte) ] + class iovec_t(Structure): _fields_ = [ ("iov_base", c.c_void_p), ("iov_len", c.c_int32) ] + class vfio_irq_info(Structure): _pack_ = 1 _fields_ = [ @@ -284,6 +288,7 @@ class vfio_irq_info(Structure): ("count", c.c_uint32), ] + class vfio_irq_set(Structure): _pack_ = 1 _fields_ = [ @@ -294,6 +299,7 @@ class vfio_irq_set(Structure): ("count", c.c_uint32), ] + class vfio_user_device_info(Structure): _pack_ = 1 _fields_ = [ @@ -303,6 +309,7 @@ class vfio_user_device_info(Structure): ("num_irqs", c.c_uint32), ] + class vfio_region_info(Structure): _pack_ = 1 _fields_ = [ @@ -314,6 +321,7 @@ class vfio_region_info(Structure): ("offset", c.c_uint64), ] + class vfio_region_info_cap_type(Structure): _pack_ = 1 _fields_ = [ @@ -324,6 +332,7 @@ class vfio_region_info_cap_type(Structure): ("subtype", c.c_uint32), ] + class vfio_region_info_cap_sparse_mmap(Structure): _pack_ = 1 _fields_ = [ @@ -334,6 +343,7 @@ class vfio_region_info_cap_sparse_mmap(Structure): ("reserved", c.c_uint32), ] + class vfio_region_sparse_mmap_area(Structure): _pack_ = 1 _fields_ = [ @@ -341,6 +351,7 @@ class vfio_region_sparse_mmap_area(Structure): ("size", c.c_uint64), ] + class vfio_user_region_io_fds_request(Structure): _pack_ = 1 _fields_ = [ @@ -350,6 +361,7 @@ class vfio_user_region_io_fds_request(Structure): ("count", c.c_uint32) ] + class vfio_user_sub_region_ioeventfd(Structure): _pack_ = 1 _fields_ = [ @@ -362,6 +374,7 @@ class vfio_user_sub_region_ioeventfd(Structure): ("datamatch", c.c_uint64) ] + class vfio_user_sub_region_ioregionfd(Structure): _pack_ = 1 _fields_ = [ @@ -374,6 +387,7 @@ class vfio_user_sub_region_ioregionfd(Structure): ("user_data", c.c_uint64) ] + class vfio_user_sub_region_io_fd(c.Union): _pack_ = 1 _fields_ = [ @@ -381,6 +395,7 @@ class vfio_user_sub_region_io_fd(c.Union): ("sub_region_ioregionfd", vfio_user_sub_region_ioregionfd) ] + class vfio_user_region_io_fds_reply(Structure): _pack_ = 1 _fields_ = [ @@ -390,6 +405,7 @@ class vfio_user_region_io_fds_reply(Structure): ("count", c.c_uint32) ] + class vfio_user_dma_map(Structure): _pack_ = 1 _fields_ = [ @@ -400,6 +416,7 @@ class vfio_user_dma_map(Structure): ("size", c.c_uint64), ] + class vfio_user_dma_unmap(Structure): _pack_ = 1 _fields_ = [ @@ -409,6 +426,7 @@ class vfio_user_dma_unmap(Structure): ("size", c.c_uint64), ] + class vfu_dma_info_t(Structure): _fields_ = [ ("iova", iovec_t), @@ -418,6 +436,7 @@ class vfu_dma_info_t(Structure): ("prot", c.c_uint32) ] + class vfio_user_dirty_pages(Structure): _pack_ = 1 _fields_ = [ @@ -425,6 +444,7 @@ class vfio_user_dirty_pages(Structure): ("flags", c.c_uint32) ] + class vfio_user_bitmap(Structure): _pack_ = 1 _fields_ = [ @@ -432,6 +452,7 @@ class vfio_user_bitmap(Structure): ("size", c.c_uint64) ] + class vfio_user_bitmap_range(Structure): _pack_ = 1 _fields_ = [ @@ -440,6 +461,7 @@ class vfio_user_bitmap_range(Structure): ("bitmap", vfio_user_bitmap) ] + transition_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_int, use_errno=True) get_pending_bytes_cb_t = c.CFUNCTYPE(c.c_uint64, c.c_void_p) prepare_data_cb_t = c.CFUNCTYPE(c.c_void_p, c.POINTER(c.c_uint64), @@ -449,6 +471,7 @@ read_data_cb_t = c.CFUNCTYPE(c.c_ssize_t, c.c_void_p, c.c_void_p, write_data_cb_t = c.CFUNCTYPE(c.c_ssize_t, c.c_void_p, c.c_uint64) data_written_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_uint64) + class vfu_migration_callbacks_t(Structure): _fields_ = [ ("version", c.c_int), @@ -460,6 +483,7 @@ class vfu_migration_callbacks_t(Structure): ("data_written", data_written_cb_t), ] + class dma_sg_t(Structure): _fields_ = [ ("dma_addr", c.c_void_p), @@ -467,7 +491,7 @@ class dma_sg_t(Structure): ("length", c.c_uint64), ("offset", c.c_uint64), ("writeable", c.c_bool), - ("le_next", c.c_void_p), # FIXME add struct for LIST_ENTRY + ("le_next", c.c_void_p), ("le_prev", c.c_void_p), ] @@ -475,6 +499,7 @@ class dma_sg_t(Structure): # Util functions # + lib.vfu_create_ctx.argtypes = (c.c_int, c.c_char_p, c.c_int, c.c_void_p, c.c_int) lib.vfu_create_ctx.restype = (c.c_void_p) @@ -502,7 +527,8 @@ lib.vfu_pci_find_next_capability.argtypes = (c.c_void_p, c.c_bool, c.c_ulong, c.c_int) lib.vfu_pci_find_next_capability.restype = (c.c_ulong) lib.vfu_irq_trigger.argtypes = (c.c_void_p, c.c_uint) -vfu_dma_register_cb_t = c.CFUNCTYPE(None, c.c_void_p, c.POINTER(vfu_dma_info_t)) +vfu_dma_register_cb_t = c.CFUNCTYPE(None, c.c_void_p, + c.POINTER(vfu_dma_info_t)) vfu_dma_unregister_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p, c.POINTER(vfu_dma_info_t)) lib.vfu_setup_device_dma.argtypes = (c.c_void_p, vfu_dma_register_cb_t, @@ -527,23 +553,28 @@ def to_byte(val): """Cast an int to a byte value.""" return val.to_bytes(1, 'little') + def skip(fmt, buf): """Return the data remaining after skipping the given elements.""" return buf[struct.calcsize(fmt):] + def parse_json(json_str): """Parse JSON into an object with attributes (instead of using a dict).""" return json.loads(json_str, object_hook=lambda d: SimpleNamespace(**d)) + def eventfd(initval=0, flags=0): libc.eventfd.argtypes = (c.c_uint, c.c_int) return libc.eventfd(initval, flags) + def connect_sock(): sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(SOCK_PATH) return sock + def connect_client(ctx): sock = connect_sock() @@ -557,6 +588,7 @@ def connect_client(ctx): payload = get_reply(sock, expect=0) return sock + def disconnect_client(ctx, sock): sock.close() @@ -565,6 +597,7 @@ def disconnect_client(ctx, sock): assert ret == -1 assert c.get_errno() == errno.ENOTCONN + def get_reply(sock, expect=0): buf = sock.recv(4096) (msg_id, cmd, msg_size, flags, errno) = struct.unpack("HHIII", buf[0:16]) @@ -572,6 +605,7 @@ def get_reply(sock, expect=0): assert errno == expect return buf[16:] + def msg(ctx, sock, cmd, payload, expect=0, fds=None, rsp=True): """Round trip a request and reply to the server.""" hdr = vfio_user_header(cmd, size=len(payload)) @@ -589,6 +623,7 @@ def msg(ctx, sock, cmd, payload, expect=0, fds=None, rsp=True): return return get_reply(sock, expect=expect) + def get_reply_fds(sock, expect=0): """Receives a message from a socket and pulls the returned file descriptors out of the message.""" @@ -599,16 +634,17 @@ def get_reply_fds(sock, expect=0): data[0:16]) assert errno == expect - cmsg_level, cmsg_type, packed_fd = ancillary[0] if len(ancillary) != 0 else \ - (0, 0, []) + cmsg_level, cmsg_type, packed_fd = ancillary[0] if len(ancillary) != 0 \ + else (0, 0, []) unpacked_fds = [] for i in range(0, len(packed_fd), 4): - [unpacked_fd] = struct.unpack_from("i", packed_fd, offset = i) + [unpacked_fd] = struct.unpack_from("i", packed_fd, offset=i) unpacked_fds.append(unpacked_fd) assert len(packed_fd)/4 == len(unpacked_fds) assert (msg_flags & VFIO_USER_F_TYPE_REPLY) != 0 return (unpacked_fds, data[16:]) + def msg_fds(ctx, sock, cmd, payload, expect=0, fds=None): """Round trip a request and reply to the server. With the server returning new fds""" @@ -623,18 +659,22 @@ def msg_fds(ctx, sock, cmd, payload, expect=0, fds=None): vfu_run_ctx(ctx) return get_reply_fds(sock, expect=expect) + def get_pci_header(ctx): ptr = lib.vfu_pci_get_config_space(ctx) return c.cast(ptr, c.POINTER(vfu_pci_hdr_t)).contents + def get_pci_cfg_space(ctx): ptr = lib.vfu_pci_get_config_space(ctx) return c.cast(ptr, c.POINTER(c.c_char))[0:PCI_CFG_SPACE_SIZE] + def get_pci_ext_cfg_space(ctx): ptr = lib.vfu_pci_get_config_space(ctx) return c.cast(ptr, c.POINTER(c.c_char))[0:PCI_CFG_SPACE_EXP_SIZE] + def read_pci_cfg_space(ctx, buf, count, offset, extended=False): space = get_pci_ext_cfg_space(ctx) if extended else get_pci_cfg_space(ctx) @@ -642,6 +682,7 @@ def read_pci_cfg_space(ctx, buf, count, offset, extended=False): buf[i] = space[offset+i] return count + def write_pci_cfg_space(ctx, buf, count, offset, extended=False): max_offset = PCI_CFG_SPACE_EXP_SIZE if extended else PCI_CFG_SPACE_SIZE @@ -653,6 +694,7 @@ def write_pci_cfg_space(ctx, buf, count, offset, extended=False): space[offset+i] = buf[i] return count + def access_region(ctx, sock, is_write, region, offset, count, data=None, expect=0, rsp=True): # struct vfio_user_region_access @@ -669,12 +711,16 @@ def access_region(ctx, sock, is_write, region, offset, count, return skip("QII", result) + def write_region(ctx, sock, region, offset, count, data, expect=0, rsp=True): access_region(ctx, sock, True, region, offset, count, data, expect=expect, rsp=rsp) + def read_region(ctx, sock, region, offset, count, expect=0): - return access_region(ctx, sock, False, region, offset, count, expect=expect) + return access_region(ctx, sock, False, region, offset, count, + expect=expect) + def ext_cap_hdr(buf, offset): """Read an extended cap header.""" @@ -684,18 +730,21 @@ def ext_cap_hdr(buf, offset): cap_next >>= 4 return cap_id, cap_next + @vfu_dma_register_cb_t def dma_register(ctx, info): pass + @vfu_dma_unregister_cb_t def dma_unregister(ctx, info): pass return 0 + def prepare_ctx_for_dma(): ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -712,12 +761,15 @@ def prepare_ctx_for_dma(): # Library wrappers # + msg_id = 1 + @c.CFUNCTYPE(None, c.c_void_p, c.c_int, c.c_char_p) def log(ctx, level, msg): print(msg.decode("utf-8")) + def vfio_user_header(cmd, size, no_reply=False, error=False, error_no=0): global msg_id @@ -728,6 +780,7 @@ def vfio_user_header(cmd, size, no_reply=False, error=False, error_no=0): return buf + def vfu_create_ctx(trans=VFU_TRANS_SOCK, sock_path=SOCK_PATH, flags=0, private=None, dev_type=VFU_DEV_TYPE_PCI): if os.path.exists(sock_path): @@ -740,9 +793,11 @@ def vfu_create_ctx(trans=VFU_TRANS_SOCK, sock_path=SOCK_PATH, flags=0, return ctx + def vfu_realize_ctx(ctx): return lib.vfu_realize_ctx(ctx) + def vfu_attach_ctx(ctx, expect=0): ret = lib.vfu_attach_ctx(ctx) if expect == 0: @@ -752,18 +807,21 @@ def vfu_attach_ctx(ctx, expect=0): assert c.get_errno() == expect return ret + def vfu_run_ctx(ctx): return lib.vfu_run_ctx(ctx) + def vfu_destroy_ctx(ctx): lib.vfu_destroy_ctx(ctx) ctx = None if os.path.exists(SOCK_PATH): os.remove(SOCK_PATH) + def vfu_setup_region(ctx, index, size, cb=None, flags=0, mmap_areas=None, nr_mmap_areas=None, fd=-1, offset=0): - assert ctx != None + assert ctx is not None c_mmap_areas = None @@ -790,50 +848,59 @@ def vfu_setup_region(ctx, index, size, cb=None, flags=0, return ret + def vfu_setup_device_reset_cb(ctx, cb): - assert ctx != None + assert ctx is not None return lib.vfu_setup_device_reset_cb(ctx, c.cast(cb, vfu_reset_cb_t)) + def vfu_setup_device_nr_irqs(ctx, irqtype, count): - assert ctx != None + assert ctx is not None return lib.vfu_setup_device_nr_irqs(ctx, irqtype, count) + def vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_EXPRESS, hdr_type=PCI_HEADER_TYPE_NORMAL): - assert ctx != None + assert ctx is not None return lib.vfu_pci_init(ctx, pci_type, hdr_type, 0) + def vfu_pci_add_capability(ctx, pos, flags, data): - assert ctx != None + assert ctx is not None databuf = (c.c_byte * len(data)).from_buffer(bytearray(data)) return lib.vfu_pci_add_capability(ctx, pos, flags, databuf) + def vfu_pci_find_capability(ctx, extended, cap_id): - assert ctx != None + assert ctx is not None return lib.vfu_pci_find_capability(ctx, extended, cap_id) + def vfu_pci_find_next_capability(ctx, extended, offset, cap_id): - assert ctx != None + assert ctx is not None return lib.vfu_pci_find_next_capability(ctx, extended, offset, cap_id) + def vfu_irq_trigger(ctx, subindex): - assert ctx != None + assert ctx is not None return lib.vfu_irq_trigger(ctx, subindex) + def vfu_setup_device_dma(ctx, register_cb=None, unregister_cb=None): - assert ctx != None + assert ctx is not None return lib.vfu_setup_device_dma(ctx, c.cast(register_cb, vfu_dma_register_cb_t), c.cast(unregister_cb, vfu_dma_unregister_cb_t)) + def vfu_setup_device_migration_callbacks(ctx, cbs=None, offset=0): - assert ctx != None + assert ctx is not None @c.CFUNCTYPE(c.c_int) def stub(): @@ -851,9 +918,10 @@ def vfu_setup_device_migration_callbacks(ctx, cbs=None, offset=0): return lib.vfu_setup_device_migration_callbacks(ctx, cbs, offset) + def vfu_addr_to_sg(ctx, dma_addr, length, max_sg=1, prot=(mmap.PROT_READ | mmap.PROT_WRITE)): - assert ctx != None + assert ctx is not None sg = (dma_sg_t * max_sg)() @@ -863,13 +931,17 @@ def vfu_addr_to_sg(ctx, dma_addr, length, max_sg=1, def vfu_map_sg(ctx, sg, iovec, cnt=1, flags=0): return lib.vfu_map_sg(ctx, sg, iovec, cnt, flags) + def vfu_unmap_sg(ctx, sg, iovec, cnt=1): return lib.vfu_unmap_sg(ctx, sg, iovec, cnt) + def vfu_create_ioeventfd(ctx, region_idx, fd, offset, size, flags, datamatch): - assert ctx != None + assert ctx is not None + + return lib.vfu_create_ioeventfd(ctx, region_idx, fd, offset, size, + flags, datamatch) - return lib.vfu_create_ioeventfd(ctx, region_idx, fd, offset, size, flags, datamatch) def vfu_migr_done(ctx, err): return lib.vfu_migr_done(ctx, err) diff --git a/test/py/test_device_get_info.py b/test/py/test_device_get_info.py index ea9251b..c41d382 100644 --- a/test/py/test_device_get_info.py +++ b/test/py/test_device_get_info.py @@ -30,11 +30,12 @@ from libvfio_user import * import errno + def test_device_get_info(): global ctx ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR0_REGION_IDX, size=4096, flags=VFU_REGION_FLAG_RW) @@ -56,13 +57,15 @@ def test_device_get_info(): # bad argsz - payload = vfio_user_device_info(argsz=8, flags=0, num_regions=0, num_irqs=0) + payload = vfio_user_device_info(argsz=8, flags=0, + num_regions=0, num_irqs=0) msg(ctx, sock, VFIO_USER_DEVICE_GET_INFO, payload, expect=errno.EINVAL) # valid with larger argsz - payload = vfio_user_device_info(argsz=32, flags=0, num_regions=0, num_irqs=0) + payload = vfio_user_device_info(argsz=32, flags=0, + num_regions=0, num_irqs=0) result = msg(ctx, sock, VFIO_USER_DEVICE_GET_INFO, payload) diff --git a/test/py/test_device_get_irq_info.py b/test/py/test_device_get_irq_info.py index 4d6b3d7..c45ad7f 100644 --- a/test/py/test_device_get_irq_info.py +++ b/test/py/test_device_get_irq_info.py @@ -35,11 +35,12 @@ sock = None argsz = len(vfio_irq_info()) + def test_device_get_irq_info_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -56,6 +57,7 @@ def test_device_get_irq_info_setup(): sock = connect_client(ctx) + def test_device_get_irq_info_bad_in(): payload = struct.pack("II", 0, 0) @@ -73,6 +75,7 @@ def test_device_get_irq_info_bad_in(): msg(ctx, sock, VFIO_USER_DEVICE_GET_IRQ_INFO, payload, expect=errno.EINVAL) + def test_device_get_irq_info(): # valid with larger argsz @@ -118,6 +121,7 @@ def test_device_get_irq_info(): assert info.index == VFU_DEV_MSIX_IRQ assert info.count == 2048 + def test_device_get_irq_info_cleanup(): disconnect_client(ctx, sock) diff --git a/test/py/test_device_get_region_info.py b/test/py/test_device_get_region_info.py index 710efd8..f7e63d2 100644 --- a/test/py/test_device_get_region_info.py +++ b/test/py/test_device_get_region_info.py @@ -30,18 +30,18 @@ from libvfio_user import * import errno import tempfile -import os ctx = None sock = None argsz = len(vfio_region_info()) + def test_device_get_region_info_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR1_REGION_IDX, size=4096, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM)) @@ -50,7 +50,7 @@ def test_device_get_region_info_setup(): f = tempfile.TemporaryFile() f.truncate(65536) - mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ] + mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x8000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), @@ -60,7 +60,7 @@ def test_device_get_region_info_setup(): f = tempfile.TemporaryFile() f.truncate(0x2000) - mmap_areas = [ (0x1000, 0x1000) ] + mmap_areas = [(0x1000, 0x1000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000, flags=VFU_REGION_FLAG_RW, mmap_areas=mmap_areas, @@ -72,6 +72,7 @@ def test_device_get_region_info_setup(): sock = connect_client(ctx) + def test_device_get_region_info_short_write(): payload = struct.pack("II", 0, 0) @@ -79,6 +80,7 @@ def test_device_get_region_info_short_write(): msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload, expect=errno.EINVAL) + def test_device_get_region_info_bad_argsz(): payload = vfio_region_info(argsz=8, flags=0, @@ -88,6 +90,7 @@ def test_device_get_region_info_bad_argsz(): msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload, expect=errno.EINVAL) + def test_device_get_region_info_bad_index(): payload = vfio_region_info(argsz=argsz, flags=0, @@ -97,6 +100,7 @@ def test_device_get_region_info_bad_index(): msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload, expect=errno.EINVAL) + def test_device_get_region_info_larger_argsz(): payload = vfio_region_info(argsz=argsz + 8, flags=0, @@ -117,6 +121,7 @@ def test_device_get_region_info_larger_argsz(): assert info.size == 4096 assert info.offset == 0 + def test_device_get_region_info_small_argsz_caps(): global sock @@ -141,6 +146,7 @@ def test_device_get_region_info_small_argsz_caps(): # skip reading the SCM_RIGHTS disconnect_client(ctx, sock) + def test_device_get_region_info_caps(): global sock @@ -176,6 +182,7 @@ def test_device_get_region_info_caps(): # skip reading the SCM_RIGHTS disconnect_client(ctx, sock) + def test_device_get_region_info_migr(): global sock @@ -213,5 +220,6 @@ def test_device_get_region_info_migr(): # skip reading the SCM_RIGHTS disconnect_client(ctx, sock) + def test_device_get_region_info_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_device_get_region_info_zero_size.py b/test/py/test_device_get_region_info_zero_size.py index c0965d3..d80152c 100644 --- a/test/py/test_device_get_region_info_zero_size.py +++ b/test/py/test_device_get_region_info_zero_size.py @@ -28,20 +28,18 @@ # from libvfio_user import * -import errno -import tempfile -import os ctx = None sock = None argsz = len(vfio_region_info()) + def test_device_get_region_info_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_realize_ctx(ctx) assert ret == 0 @@ -59,7 +57,8 @@ def test_device_get_region_info_zero_sized_region(): index=index, cap_offset=0, size=0, offset=0) - hdr = vfio_user_header(VFIO_USER_DEVICE_GET_REGION_INFO, size=len(payload)) + hdr = vfio_user_header(VFIO_USER_DEVICE_GET_REGION_INFO, + size=len(payload)) sock.send(hdr + payload) vfu_run_ctx(ctx) result = get_reply(sock) diff --git a/test/py/test_device_get_region_io_fds.py b/test/py/test_device_get_region_io_fds.py index 2179cf7..bffe508 100644 --- a/test/py/test_device_get_region_io_fds.py +++ b/test/py/test_device_get_region_io_fds.py @@ -40,16 +40,17 @@ sock = None fds = [] IOEVENT_SIZE = 8 + def test_device_get_region_io_fds_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None f = tempfile.TemporaryFile() f.truncate(65536) - mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ] + mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR1_REGION_IDX, size=0x8000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), @@ -59,7 +60,7 @@ def test_device_get_region_io_fds_setup(): f = tempfile.TemporaryFile() f.truncate(65536) - mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ] + mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x8000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), @@ -74,55 +75,60 @@ def test_device_get_region_io_fds_setup(): assert ret == 0 sock = connect_client(ctx) - for i in range(0,6): - tmp = eventfd(0,0) + for i in range(0, 6): + tmp = eventfd(0, 0) fds.append(tmp) assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, tmp, i * IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1 + def test_device_get_region_io_fds_bad_flags(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) + - len(vfio_user_sub_region_ioeventfd()) * 5, flags = 1, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0) + argsz=len(vfio_user_region_io_fds_reply()) + + len(vfio_user_sub_region_ioeventfd()) * 5, flags=1, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0) msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=errno.EINVAL) + def test_device_get_region_io_fds_bad_count(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) + - len(vfio_user_sub_region_ioeventfd()) * 5, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1) + argsz=len(vfio_user_region_io_fds_reply()) + + len(vfio_user_sub_region_ioeventfd()) * 5, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1) msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=errno.EINVAL) + def test_device_get_region_io_fds_buffer_too_small(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) - 1, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1) + argsz=len(vfio_user_region_io_fds_reply()) - 1, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1) msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=errno.EINVAL) + def test_device_get_region_io_fds_buffer_too_large(): - payload = vfio_user_region_io_fds_request(argsz = SERVER_MAX_DATA_XFER_SIZE - + 1, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, - count = 1) + payload = vfio_user_region_io_fds_request(argsz=SERVER_MAX_DATA_XFER_SIZE + + 1, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, + count=1) + + msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, + expect=errno.EINVAL) - msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect = - errno.EINVAL) def test_device_get_region_io_fds_no_fds(): - payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0, - index = VFU_PCI_DEV_BAR1_REGION_IDX, count = 0) + payload = vfio_user_region_io_fds_request(argsz=512, flags=0, + index=VFU_PCI_DEV_BAR1_REGION_IDX, count=0) ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0) @@ -136,19 +142,20 @@ def test_device_get_region_io_fds_no_fds(): def test_device_get_region_io_fds_no_regions_setup(): - payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0, - index = VFU_PCI_DEV_BAR3_REGION_IDX, count = 0) + payload = vfio_user_region_io_fds_request(argsz=512, flags=0, + index=VFU_PCI_DEV_BAR3_REGION_IDX, count=0) + + msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, + expect=errno.EINVAL) - ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, - expect = errno.EINVAL) def test_device_get_region_io_fds_region_no_mmap(): - payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0, - index = VFU_PCI_DEV_BAR5_REGION_IDX, count = 0) + payload = vfio_user_region_io_fds_request(argsz=512, flags=0, + index=VFU_PCI_DEV_BAR5_REGION_IDX, count=0) ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, - expect = 0) + expect=0) reply, ret = vfio_user_region_io_fds_reply.pop_from_buffer(ret) @@ -157,20 +164,22 @@ def test_device_get_region_io_fds_region_no_mmap(): assert reply.flags == 0 assert reply.index == VFU_PCI_DEV_BAR5_REGION_IDX + def test_device_get_region_io_fds_region_out_of_range(): - payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0, - index = 512, count = 0) + payload = vfio_user_region_io_fds_request(argsz=512, flags=0, + index=512, count=0) + + msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, + expect=errno.EINVAL) - msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect = - errno.EINVAL) def test_device_get_region_io_fds_fds_read_write(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) + - len(vfio_user_sub_region_ioeventfd()) * 10, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0) + argsz=len(vfio_user_region_io_fds_reply()) + + len(vfio_user_sub_region_ioeventfd()) * 10, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0) newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0) @@ -188,18 +197,19 @@ def test_device_get_region_io_fds_fds_read_write(): # Server for i in range(0, len(newfds)): out = os.read(newfds[i], IOEVENT_SIZE) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 10 for i in newfds: os.close(i) + def test_device_get_region_io_fds_full(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) + - len(vfio_user_sub_region_ioeventfd()) * 6, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0) + argsz=len(vfio_user_region_io_fds_reply()) + + len(vfio_user_sub_region_ioeventfd()) * 6, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0) newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0) @@ -214,7 +224,7 @@ def test_device_get_region_io_fds_full(): for i in range(0, reply.count): out = os.read(newfds[ioevents[i].fd_index], ioevent.size) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 1 assert ioevents[i].size == IOEVENT_SIZE assert ioevents[i].offset == 40 - (IOEVENT_SIZE * i) @@ -223,11 +233,12 @@ def test_device_get_region_io_fds_full(): for i in newfds: os.close(i) + def test_device_get_region_io_fds_fds_read_write_nothing(): payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()), flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0) + argsz=len(vfio_user_region_io_fds_reply()), flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0) newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0) @@ -237,21 +248,22 @@ def test_device_get_region_io_fds_fds_read_write_nothing(): assert reply.argsz == len(vfio_user_region_io_fds_reply()) + \ len(vfio_user_sub_region_ioeventfd()) * 6 + def test_device_get_region_io_fds_fds_read_write_dupe_fd(): """ Test here to show that we can return mutliple sub regions with the same - fd_index. fd_index points to the list of fds returned from the socket as - returned by msg_fds. """ + fd_index. fd_index points to the list of fds returned from the socket + as returned by msg_fds. """ - t = eventfd(0,0) + t = eventfd(0, 0) assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 6 * IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1 assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 7 * IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1 payload = vfio_user_region_io_fds_request( - argsz = len(vfio_user_region_io_fds_reply()) + - len(vfio_user_sub_region_ioeventfd()) * 8, flags = 0, - index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0) + argsz=len(vfio_user_region_io_fds_reply()) + + len(vfio_user_sub_region_ioeventfd()) * 8, flags=0, + index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0) newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0) @@ -271,7 +283,7 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd(): for i in range(2, 8): out = os.read(newfds[ioevents[i].fd_index], ioevent.size) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 1 assert ioevents[i].size == IOEVENT_SIZE assert ioevents[i].offset == 56 - (IOEVENT_SIZE * i) @@ -283,30 +295,32 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd(): os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1)) out = os.read(newfds[ioevents[1].fd_index], ioevent.size) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 1 os.write(newfds[ioevents[1].fd_index], c.c_ulonglong(1)) out = os.read(newfds[ioevents[0].fd_index], ioevent.size) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 1 os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1)) out = os.read(newfds[ioevents[1].fd_index], ioevent.size) - [out] = struct.unpack("@Q",out) + [out] = struct.unpack("@Q", out) assert out == 1 for i in newfds: os.close(i) + def test_device_get_region_io_fds_ioeventfd_invalid_size(): - t = eventfd(0,0) - assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 0x8000 - -2048, 4096, 0, 0) == -1 + t = eventfd(0, 0) + assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, + 0x8000 - 0x800, 4096, 0, 0) == -1 os.close(t) + def test_device_get_region_info_cleanup(): for i in fds: os.close(i) diff --git a/test/py/test_device_set_irqs.py b/test/py/test_device_set_irqs.py index a958db3..6d47778 100644 --- a/test/py/test_device_set_irqs.py +++ b/test/py/test_device_set_irqs.py @@ -28,21 +28,20 @@ # from libvfio_user import * -import ctypes as c import errno import os -import sys ctx = None sock = None argsz = len(vfio_irq_set()) + def test_device_set_irqs_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -59,17 +58,20 @@ def test_device_set_irqs_setup(): sock = connect_client(ctx) + def test_device_set_irqs_no_irq_set(): hdr = vfio_user_header(VFIO_USER_DEVICE_SET_IRQS, size=0) sock.send(hdr) vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_device_set_irqs_short_write(): payload = struct.pack("II", 0, 0) msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_argsz(): payload = vfio_irq_set(argsz=3, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ, @@ -77,6 +79,7 @@ def test_device_set_irqs_bad_argsz(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_index(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_NUM_IRQS, @@ -84,6 +87,7 @@ def test_device_set_irqs_bad_index(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_flags_MASK_and_UNMASK(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_ACTION_UNMASK, index=VFU_DEV_MSIX_IRQ, @@ -91,6 +95,7 @@ def test_device_set_irqs_bad_flags_MASK_and_UNMASK(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_flags_DATA_NONE_and_DATA_BOOL(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_DATA_BOOL, @@ -98,6 +103,7 @@ def test_device_set_irqs_bad_flags_DATA_NONE_and_DATA_BOOL(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_start_count_range(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ, @@ -105,13 +111,15 @@ def test_device_set_irqs_bad_start_count_range(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) -def test_device_set_irqs_bad_start_count_range(): + +def test_device_set_irqs_bad_start_count_range2(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ, start=2049, count=1) msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_action_for_err_irq(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_ERR_IRQ, @@ -119,6 +127,7 @@ def test_device_set_irqs_bad_action_for_err_irq(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_action_for_req_irq(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ, @@ -126,6 +135,7 @@ def test_device_set_irqs_bad_action_for_req_irq(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_start_for_count_0(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ, @@ -133,6 +143,7 @@ def test_device_set_irqs_bad_start_for_count_0(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_action_for_count_0(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ, @@ -140,6 +151,7 @@ def test_device_set_irqs_bad_action_for_count_0(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_action_and_data_type_for_count_0(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ, @@ -147,6 +159,7 @@ def test_device_set_irqs_bad_action_and_data_type_for_count_0(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_bad_fds_for_DATA_BOOL(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ, @@ -161,6 +174,7 @@ def test_device_set_irqs_bad_fds_for_DATA_BOOL(): os.close(fd) + def test_device_set_irqs_bad_fds_for_DATA_NONE(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ, @@ -173,6 +187,7 @@ def test_device_set_irqs_bad_fds_for_DATA_NONE(): os.close(fd) + def test_device_set_irqs_bad_fds_for_count_2(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ, @@ -185,6 +200,7 @@ def test_device_set_irqs_bad_fds_for_count_2(): os.close(fd) + def test_device_set_irqs_disable(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ, @@ -198,6 +214,7 @@ def test_device_set_irqs_disable(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload) + def test_device_set_irqs_enable(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ, @@ -207,6 +224,7 @@ def test_device_set_irqs_enable(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd]) + def test_device_set_irqs_trigger_bool_too_small(): payload = vfio_irq_set(argsz=argsz + 1, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ, @@ -215,6 +233,7 @@ def test_device_set_irqs_trigger_bool_too_small(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_trigger_bool_too_large(): payload = vfio_irq_set(argsz=argsz + 3, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ, @@ -223,6 +242,7 @@ def test_device_set_irqs_trigger_bool_too_large(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL) + def test_device_set_irqs_enable_update(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ, @@ -232,6 +252,7 @@ def test_device_set_irqs_enable_update(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd]) + def test_device_set_irqs_enable_trigger_none(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ, @@ -251,6 +272,7 @@ def test_device_set_irqs_enable_trigger_none(): assert struct.unpack("Q", os.read(fd1, 8))[0] == 4 assert struct.unpack("Q", os.read(fd2, 8))[0] == 9 + def test_device_set_irqs_enable_trigger_bool(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ, @@ -271,5 +293,6 @@ def test_device_set_irqs_enable_trigger_bool(): assert struct.unpack("Q", os.read(fd1, 8))[0] == 4 assert struct.unpack("Q", os.read(fd2, 8))[0] == 9 + def test_device_set_irqs_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_dirty_pages.py b/test/py/test_dirty_pages.py index 193e694..a5b85dc 100644 --- a/test/py/test_dirty_pages.py +++ b/test/py/test_dirty_pages.py @@ -34,20 +34,23 @@ import tempfile ctx = None + @vfu_dma_register_cb_t def dma_register(ctx, info): pass + @vfu_dma_unregister_cb_t def dma_unregister(ctx, info): pass return 0 + def test_dirty_pages_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -58,7 +61,7 @@ def test_dirty_pages_setup(): f = tempfile.TemporaryFile() f.truncate(0x2000) - mmap_areas = [ (0x1000, 0x1000) ] + mmap_areas = [(0x1000, 0x1000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000, flags=VFU_REGION_FLAG_RW, mmap_areas=mmap_areas, @@ -79,23 +82,27 @@ def test_dirty_pages_setup(): msg(ctx, sock, VFIO_USER_DMA_MAP, payload, fds=[f.fileno()]) + def test_dirty_pages_short_write(): payload = struct.pack("I", 8) msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL) + def test_dirty_pages_bad_argsz(): payload = vfio_user_dirty_pages(argsz=4, flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_START) msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL) + def test_dirty_pages_start_no_migration(): payload = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()), flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_START) msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.ENOTSUP) + def test_dirty_pages_start_bad_flags(): # # This is a little cheeky, after vfu_realize_ctx(), but it works at the @@ -122,17 +129,20 @@ def start_logging(): msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload) + def test_dirty_pages_start(): start_logging() # should be idempotent start_logging() + def test_dirty_pages_get_short_read(): payload = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()), flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP) msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL) + # # This should in fact work; update when it does. # @@ -147,6 +157,7 @@ def test_dirty_pages_get_sub_range(): msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.ENOTSUP) + def test_dirty_pages_get_bad_page_size(): argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8 dirty_pages = vfio_user_dirty_pages(argsz=argsz, @@ -158,6 +169,7 @@ def test_dirty_pages_get_bad_page_size(): msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL) + def test_dirty_pages_get_bad_bitmap_size(): argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8 dirty_pages = vfio_user_dirty_pages(argsz=argsz, @@ -169,6 +181,7 @@ def test_dirty_pages_get_bad_bitmap_size(): msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL) + def test_dirty_pages_get_short_reply(): dirty_pages = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()), flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP) @@ -188,6 +201,7 @@ def test_dirty_pages_get_short_reply(): assert dirty_pages.argsz == argsz assert dirty_pages.flags == VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP + def test_dirty_pages_get_unmodified(): argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8 @@ -232,9 +246,11 @@ def get_dirty_page_bitmap(): br, result = vfio_user_bitmap_range.pop_from_buffer(result) return struct.unpack("Q", result)[0] + sg3 = None iovec3 = None + def test_dirty_pages_get_modified(): ret, sg1 = vfu_addr_to_sg(ctx, dma_addr=0x10000, length=0x1000) assert ret == 1 @@ -261,7 +277,7 @@ def test_dirty_pages_get_modified(): # unmap segment, dirty bitmap should be the same vfu_unmap_sg(ctx, sg1, iovec1) - bitmap = get_dirty_page_bitmap() + bitmap = get_dirty_page_bitmap() assert bitmap == 0b11110001 # check again, previously unmapped segment should be clean @@ -301,6 +317,7 @@ def test_dirty_pages_stop(): # destroying the context. stop_logging() + def test_dirty_pages_cleanup(): disconnect_client(ctx, sock) vfu_destroy_ctx(ctx) diff --git a/test/py/test_dma_map.py b/test/py/test_dma_map.py index e4ac2ba..f446efd 100644 --- a/test/py/test_dma_map.py +++ b/test/py/test_dma_map.py @@ -36,11 +36,12 @@ import errno ctx = None + def test_dma_region_too_big(): global ctx ctx = prepare_ctx_for_dma() - assert ctx != None + assert ctx is not None sock = connect_client(ctx) @@ -53,6 +54,7 @@ def test_dma_region_too_big(): disconnect_client(ctx, sock) + def test_dma_region_too_many(): sock = connect_client(ctx) @@ -63,13 +65,14 @@ def test_dma_region_too_many(): offset=0, addr=0x1000 * i, size=4096) if i == MAX_DMA_REGIONS + 1: - expect=errno.EINVAL + expect = errno.EINVAL else: - expect=0 + expect = 0 msg(ctx, sock, VFIO_USER_DMA_MAP, payload, expect=expect) disconnect_client(ctx, sock) + def test_dma_region_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_dma_unmap.py b/test/py/test_dma_unmap.py index f070ede..1c7cea0 100644 --- a/test/py/test_dma_unmap.py +++ b/test/py/test_dma_unmap.py @@ -28,19 +28,18 @@ # DAMAGE. # -import ctypes import errno from libvfio_user import * -import tempfile ctx = None sock = None + def test_dma_unmap_setup(): global ctx, sock ctx = prepare_ctx_for_dma() - assert ctx != None + assert ctx is not None payload = struct.pack("II", 0, 0) sock = connect_client(ctx) @@ -52,15 +51,18 @@ def test_dma_unmap_setup(): msg(ctx, sock, VFIO_USER_DMA_MAP, payload) + def test_dma_unmap_short_write(): payload = struct.pack("II", 0, 0) msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL) + def test_dma_unmap_bad_argsz(): - payload = vfio_user_dma_unmap(argsz=8, flags=0x2323, addr=0x1000, size=4096) + vfio_user_dma_unmap(argsz=8, flags=0x2323, addr=0x1000, size=4096) + def test_dma_unmap_invalid_flags(): @@ -68,12 +70,14 @@ def test_dma_unmap_invalid_flags(): flags=0x4, addr=0x1000, size=4096) msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.ENOTSUP) + def test_dma_unmap(): payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()), flags=0, addr=0x1000, size=4096) msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload) + def test_dma_unmap_all(): for i in range(0, MAX_DMA_REGIONS): @@ -89,6 +93,7 @@ def test_dma_unmap_all(): msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload) + def test_dma_unmap_all_invalid_addr(): payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()), @@ -96,6 +101,7 @@ def test_dma_unmap_all_invalid_addr(): msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL) + def test_dma_unmap_all_invalid_flags(): payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()), @@ -104,6 +110,7 @@ def test_dma_unmap_all_invalid_flags(): msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL) + def test_dma_unmap_cleanup(): disconnect_client(ctx, sock) vfu_destroy_ctx(ctx) diff --git a/test/py/test_irq_trigger.py b/test/py/test_irq_trigger.py index 02bc90a..39c75af 100644 --- a/test/py/test_irq_trigger.py +++ b/test/py/test_irq_trigger.py @@ -34,11 +34,12 @@ import errno ctx = None sock = None + def test_irq_trigger_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -51,6 +52,7 @@ def test_irq_trigger_setup(): sock = connect_client(ctx) + def test_irq_trigger_bad_subindex(): ret = vfu_irq_trigger(ctx, 2048) assert ret == -1 @@ -60,11 +62,13 @@ def test_irq_trigger_bad_subindex(): assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_irq_trigger_no_interrupt(): ret = vfu_irq_trigger(ctx, 0) assert ret == -1 assert c.get_errno() == errno.ENOENT + def test_irq_trigger(): # struct vfio_irq_set payload = struct.pack("IIIII", 20, VFIO_IRQ_SET_ACTION_TRIGGER | @@ -74,9 +78,10 @@ def test_irq_trigger(): msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd]) - ret = vfu_irq_trigger(ctx, 8) + vfu_irq_trigger(ctx, 8) assert struct.unpack("Q", os.read(fd, 8))[0] == 5 + def test_irq_trigger_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_map_unmap_sg.py b/test/py/test_map_unmap_sg.py index 00d9b28..fa98159 100644 --- a/test/py/test_map_unmap_sg.py +++ b/test/py/test_map_unmap_sg.py @@ -34,11 +34,12 @@ import tempfile ctx = None + def test_map_sg_with_invalid_region(): global ctx ctx = prepare_ctx_for_dma() - assert ctx != None + assert ctx is not None sg = dma_sg_t() iovec = iovec_t() @@ -46,6 +47,7 @@ def test_map_sg_with_invalid_region(): assert ret == -1 assert ctypes.get_errno() == errno.EINVAL + def test_map_sg_without_fd(): sock = connect_client(ctx) @@ -63,6 +65,7 @@ def test_map_sg_without_fd(): disconnect_client(ctx, sock) + def test_map_multiple_sge(): sock = connect_client(ctx) regions = 4 @@ -88,6 +91,7 @@ def test_map_multiple_sge(): disconnect_client(ctx, sock) + def test_unmap_sg(): sock = connect_client(ctx) regions = 4 @@ -111,6 +115,7 @@ def test_unmap_sg(): disconnect_client(ctx, sock) + def test_map_unmap_sg_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_migration.py b/test/py/test_migration.py index 04c35cc..aa271f9 100644 --- a/test/py/test_migration.py +++ b/test/py/test_migration.py @@ -36,6 +36,7 @@ ctx = None global trans_cb_err trans_cb_err = 0 + @transition_cb_t def trans_cb(ctx, state): global trans_cb_err @@ -49,7 +50,7 @@ def test_migration_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000, flags=VFU_REGION_FLAG_RW) diff --git a/test/py/test_negotiate.py b/test/py/test_negotiate.py index 524f59f..0541d0e 100644 --- a/test/py/test_negotiate.py +++ b/test/py/test_negotiate.py @@ -29,13 +29,11 @@ from libvfio_user import * import errno -import json -import os -import socket import struct ctx = None + def client_version_json(expect=0, json=''): sock = connect_sock() @@ -50,15 +48,17 @@ def client_version_json(expect=0, json=''): return payload + def test_server_setup(): global ctx ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_realize_ctx(ctx) assert ret == 0 + def test_short_write(): sock = connect_sock() hdr = vfio_user_header(VFIO_USER_VERSION, size=0) @@ -67,6 +67,7 @@ def test_short_write(): vfu_attach_ctx(ctx, expect=errno.EINVAL) get_reply(sock, expect=errno.EINVAL) + def test_long_write(): sock = connect_sock() hdr = vfio_user_header(VFIO_USER_VERSION, size=SERVER_MAX_MSG_SIZE + 1) @@ -76,6 +77,7 @@ def test_long_write(): assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_bad_command(): sock = connect_sock() @@ -86,6 +88,7 @@ def test_bad_command(): vfu_attach_ctx(ctx, expect=errno.EINVAL) get_reply(sock, expect=errno.EINVAL) + def test_invalid_major(): sock = connect_sock() @@ -96,6 +99,7 @@ def test_invalid_major(): vfu_attach_ctx(ctx, expect=errno.EINVAL) get_reply(sock, expect=errno.EINVAL) + def test_invalid_json_missing_NUL(): sock = connect_sock() @@ -107,45 +111,55 @@ def test_invalid_json_missing_NUL(): vfu_attach_ctx(ctx, expect=errno.EINVAL) get_reply(sock, expect=errno.EINVAL) + def test_invalid_json_missing_closing_brace(): client_version_json(errno.EINVAL, b"{") + def test_invalid_json_missing_closing_quote(): client_version_json(errno.EINVAL, b'"') + def test_invalid_json_bad_capabilities_object(): client_version_json(errno.EINVAL, b'{ "capabilities": "23" }') + def test_invalid_json_bad_max_fds(): client_version_json(errno.EINVAL, b'{ "capabilities": { "max_msg_fds": "foo" } }') -def test_invalid_json_bad_max_fds(): + +def test_invalid_json_bad_max_fds2(): client_version_json(errno.EINVAL, b'{ "capabilities": { "max_msg_fds": -1 } }') -def test_invalid_json_bad_max_fds2(): + +def test_invalid_json_bad_max_fds3(): client_version_json(errno.EINVAL, b'{ "capabilities": { "max_msg_fds": %d } }' % (VFIO_USER_CLIENT_MAX_FDS_LIMIT + 1)) + def test_invalid_json_bad_migration_object(): client_version_json(errno.EINVAL, b'{ "capabilities": { "migration": "23" } }') + def test_invalid_json_bad_pgsize(): client_version_json(errno.EINVAL, b'{ "capabilities": ' + b'{ "migration": { "pgsize": "foo" } } }') + # # FIXME: need vfu_setup_device_migration_callbacks() to be able to test this # failure mode. # -def test_invalid_json_bad_pgsize(): +def test_invalid_json_bad_pgsize2(): if False: client_version_json(errno.EINVAL, b'{ "capabilities": { "migration": { "pgsize": 4095 } } }') + def test_valid_negotiate_no_json(): sock = connect_sock() @@ -167,12 +181,14 @@ def test_valid_negotiate_no_json(): disconnect_client(ctx, sock) + def test_valid_negotiate_empty_json(): client_version_json(json=b'{}') # notice client closed connection vfu_run_ctx(ctx) + def test_valid_negotiate_json(): client_version_json(json=bytes( '{ "capabilities": { "max_msg_fds": %s, "max_data_xfer_size": %u } }' % @@ -182,5 +198,6 @@ def test_valid_negotiate_json(): # notice client closed connection vfu_run_ctx(ctx) + def test_destroying(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_pci_caps.py b/test/py/test_pci_caps.py index 5af54e3..5267246 100644 --- a/test/py/test_pci_caps.py +++ b/test/py/test_pci_caps.py @@ -33,11 +33,12 @@ import errno ctx = None + def test_pci_cap_setup(): global ctx ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL) assert ret == 0 @@ -46,30 +47,35 @@ def test_pci_cap_setup(): size=PCI_CFG_SPACE_SIZE, flags=VFU_REGION_FLAG_RW) assert ret == 0 + def test_pci_cap_bad_flags(): pos = vfu_pci_add_capability(ctx, pos=0, flags=999, data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0)) assert pos == -1 assert c.get_errno() == errno.EINVAL + def test_pci_cap_no_cb(): pos = vfu_pci_add_capability(ctx, pos=0, flags=VFU_CAP_FLAG_CALLBACK, data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0)) assert pos == -1 assert c.get_errno() == errno.EINVAL + def test_pci_cap_unknown_cap(): pos = vfu_pci_add_capability(ctx, pos=0, flags=0, data=struct.pack("ccHH", b'\x81', b'\0', 0, 0)) assert pos == -1 assert c.get_errno() == errno.ENOTSUP + def test_pci_cap_bad_pos(): pos = vfu_pci_add_capability(ctx, pos=PCI_CFG_SPACE_SIZE, flags=0, data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0)) assert pos == -1 assert c.get_errno() == errno.EINVAL + @vfu_region_access_cb_t def pci_region_cb(ctx, buf, count, offset, is_write): if not is_write: @@ -77,13 +83,14 @@ def pci_region_cb(ctx, buf, count, offset, is_write): return write_pci_cfg_space(ctx, buf, count, offset) + def test_pci_cap_setup_cb(): global ctx vfu_destroy_ctx(ctx) ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL) assert ret == 0 @@ -93,6 +100,7 @@ def test_pci_cap_setup_cb(): flags=VFU_REGION_FLAG_RW) assert ret == 0 + cap_offsets = ( PCI_STD_HEADER_SIZEOF, PCI_STD_HEADER_SIZEOF + PCI_PM_SIZEOF, @@ -103,6 +111,7 @@ cap_offsets = ( 0xa0 ) + def test_add_caps(): pos = vfu_pci_add_capability(ctx, pos=0, flags=0, data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0)) @@ -133,6 +142,7 @@ def test_add_caps(): ret = vfu_realize_ctx(ctx) assert ret == 0 + def test_find_caps(): offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_PM) assert offset == cap_offsets[0] @@ -191,39 +201,42 @@ def test_find_caps(): assert offset == 0 assert c.get_errno() == errno.ENOENT + def test_pci_cap_write_hdr(): sock = connect_client(ctx) # offset of struct cap_hdr - offset=cap_offsets[0] - data=b'\x01' + offset = cap_offsets[0] + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EPERM) disconnect_client(ctx, sock) + def test_pci_cap_readonly(): sock = connect_client(ctx) # start of vendor payload - offset=cap_offsets[1] + 2 - data=b'\x01' + offset = cap_offsets[1] + 2 + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EPERM) # offsetof(struct vsc, data) - offset=cap_offsets[1] + 3 + offset = cap_offsets[1] + 3 payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=3) assert payload == b'abc' disconnect_client(ctx, sock) + def test_pci_cap_callback(): sock = connect_client(ctx) # offsetof(struct vsc, data) - offset=cap_offsets[2] + 3 + offset = cap_offsets[2] + 3 data = b"Hello world." payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, @@ -240,60 +253,64 @@ def test_pci_cap_callback(): disconnect_client(ctx, sock) + def test_pci_cap_write_pmcs(): sock = connect_client(ctx) # struct pc - offset=cap_offsets[0] + 3 - data=b'\x01\x02' + offset = cap_offsets[0] + 3 + data = b'\x01\x02' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EINVAL) - offset=cap_offsets[0] + 2 - data=b'\x01' + offset = cap_offsets[0] + 2 + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EINVAL) - offset=cap_offsets[0] + 2 - data=b'\x01\x02' + offset = cap_offsets[0] + 2 + data = b'\x01\x02' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.ENOTSUP) # struct pmcs - offset=cap_offsets[0] + 5 - data=b'\x01\x02' + offset = cap_offsets[0] + 5 + data = b'\x01\x02' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EINVAL) - offset=cap_offsets[0] + 4 - data=b'\x01' + offset = cap_offsets[0] + 4 + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EINVAL) offset = cap_offsets[0] + 4 - data=b'\x01\x02' + data = b'\x01\x02' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data) assert get_pci_cfg_space(ctx)[offset:offset+2] == data # pmcsr_se - offset=cap_offsets[0] + 6 - data=b'\x01' + offset = cap_offsets[0] + 6 + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.ENOTSUP) # data - offset=cap_offsets[0] + 7 - data=b'\x01' + offset = cap_offsets[0] + 7 + data = b'\x01' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.ENOTSUP) disconnect_client(ctx, sock) + reset_flag = -1 + + @c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_int) def vfu_reset_cb(ctx, reset_type): assert reset_type == VFU_RESET_PCI_FLR or reset_type == VFU_RESET_LOST_CONN @@ -301,18 +318,19 @@ def vfu_reset_cb(ctx, reset_type): reset_flag = reset_type return 0 + def test_pci_cap_write_px(): sock = connect_client(ctx) ret = vfu_setup_device_reset_cb(ctx, vfu_reset_cb) assert ret == 0 - #flrc + # flrc cap = struct.pack("ccHHcc52c", to_byte(PCI_CAP_ID_EXP), b'\0', 0, 0, b'\0', b'\x10', *[b'\0' for _ in range(52)]) pos = vfu_pci_add_capability(ctx, pos=cap_offsets[5], flags=0, data=cap) assert pos == cap_offsets[5] - #iflr + # iflr offset = cap_offsets[5] + 8 data = b'\x00\x80' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, @@ -322,6 +340,7 @@ def test_pci_cap_write_px(): disconnect_client(ctx, sock) assert reset_flag == VFU_RESET_LOST_CONN + def test_pci_cap_write_msix(): # FIXME pass @@ -329,7 +348,8 @@ def test_pci_cap_write_msix(): def test_pci_cap_write_pxdc2(): sock = connect_client(ctx) - offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + PCI_EXP_DEVCTL2 + offset = (vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + + PCI_EXP_DEVCTL2) data = b'\xde\xad' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data) @@ -341,7 +361,8 @@ def test_pci_cap_write_pxdc2(): def test_pci_cap_write_pxlc2(): sock = connect_client(ctx) - offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + PCI_EXP_LNKCTL2 + offset = (vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + + PCI_EXP_LNKCTL2) data = b'\xbe\xef' write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data) diff --git a/test/py/test_pci_ext_caps.py b/test/py/test_pci_ext_caps.py index ab10e12..94eda0e 100644 --- a/test/py/test_pci_ext_caps.py +++ b/test/py/test_pci_ext_caps.py @@ -33,9 +33,10 @@ import errno ctx = None + def test_pci_ext_cap_conventional(): ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL) assert ret == 0 @@ -55,11 +56,12 @@ def test_pci_ext_cap_conventional(): vfu_destroy_ctx(ctx) + def test_pci_ext_cap_setup(): global ctx ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -69,6 +71,7 @@ def test_pci_ext_cap_setup(): flags=VFU_REGION_FLAG_RW) assert ret == 0 + def test_pci_ext_cap_unknown_cap(): cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN + 99, 0, 0, 0) @@ -77,6 +80,7 @@ def test_pci_ext_cap_unknown_cap(): assert pos == -1 assert c.get_errno() == errno.ENOTSUP + def test_pci_ext_cap_bad_pos(): cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN, 0, 0, 0) @@ -91,6 +95,7 @@ def test_pci_ext_cap_bad_pos(): assert pos == -1 assert c.get_errno() == errno.EINVAL + @vfu_region_access_cb_t def pci_region_cb(ctx, buf, count, offset, is_write): if not is_write: @@ -98,6 +103,7 @@ def pci_region_cb(ctx, buf, count, offset, is_write): return write_pci_cfg_space(ctx, buf, count, offset, extended=True) + cap_offsets = ( PCI_CFG_SPACE_SIZE, PCI_CFG_SPACE_SIZE + PCI_EXT_CAP_DSN_SIZEOF, @@ -107,6 +113,7 @@ cap_offsets = ( 600 ) + def test_add_ext_caps(): cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN, 0, 4, 8) @@ -150,6 +157,7 @@ def test_add_ext_caps(): ret = vfu_realize_ctx(ctx) assert ret == 0 + def test_find_ext_caps(): offset = vfu_pci_find_capability(ctx, True, PCI_EXT_CAP_ID_DSN) assert offset == cap_offsets[0] @@ -160,7 +168,8 @@ def test_find_ext_caps(): assert cap_id == PCI_EXT_CAP_ID_DSN assert cap_next == cap_offsets[1] - offset = vfu_pci_find_next_capability(ctx, True, offset, PCI_EXT_CAP_ID_DSN) + offset = vfu_pci_find_next_capability(ctx, True, offset, + PCI_EXT_CAP_ID_DSN) assert offset == 0 offset = vfu_pci_find_capability(ctx, True, PCI_EXT_CAP_ID_VNDR) @@ -169,7 +178,8 @@ def test_find_ext_caps(): assert cap_id == PCI_EXT_CAP_ID_VNDR assert cap_next == cap_offsets[2] - offset = vfu_pci_find_next_capability(ctx, True, offset, PCI_EXT_CAP_ID_DSN) + offset = vfu_pci_find_next_capability(ctx, True, offset, + PCI_EXT_CAP_ID_DSN) assert offset == 0 offset = vfu_pci_find_next_capability(ctx, True, 0, PCI_EXT_CAP_ID_VNDR) @@ -212,6 +222,7 @@ def test_find_ext_caps(): assert offset == 0 assert c.get_errno() == errno.ENOENT + def test_pci_ext_cap_write_hdr(): sock = connect_client(ctx) @@ -228,6 +239,7 @@ def test_pci_ext_cap_write_hdr(): disconnect_client(ctx, sock) + def test_pci_ext_cap_readonly(): sock = connect_client(ctx) @@ -244,6 +256,7 @@ def test_pci_ext_cap_readonly(): disconnect_client(ctx, sock) + def test_pci_ext_cap_callback(): sock = connect_client(ctx) @@ -265,10 +278,11 @@ def test_pci_ext_cap_callback(): disconnect_client(ctx, sock) + def test_pci_ext_cap_write_dsn(): sock = connect_client(ctx) - data = struct.pack("II", 1, 2); + data = struct.pack("II", 1, 2) offset = cap_offsets[0] + 4 write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, count=len(data), data=data, expect=errno.EPERM) @@ -281,10 +295,11 @@ def test_pci_ext_cap_write_dsn(): disconnect_client(ctx, sock) + def test_pci_ext_cap_write_vendor(): sock = connect_client(ctx) - data = struct.pack("II", 0x1, 0x2); + data = struct.pack("II", 0x1, 0x2) # start of vendor payload offset = cap_offsets[2] + 8 write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset, @@ -297,5 +312,6 @@ def test_pci_ext_cap_write_vendor(): disconnect_client(ctx, sock) + def test_pci_ext_cap_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_request_errors.py b/test/py/test_request_errors.py index fe6f29e..a734bab 100644 --- a/test/py/test_request_errors.py +++ b/test/py/test_request_errors.py @@ -28,21 +28,20 @@ # from libvfio_user import * -import ctypes as c import errno import os -import sys ctx = None sock = None argsz = len(vfio_irq_set()) + def test_request_errors_setup(): global ctx, sock ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 @@ -55,6 +54,7 @@ def test_request_errors_setup(): sock = connect_client(ctx) + def test_too_small(): # struct vfio_user_header hdr = struct.pack("HHIII", 0xbad1, VFIO_USER_DEVICE_SET_IRQS, @@ -64,6 +64,7 @@ def test_too_small(): vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_too_large(): # struct vfio_user_header hdr = struct.pack("HHIII", 0xbad1, VFIO_USER_DEVICE_SET_IRQS, @@ -73,6 +74,7 @@ def test_too_large(): vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_unsolicited_reply(): # struct vfio_user_header hdr = struct.pack("HHIII", 0xbad2, VFIO_USER_DEVICE_SET_IRQS, @@ -82,6 +84,7 @@ def test_unsolicited_reply(): vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_bad_command(): hdr = vfio_user_header(VFIO_USER_MAX, size=1) @@ -89,12 +92,14 @@ def test_bad_command(): vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_no_payload(): hdr = vfio_user_header(VFIO_USER_DEVICE_SET_IRQS, size=0) sock.send(hdr) vfu_run_ctx(ctx) get_reply(sock, expect=errno.EINVAL) + def test_bad_request_closes_fds(): payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER | VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ, @@ -120,5 +125,6 @@ def test_bad_request_closes_fds(): os.close(fd1) os.close(fd2) + def test_request_errors_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_setup_region.py b/test/py/test_setup_region.py index 0e1ff7f..ac6dc03 100644 --- a/test/py/test_setup_region.py +++ b/test/py/test_setup_region.py @@ -34,50 +34,56 @@ import tempfile ctx = None + def test_device_set_irqs_setup(): global ctx ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB) - assert ctx != None + assert ctx is not None + def test_setup_region_bad_flags(): - ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000, - flags=0x400) + ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, + size=0x10000, flags=0x400) assert ret == -1 assert c.get_errno() == errno.EINVAL - ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000, - flags=0) + ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, + size=0x10000, flags=0) assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_setup_region_bad_mmap_areas(): f = tempfile.TemporaryFile() f.truncate(65536) - mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ] + mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)] - ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000, + ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, + size=0x10000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), mmap_areas=mmap_areas, nr_mmap_areas=0, fd=f.fileno()) assert ret == -1 assert c.get_errno() == errno.EINVAL - ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000, + ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, + size=0x10000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), mmap_areas=None, nr_mmap_areas=1, fd=f.fileno()) assert ret == -1 assert c.get_errno() == errno.EINVAL - ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000, + ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, + size=0x10000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), mmap_areas=mmap_areas, fd=-1) assert ret == -1 assert c.get_errno() == errno.EINVAL - mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ] + mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x5000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), @@ -85,6 +91,7 @@ def test_setup_region_bad_mmap_areas(): assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_setup_region_bad_index(): ret = vfu_setup_region(ctx, index=-2, size=0x10000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM)) @@ -96,12 +103,14 @@ def test_setup_region_bad_index(): assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_setup_region_bad_pci(): ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_CFG_REGION_IDX, size=0x1000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM)) assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_setup_region_bad_migr(): ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=512, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM)) @@ -117,7 +126,7 @@ def test_setup_region_bad_migr(): assert ret == -1 assert c.get_errno() == errno.EINVAL - mmap_areas = [ (0x0, 0x1000), (0x1000, 0x1000) ] + mmap_areas = [(0x0, 0x1000), (0x1000, 0x1000)] ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000, flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM), @@ -125,6 +134,7 @@ def test_setup_region_bad_migr(): assert ret == -1 assert c.get_errno() == errno.EINVAL + def test_setup_region_cfg_always_cb_nocb(): ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_CFG_REGION_IDX, size=PCI_CFG_SPACE_EXP_SIZE, cb=None, @@ -133,6 +143,7 @@ def test_setup_region_cfg_always_cb_nocb(): assert ret == -1 assert c.get_errno() == errno.EINVAL + @vfu_region_access_cb_t def pci_cfg_region_cb(ctx, buf, count, offset, is_write): if not is_write: @@ -141,6 +152,7 @@ def pci_cfg_region_cb(ctx, buf, count, offset, is_write): return count + def test_setup_region_cfg_always_cb(): global ctx @@ -155,10 +167,12 @@ def test_setup_region_cfg_always_cb(): sock = connect_client(ctx) - payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=0, count=2) + payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, + offset=0, count=2) assert payload == b'\xcc\xcc' disconnect_client(ctx, sock) + def test_setup_region_cleanup(): vfu_destroy_ctx(ctx) diff --git a/test/py/test_vfu_create_ctx.py b/test/py/test_vfu_create_ctx.py index 439ff69..cb1448f 100644 --- a/test/py/test_vfu_create_ctx.py +++ b/test/py/test_vfu_create_ctx.py @@ -31,24 +31,28 @@ from libvfio_user import * import ctypes as c import errno + def test_vfu_create_ctx_bad_trans(): ctx = vfu_create_ctx(trans=VFU_TRANS_SOCK + 1) - assert ctx == None + assert ctx is None assert c.get_errno() == errno.ENOTSUP + def test_vfu_create_ctx_bad_flags(): ctx = vfu_create_ctx(flags=999) - assert ctx == None + assert ctx is None assert c.get_errno() == errno.EINVAL + def test_vfu_create_ctx_bad_dev_type(): ctx = vfu_create_ctx(dev_type=VFU_DEV_TYPE_PCI + 4) - assert ctx == None + assert ctx is None assert c.get_errno() == errno.ENOTSUP + def test_vfu_create_ctx_default(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_realize_ctx(ctx) assert ret == 0 diff --git a/test/py/test_vfu_realize_ctx.py b/test/py/test_vfu_realize_ctx.py index 9b863cb..de949aa 100644 --- a/test/py/test_vfu_realize_ctx.py +++ b/test/py/test_vfu_realize_ctx.py @@ -31,9 +31,10 @@ import ctypes as c import errno from libvfio_user import * + def test_vfu_realize_ctx_twice(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_realize_ctx(ctx) assert ret == 0 @@ -43,9 +44,10 @@ def test_vfu_realize_ctx_twice(): vfu_destroy_ctx(ctx) + def test_vfu_unrealized_ctx(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_run_ctx(ctx) assert ret == -1 @@ -53,18 +55,20 @@ def test_vfu_unrealized_ctx(): vfu_destroy_ctx(ctx) + def test_vfu_realize_ctx_default(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_realize_ctx(ctx) assert ret == 0 vfu_destroy_ctx(ctx) + def test_vfu_realize_ctx_pci_bars(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR0_REGION_IDX, size=4096, flags=VFU_REGION_FLAG_RW) @@ -83,9 +87,10 @@ def test_vfu_realize_ctx_pci_bars(): vfu_destroy_ctx(ctx) + def test_vfu_realize_ctx_irqs(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_setup_device_nr_irqs(ctx, VFU_DEV_INTX_IRQ, 1) assert ret == 0 @@ -99,9 +104,10 @@ def test_vfu_realize_ctx_irqs(): vfu_destroy_ctx(ctx) + def test_vfu_realize_ctx_caps(): ctx = vfu_create_ctx() - assert ctx != None + assert ctx is not None ret = vfu_pci_init(ctx) assert ret == 0 |