aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/pull_request.yml6
-rw-r--r--Makefile6
-rw-r--r--test/py/libvfio_user.py172
-rw-r--r--test/py/test_device_get_info.py9
-rw-r--r--test/py/test_device_get_irq_info.py6
-rw-r--r--test/py/test_device_get_region_info.py16
-rw-r--r--test/py/test_device_get_region_info_zero_size.py9
-rw-r--r--test/py/test_device_get_region_io_fds.py124
-rw-r--r--test/py/test_device_set_irqs.py31
-rw-r--r--test/py/test_dirty_pages.py23
-rw-r--r--test/py/test_dma_map.py9
-rw-r--r--test/py/test_dma_unmap.py15
-rw-r--r--test/py/test_irq_trigger.py9
-rw-r--r--test/py/test_map_unmap_sg.py7
-rw-r--r--test/py/test_migration.py3
-rw-r--r--test/py/test_negotiate.py31
-rw-r--r--test/py/test_pci_caps.py75
-rw-r--r--test/py/test_pci_ext_caps.py28
-rw-r--r--test/py/test_request_errors.py12
-rw-r--r--test/py/test_setup_region.py38
-rw-r--r--test/py/test_vfu_create_ctx.py12
-rw-r--r--test/py/test_vfu_realize_ctx.py18
22 files changed, 454 insertions, 205 deletions
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index c066619..b7ed789 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -10,7 +10,7 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libjson-c-dev libcmocka-dev clang valgrind \
- python3-pytest debianutils
+ python3-pytest debianutils flake8
make pre-push VERBOSE=1
ubuntu-18:
timeout-minutes: 5
@@ -19,6 +19,7 @@ jobs:
- uses: actions/checkout@v2
- name: pre-push
run: |
+ # NB: no working flake8
sudo apt update
sudo apt-get -y install libjson-c-dev libcmocka-dev clang valgrind \
python3-pytest debianutils
@@ -31,6 +32,7 @@ jobs:
- uses: actions/checkout@v2
- name: pre-push
run: |
+ # NB: no working flake8
yum -y install make gcc-4.8.5 epel-release pciutils
yum -y install clang cmake json-c-devel libcmocka-devel \
openssl-devel valgrind python36-pytest which
@@ -45,5 +47,5 @@ jobs:
run: |
dnf -y install --releasever=34 \
gcc make clang cmake json-c-devel libcmocka-devel openssl-devel \
- pciutils diffutils valgrind python3-pytest which
+ pciutils diffutils valgrind python3-pytest python3-flake8 which
make pre-push VERBOSE=1
diff --git a/Makefile b/Makefile
index 11900cb..60e17bb 100644
--- a/Makefile
+++ b/Makefile
@@ -54,7 +54,8 @@ endif
GIT_SHA = $(shell git rev-parse --short HEAD)
CMAKE = $(shell bash -c "command -v cmake3 cmake" | head -1)
-RSTLINT= $(shell bash -c "command -v restructuredtext-lint /bin/true" | head -1)
+RSTLINT = $(shell bash -c "command -v restructuredtext-lint /bin/true" | head -1)
+FLAKE8 = $(shell bash -c "command -v flake8 /bin/true" | head -1)
BUILD_DIR_BASE = $(CURDIR)/build
BUILD_DIR = $(BUILD_DIR_BASE)/$(BUILD_TYPE)
@@ -65,7 +66,6 @@ INSTALL_PREFIX ?= /usr/local
.PHONY: pre-push clean realclean tags gcov
all install: $(BUILD_DIR)/Makefile
- $(RSTLINT) docs/vfio-user.rst
+$(MAKE) -C $(BUILD_DIR) $@
#
@@ -126,6 +126,8 @@ endif
cd $(BUILD_DIR)/test; ctest --verbose
pre-push: realclean
+ $(RSTLINT) docs/vfio-user.rst
+ $(FLAKE8) --extend-ignore=F405,F403,E128,E131,E127 $$(find . -name '*.py')
make test WITH_ASAN=1
make realclean
make test CC=clang BUILD_TYPE=rel
diff --git a/test/py/libvfio_user.py b/test/py/libvfio_user.py
index 5cba324..77b3b67 100644
--- a/test/py/libvfio_user.py
+++ b/test/py/libvfio_user.py
@@ -31,7 +31,6 @@
# Note that we don't use enum here, as class.value is a little verbose
#
-from collections import namedtuple
from types import SimpleNamespace
import ctypes as c
import array
@@ -39,7 +38,6 @@ import errno
import json
import mmap
import os
-import pathlib
import socket
import struct
import syslog
@@ -101,7 +99,7 @@ VFIO_IRQ_SET_ACTION_TRIGGER = (1 << 5)
VFIO_DMA_UNMAP_FLAG_ALL = (1 << 1)
VFIO_DEVICE_STATE_STOP = (0)
-VFIO_DEVICE_STATE_RUNNING = (1 << 0)
+VFIO_DEVICE_STATE_RUNNING = (1 << 0)
VFIO_DEVICE_STATE_SAVING = (1 << 1)
VFIO_DEVICE_STATE_RESUMING = (1 << 2)
VFIO_DEVICE_STATE_MASK = ((1 << 3) - 1)
@@ -130,21 +128,21 @@ MAX_DMA_REGIONS = 16
MAX_DMA_SIZE = (8 * ONE_TB)
# enum vfio_user_command
-VFIO_USER_VERSION = 1
-VFIO_USER_DMA_MAP = 2
-VFIO_USER_DMA_UNMAP = 3
-VFIO_USER_DEVICE_GET_INFO = 4
-VFIO_USER_DEVICE_GET_REGION_INFO = 5
-VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6
-VFIO_USER_DEVICE_GET_IRQ_INFO = 7
-VFIO_USER_DEVICE_SET_IRQS = 8
-VFIO_USER_REGION_READ = 9
-VFIO_USER_REGION_WRITE = 10
-VFIO_USER_DMA_READ = 11
-VFIO_USER_DMA_WRITE = 12
-VFIO_USER_DEVICE_RESET = 13
-VFIO_USER_DIRTY_PAGES = 14
-VFIO_USER_MAX = 15
+VFIO_USER_VERSION = 1
+VFIO_USER_DMA_MAP = 2
+VFIO_USER_DMA_UNMAP = 3
+VFIO_USER_DEVICE_GET_INFO = 4
+VFIO_USER_DEVICE_GET_REGION_INFO = 5
+VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6
+VFIO_USER_DEVICE_GET_IRQ_INFO = 7
+VFIO_USER_DEVICE_SET_IRQS = 8
+VFIO_USER_REGION_READ = 9
+VFIO_USER_REGION_WRITE = 10
+VFIO_USER_DMA_READ = 11
+VFIO_USER_DMA_WRITE = 12
+VFIO_USER_DEVICE_RESET = 13
+VFIO_USER_DIRTY_PAGES = 14
+VFIO_USER_MAX = 15
VFIO_USER_F_TYPE_COMMAND = 0
VFIO_USER_F_TYPE_REPLY = 1
@@ -157,16 +155,16 @@ VFU_PCI_DEV_BAR2_REGION_IDX = 2
VFU_PCI_DEV_BAR3_REGION_IDX = 3
VFU_PCI_DEV_BAR4_REGION_IDX = 4
VFU_PCI_DEV_BAR5_REGION_IDX = 5
-VFU_PCI_DEV_ROM_REGION_IDX = 6
-VFU_PCI_DEV_CFG_REGION_IDX = 7
-VFU_PCI_DEV_VGA_REGION_IDX = 8
+VFU_PCI_DEV_ROM_REGION_IDX = 6
+VFU_PCI_DEV_CFG_REGION_IDX = 7
+VFU_PCI_DEV_VGA_REGION_IDX = 8
VFU_PCI_DEV_MIGR_REGION_IDX = 9
-VFU_PCI_DEV_NUM_REGIONS = 10
+VFU_PCI_DEV_NUM_REGIONS = 10
-VFU_REGION_FLAG_READ = 1
+VFU_REGION_FLAG_READ = 1
VFU_REGION_FLAG_WRITE = 2
VFU_REGION_FLAG_RW = (VFU_REGION_FLAG_READ | VFU_REGION_FLAG_WRITE)
-VFU_REGION_FLAG_MEM = 4
+VFU_REGION_FLAG_MEM = 4
VFU_REGION_FLAG_ALWAYS_CB = 8
VFIO_USER_F_DMA_REGION_READ = (1 << 0)
@@ -184,10 +182,10 @@ VFIO_USER_IO_FD_TYPE_IOREGIONFD = 1
# enum vfu_dev_irq_type
VFU_DEV_INTX_IRQ = 0
-VFU_DEV_MSI_IRQ = 1
+VFU_DEV_MSI_IRQ = 1
VFU_DEV_MSIX_IRQ = 2
-VFU_DEV_ERR_IRQ = 3
-VFU_DEV_REQ_IRQ = 4
+VFU_DEV_ERR_IRQ = 3
+VFU_DEV_REQ_IRQ = 4
VFU_DEV_NUM_IRQS = 5
# enum vfu_reset_type
@@ -197,9 +195,9 @@ VFU_RESET_PCI_FLR = 2
# vfu_pci_type_t
VFU_PCI_TYPE_CONVENTIONAL = 0
-VFU_PCI_TYPE_PCI_X_1 = 1
-VFU_PCI_TYPE_PCI_X_2 = 2
-VFU_PCI_TYPE_EXPRESS = 3
+VFU_PCI_TYPE_PCI_X_1 = 1
+VFU_PCI_TYPE_PCI_X_2 = 2
+VFU_PCI_TYPE_EXPRESS = 3
VFU_CAP_FLAG_EXTENDED = (1 << 0)
VFU_CAP_FLAG_CALLBACK = (1 << 1)
@@ -219,6 +217,7 @@ libc = c.CDLL("libc.so.6", use_errno=True)
# Structures
#
+
class Structure(c.Structure):
def __len__(self):
"""Handy method to return length in bytes."""
@@ -230,6 +229,7 @@ class Structure(c.Structure):
obj = cls.from_buffer_copy(buf)
return obj, buf[c.sizeof(obj):]
+
class vfu_bar_t(c.Union):
_pack_ = 1
_fields_ = [
@@ -237,6 +237,7 @@ class vfu_bar_t(c.Union):
("io", c.c_int32)
]
+
class vfu_pci_hdr_intr_t(Structure):
_pack_ = 1
_fields_ = [
@@ -244,6 +245,7 @@ class vfu_pci_hdr_intr_t(Structure):
("ipin", c.c_byte)
]
+
class vfu_pci_hdr_t(Structure):
_pack_ = 1
_fields_ = [
@@ -269,12 +271,14 @@ class vfu_pci_hdr_t(Structure):
("mlat", c.c_byte)
]
+
class iovec_t(Structure):
_fields_ = [
("iov_base", c.c_void_p),
("iov_len", c.c_int32)
]
+
class vfio_irq_info(Structure):
_pack_ = 1
_fields_ = [
@@ -284,6 +288,7 @@ class vfio_irq_info(Structure):
("count", c.c_uint32),
]
+
class vfio_irq_set(Structure):
_pack_ = 1
_fields_ = [
@@ -294,6 +299,7 @@ class vfio_irq_set(Structure):
("count", c.c_uint32),
]
+
class vfio_user_device_info(Structure):
_pack_ = 1
_fields_ = [
@@ -303,6 +309,7 @@ class vfio_user_device_info(Structure):
("num_irqs", c.c_uint32),
]
+
class vfio_region_info(Structure):
_pack_ = 1
_fields_ = [
@@ -314,6 +321,7 @@ class vfio_region_info(Structure):
("offset", c.c_uint64),
]
+
class vfio_region_info_cap_type(Structure):
_pack_ = 1
_fields_ = [
@@ -324,6 +332,7 @@ class vfio_region_info_cap_type(Structure):
("subtype", c.c_uint32),
]
+
class vfio_region_info_cap_sparse_mmap(Structure):
_pack_ = 1
_fields_ = [
@@ -334,6 +343,7 @@ class vfio_region_info_cap_sparse_mmap(Structure):
("reserved", c.c_uint32),
]
+
class vfio_region_sparse_mmap_area(Structure):
_pack_ = 1
_fields_ = [
@@ -341,6 +351,7 @@ class vfio_region_sparse_mmap_area(Structure):
("size", c.c_uint64),
]
+
class vfio_user_region_io_fds_request(Structure):
_pack_ = 1
_fields_ = [
@@ -350,6 +361,7 @@ class vfio_user_region_io_fds_request(Structure):
("count", c.c_uint32)
]
+
class vfio_user_sub_region_ioeventfd(Structure):
_pack_ = 1
_fields_ = [
@@ -362,6 +374,7 @@ class vfio_user_sub_region_ioeventfd(Structure):
("datamatch", c.c_uint64)
]
+
class vfio_user_sub_region_ioregionfd(Structure):
_pack_ = 1
_fields_ = [
@@ -374,6 +387,7 @@ class vfio_user_sub_region_ioregionfd(Structure):
("user_data", c.c_uint64)
]
+
class vfio_user_sub_region_io_fd(c.Union):
_pack_ = 1
_fields_ = [
@@ -381,6 +395,7 @@ class vfio_user_sub_region_io_fd(c.Union):
("sub_region_ioregionfd", vfio_user_sub_region_ioregionfd)
]
+
class vfio_user_region_io_fds_reply(Structure):
_pack_ = 1
_fields_ = [
@@ -390,6 +405,7 @@ class vfio_user_region_io_fds_reply(Structure):
("count", c.c_uint32)
]
+
class vfio_user_dma_map(Structure):
_pack_ = 1
_fields_ = [
@@ -400,6 +416,7 @@ class vfio_user_dma_map(Structure):
("size", c.c_uint64),
]
+
class vfio_user_dma_unmap(Structure):
_pack_ = 1
_fields_ = [
@@ -409,6 +426,7 @@ class vfio_user_dma_unmap(Structure):
("size", c.c_uint64),
]
+
class vfu_dma_info_t(Structure):
_fields_ = [
("iova", iovec_t),
@@ -418,6 +436,7 @@ class vfu_dma_info_t(Structure):
("prot", c.c_uint32)
]
+
class vfio_user_dirty_pages(Structure):
_pack_ = 1
_fields_ = [
@@ -425,6 +444,7 @@ class vfio_user_dirty_pages(Structure):
("flags", c.c_uint32)
]
+
class vfio_user_bitmap(Structure):
_pack_ = 1
_fields_ = [
@@ -432,6 +452,7 @@ class vfio_user_bitmap(Structure):
("size", c.c_uint64)
]
+
class vfio_user_bitmap_range(Structure):
_pack_ = 1
_fields_ = [
@@ -440,6 +461,7 @@ class vfio_user_bitmap_range(Structure):
("bitmap", vfio_user_bitmap)
]
+
transition_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_int, use_errno=True)
get_pending_bytes_cb_t = c.CFUNCTYPE(c.c_uint64, c.c_void_p)
prepare_data_cb_t = c.CFUNCTYPE(c.c_void_p, c.POINTER(c.c_uint64),
@@ -449,6 +471,7 @@ read_data_cb_t = c.CFUNCTYPE(c.c_ssize_t, c.c_void_p, c.c_void_p,
write_data_cb_t = c.CFUNCTYPE(c.c_ssize_t, c.c_void_p, c.c_uint64)
data_written_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_uint64)
+
class vfu_migration_callbacks_t(Structure):
_fields_ = [
("version", c.c_int),
@@ -460,6 +483,7 @@ class vfu_migration_callbacks_t(Structure):
("data_written", data_written_cb_t),
]
+
class dma_sg_t(Structure):
_fields_ = [
("dma_addr", c.c_void_p),
@@ -467,7 +491,7 @@ class dma_sg_t(Structure):
("length", c.c_uint64),
("offset", c.c_uint64),
("writeable", c.c_bool),
- ("le_next", c.c_void_p), # FIXME add struct for LIST_ENTRY
+ ("le_next", c.c_void_p),
("le_prev", c.c_void_p),
]
@@ -475,6 +499,7 @@ class dma_sg_t(Structure):
# Util functions
#
+
lib.vfu_create_ctx.argtypes = (c.c_int, c.c_char_p, c.c_int,
c.c_void_p, c.c_int)
lib.vfu_create_ctx.restype = (c.c_void_p)
@@ -502,7 +527,8 @@ lib.vfu_pci_find_next_capability.argtypes = (c.c_void_p, c.c_bool, c.c_ulong,
c.c_int)
lib.vfu_pci_find_next_capability.restype = (c.c_ulong)
lib.vfu_irq_trigger.argtypes = (c.c_void_p, c.c_uint)
-vfu_dma_register_cb_t = c.CFUNCTYPE(None, c.c_void_p, c.POINTER(vfu_dma_info_t))
+vfu_dma_register_cb_t = c.CFUNCTYPE(None, c.c_void_p,
+ c.POINTER(vfu_dma_info_t))
vfu_dma_unregister_cb_t = c.CFUNCTYPE(c.c_int, c.c_void_p,
c.POINTER(vfu_dma_info_t))
lib.vfu_setup_device_dma.argtypes = (c.c_void_p, vfu_dma_register_cb_t,
@@ -527,23 +553,28 @@ def to_byte(val):
"""Cast an int to a byte value."""
return val.to_bytes(1, 'little')
+
def skip(fmt, buf):
"""Return the data remaining after skipping the given elements."""
return buf[struct.calcsize(fmt):]
+
def parse_json(json_str):
"""Parse JSON into an object with attributes (instead of using a dict)."""
return json.loads(json_str, object_hook=lambda d: SimpleNamespace(**d))
+
def eventfd(initval=0, flags=0):
libc.eventfd.argtypes = (c.c_uint, c.c_int)
return libc.eventfd(initval, flags)
+
def connect_sock():
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(SOCK_PATH)
return sock
+
def connect_client(ctx):
sock = connect_sock()
@@ -557,6 +588,7 @@ def connect_client(ctx):
payload = get_reply(sock, expect=0)
return sock
+
def disconnect_client(ctx, sock):
sock.close()
@@ -565,6 +597,7 @@ def disconnect_client(ctx, sock):
assert ret == -1
assert c.get_errno() == errno.ENOTCONN
+
def get_reply(sock, expect=0):
buf = sock.recv(4096)
(msg_id, cmd, msg_size, flags, errno) = struct.unpack("HHIII", buf[0:16])
@@ -572,6 +605,7 @@ def get_reply(sock, expect=0):
assert errno == expect
return buf[16:]
+
def msg(ctx, sock, cmd, payload, expect=0, fds=None, rsp=True):
"""Round trip a request and reply to the server."""
hdr = vfio_user_header(cmd, size=len(payload))
@@ -589,6 +623,7 @@ def msg(ctx, sock, cmd, payload, expect=0, fds=None, rsp=True):
return
return get_reply(sock, expect=expect)
+
def get_reply_fds(sock, expect=0):
"""Receives a message from a socket and pulls the returned file descriptors
out of the message."""
@@ -599,16 +634,17 @@ def get_reply_fds(sock, expect=0):
data[0:16])
assert errno == expect
- cmsg_level, cmsg_type, packed_fd = ancillary[0] if len(ancillary) != 0 else \
- (0, 0, [])
+ cmsg_level, cmsg_type, packed_fd = ancillary[0] if len(ancillary) != 0 \
+ else (0, 0, [])
unpacked_fds = []
for i in range(0, len(packed_fd), 4):
- [unpacked_fd] = struct.unpack_from("i", packed_fd, offset = i)
+ [unpacked_fd] = struct.unpack_from("i", packed_fd, offset=i)
unpacked_fds.append(unpacked_fd)
assert len(packed_fd)/4 == len(unpacked_fds)
assert (msg_flags & VFIO_USER_F_TYPE_REPLY) != 0
return (unpacked_fds, data[16:])
+
def msg_fds(ctx, sock, cmd, payload, expect=0, fds=None):
"""Round trip a request and reply to the server. With the server returning
new fds"""
@@ -623,18 +659,22 @@ def msg_fds(ctx, sock, cmd, payload, expect=0, fds=None):
vfu_run_ctx(ctx)
return get_reply_fds(sock, expect=expect)
+
def get_pci_header(ctx):
ptr = lib.vfu_pci_get_config_space(ctx)
return c.cast(ptr, c.POINTER(vfu_pci_hdr_t)).contents
+
def get_pci_cfg_space(ctx):
ptr = lib.vfu_pci_get_config_space(ctx)
return c.cast(ptr, c.POINTER(c.c_char))[0:PCI_CFG_SPACE_SIZE]
+
def get_pci_ext_cfg_space(ctx):
ptr = lib.vfu_pci_get_config_space(ctx)
return c.cast(ptr, c.POINTER(c.c_char))[0:PCI_CFG_SPACE_EXP_SIZE]
+
def read_pci_cfg_space(ctx, buf, count, offset, extended=False):
space = get_pci_ext_cfg_space(ctx) if extended else get_pci_cfg_space(ctx)
@@ -642,6 +682,7 @@ def read_pci_cfg_space(ctx, buf, count, offset, extended=False):
buf[i] = space[offset+i]
return count
+
def write_pci_cfg_space(ctx, buf, count, offset, extended=False):
max_offset = PCI_CFG_SPACE_EXP_SIZE if extended else PCI_CFG_SPACE_SIZE
@@ -653,6 +694,7 @@ def write_pci_cfg_space(ctx, buf, count, offset, extended=False):
space[offset+i] = buf[i]
return count
+
def access_region(ctx, sock, is_write, region, offset, count,
data=None, expect=0, rsp=True):
# struct vfio_user_region_access
@@ -669,12 +711,16 @@ def access_region(ctx, sock, is_write, region, offset, count,
return skip("QII", result)
+
def write_region(ctx, sock, region, offset, count, data, expect=0, rsp=True):
access_region(ctx, sock, True, region, offset, count, data, expect=expect,
rsp=rsp)
+
def read_region(ctx, sock, region, offset, count, expect=0):
- return access_region(ctx, sock, False, region, offset, count, expect=expect)
+ return access_region(ctx, sock, False, region, offset, count,
+ expect=expect)
+
def ext_cap_hdr(buf, offset):
"""Read an extended cap header."""
@@ -684,18 +730,21 @@ def ext_cap_hdr(buf, offset):
cap_next >>= 4
return cap_id, cap_next
+
@vfu_dma_register_cb_t
def dma_register(ctx, info):
pass
+
@vfu_dma_unregister_cb_t
def dma_unregister(ctx, info):
pass
return 0
+
def prepare_ctx_for_dma():
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -712,12 +761,15 @@ def prepare_ctx_for_dma():
# Library wrappers
#
+
msg_id = 1
+
@c.CFUNCTYPE(None, c.c_void_p, c.c_int, c.c_char_p)
def log(ctx, level, msg):
print(msg.decode("utf-8"))
+
def vfio_user_header(cmd, size, no_reply=False, error=False, error_no=0):
global msg_id
@@ -728,6 +780,7 @@ def vfio_user_header(cmd, size, no_reply=False, error=False, error_no=0):
return buf
+
def vfu_create_ctx(trans=VFU_TRANS_SOCK, sock_path=SOCK_PATH, flags=0,
private=None, dev_type=VFU_DEV_TYPE_PCI):
if os.path.exists(sock_path):
@@ -740,9 +793,11 @@ def vfu_create_ctx(trans=VFU_TRANS_SOCK, sock_path=SOCK_PATH, flags=0,
return ctx
+
def vfu_realize_ctx(ctx):
return lib.vfu_realize_ctx(ctx)
+
def vfu_attach_ctx(ctx, expect=0):
ret = lib.vfu_attach_ctx(ctx)
if expect == 0:
@@ -752,18 +807,21 @@ def vfu_attach_ctx(ctx, expect=0):
assert c.get_errno() == expect
return ret
+
def vfu_run_ctx(ctx):
return lib.vfu_run_ctx(ctx)
+
def vfu_destroy_ctx(ctx):
lib.vfu_destroy_ctx(ctx)
ctx = None
if os.path.exists(SOCK_PATH):
os.remove(SOCK_PATH)
+
def vfu_setup_region(ctx, index, size, cb=None, flags=0,
mmap_areas=None, nr_mmap_areas=None, fd=-1, offset=0):
- assert ctx != None
+ assert ctx is not None
c_mmap_areas = None
@@ -790,50 +848,59 @@ def vfu_setup_region(ctx, index, size, cb=None, flags=0,
return ret
+
def vfu_setup_device_reset_cb(ctx, cb):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_setup_device_reset_cb(ctx, c.cast(cb, vfu_reset_cb_t))
+
def vfu_setup_device_nr_irqs(ctx, irqtype, count):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_setup_device_nr_irqs(ctx, irqtype, count)
+
def vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_EXPRESS,
hdr_type=PCI_HEADER_TYPE_NORMAL):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_pci_init(ctx, pci_type, hdr_type, 0)
+
def vfu_pci_add_capability(ctx, pos, flags, data):
- assert ctx != None
+ assert ctx is not None
databuf = (c.c_byte * len(data)).from_buffer(bytearray(data))
return lib.vfu_pci_add_capability(ctx, pos, flags, databuf)
+
def vfu_pci_find_capability(ctx, extended, cap_id):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_pci_find_capability(ctx, extended, cap_id)
+
def vfu_pci_find_next_capability(ctx, extended, offset, cap_id):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_pci_find_next_capability(ctx, extended, offset, cap_id)
+
def vfu_irq_trigger(ctx, subindex):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_irq_trigger(ctx, subindex)
+
def vfu_setup_device_dma(ctx, register_cb=None, unregister_cb=None):
- assert ctx != None
+ assert ctx is not None
return lib.vfu_setup_device_dma(ctx, c.cast(register_cb,
vfu_dma_register_cb_t),
c.cast(unregister_cb,
vfu_dma_unregister_cb_t))
+
def vfu_setup_device_migration_callbacks(ctx, cbs=None, offset=0):
- assert ctx != None
+ assert ctx is not None
@c.CFUNCTYPE(c.c_int)
def stub():
@@ -851,9 +918,10 @@ def vfu_setup_device_migration_callbacks(ctx, cbs=None, offset=0):
return lib.vfu_setup_device_migration_callbacks(ctx, cbs, offset)
+
def vfu_addr_to_sg(ctx, dma_addr, length, max_sg=1,
prot=(mmap.PROT_READ | mmap.PROT_WRITE)):
- assert ctx != None
+ assert ctx is not None
sg = (dma_sg_t * max_sg)()
@@ -863,13 +931,17 @@ def vfu_addr_to_sg(ctx, dma_addr, length, max_sg=1,
def vfu_map_sg(ctx, sg, iovec, cnt=1, flags=0):
return lib.vfu_map_sg(ctx, sg, iovec, cnt, flags)
+
def vfu_unmap_sg(ctx, sg, iovec, cnt=1):
return lib.vfu_unmap_sg(ctx, sg, iovec, cnt)
+
def vfu_create_ioeventfd(ctx, region_idx, fd, offset, size, flags, datamatch):
- assert ctx != None
+ assert ctx is not None
+
+ return lib.vfu_create_ioeventfd(ctx, region_idx, fd, offset, size,
+ flags, datamatch)
- return lib.vfu_create_ioeventfd(ctx, region_idx, fd, offset, size, flags, datamatch)
def vfu_migr_done(ctx, err):
return lib.vfu_migr_done(ctx, err)
diff --git a/test/py/test_device_get_info.py b/test/py/test_device_get_info.py
index ea9251b..c41d382 100644
--- a/test/py/test_device_get_info.py
+++ b/test/py/test_device_get_info.py
@@ -30,11 +30,12 @@
from libvfio_user import *
import errno
+
def test_device_get_info():
global ctx
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR0_REGION_IDX, size=4096,
flags=VFU_REGION_FLAG_RW)
@@ -56,13 +57,15 @@ def test_device_get_info():
# bad argsz
- payload = vfio_user_device_info(argsz=8, flags=0, num_regions=0, num_irqs=0)
+ payload = vfio_user_device_info(argsz=8, flags=0,
+ num_regions=0, num_irqs=0)
msg(ctx, sock, VFIO_USER_DEVICE_GET_INFO, payload, expect=errno.EINVAL)
# valid with larger argsz
- payload = vfio_user_device_info(argsz=32, flags=0, num_regions=0, num_irqs=0)
+ payload = vfio_user_device_info(argsz=32, flags=0,
+ num_regions=0, num_irqs=0)
result = msg(ctx, sock, VFIO_USER_DEVICE_GET_INFO, payload)
diff --git a/test/py/test_device_get_irq_info.py b/test/py/test_device_get_irq_info.py
index 4d6b3d7..c45ad7f 100644
--- a/test/py/test_device_get_irq_info.py
+++ b/test/py/test_device_get_irq_info.py
@@ -35,11 +35,12 @@ sock = None
argsz = len(vfio_irq_info())
+
def test_device_get_irq_info_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -56,6 +57,7 @@ def test_device_get_irq_info_setup():
sock = connect_client(ctx)
+
def test_device_get_irq_info_bad_in():
payload = struct.pack("II", 0, 0)
@@ -73,6 +75,7 @@ def test_device_get_irq_info_bad_in():
msg(ctx, sock, VFIO_USER_DEVICE_GET_IRQ_INFO, payload, expect=errno.EINVAL)
+
def test_device_get_irq_info():
# valid with larger argsz
@@ -118,6 +121,7 @@ def test_device_get_irq_info():
assert info.index == VFU_DEV_MSIX_IRQ
assert info.count == 2048
+
def test_device_get_irq_info_cleanup():
disconnect_client(ctx, sock)
diff --git a/test/py/test_device_get_region_info.py b/test/py/test_device_get_region_info.py
index 710efd8..f7e63d2 100644
--- a/test/py/test_device_get_region_info.py
+++ b/test/py/test_device_get_region_info.py
@@ -30,18 +30,18 @@
from libvfio_user import *
import errno
import tempfile
-import os
ctx = None
sock = None
argsz = len(vfio_region_info())
+
def test_device_get_region_info_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR1_REGION_IDX, size=4096,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM))
@@ -50,7 +50,7 @@ def test_device_get_region_info_setup():
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x8000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -60,7 +60,7 @@ def test_device_get_region_info_setup():
f = tempfile.TemporaryFile()
f.truncate(0x2000)
- mmap_areas = [ (0x1000, 0x1000) ]
+ mmap_areas = [(0x1000, 0x1000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000,
flags=VFU_REGION_FLAG_RW, mmap_areas=mmap_areas,
@@ -72,6 +72,7 @@ def test_device_get_region_info_setup():
sock = connect_client(ctx)
+
def test_device_get_region_info_short_write():
payload = struct.pack("II", 0, 0)
@@ -79,6 +80,7 @@ def test_device_get_region_info_short_write():
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload,
expect=errno.EINVAL)
+
def test_device_get_region_info_bad_argsz():
payload = vfio_region_info(argsz=8, flags=0,
@@ -88,6 +90,7 @@ def test_device_get_region_info_bad_argsz():
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload,
expect=errno.EINVAL)
+
def test_device_get_region_info_bad_index():
payload = vfio_region_info(argsz=argsz, flags=0,
@@ -97,6 +100,7 @@ def test_device_get_region_info_bad_index():
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_INFO, payload,
expect=errno.EINVAL)
+
def test_device_get_region_info_larger_argsz():
payload = vfio_region_info(argsz=argsz + 8, flags=0,
@@ -117,6 +121,7 @@ def test_device_get_region_info_larger_argsz():
assert info.size == 4096
assert info.offset == 0
+
def test_device_get_region_info_small_argsz_caps():
global sock
@@ -141,6 +146,7 @@ def test_device_get_region_info_small_argsz_caps():
# skip reading the SCM_RIGHTS
disconnect_client(ctx, sock)
+
def test_device_get_region_info_caps():
global sock
@@ -176,6 +182,7 @@ def test_device_get_region_info_caps():
# skip reading the SCM_RIGHTS
disconnect_client(ctx, sock)
+
def test_device_get_region_info_migr():
global sock
@@ -213,5 +220,6 @@ def test_device_get_region_info_migr():
# skip reading the SCM_RIGHTS
disconnect_client(ctx, sock)
+
def test_device_get_region_info_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_device_get_region_info_zero_size.py b/test/py/test_device_get_region_info_zero_size.py
index c0965d3..d80152c 100644
--- a/test/py/test_device_get_region_info_zero_size.py
+++ b/test/py/test_device_get_region_info_zero_size.py
@@ -28,20 +28,18 @@
#
from libvfio_user import *
-import errno
-import tempfile
-import os
ctx = None
sock = None
argsz = len(vfio_region_info())
+
def test_device_get_region_info_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_realize_ctx(ctx)
assert ret == 0
@@ -59,7 +57,8 @@ def test_device_get_region_info_zero_sized_region():
index=index, cap_offset=0,
size=0, offset=0)
- hdr = vfio_user_header(VFIO_USER_DEVICE_GET_REGION_INFO, size=len(payload))
+ hdr = vfio_user_header(VFIO_USER_DEVICE_GET_REGION_INFO,
+ size=len(payload))
sock.send(hdr + payload)
vfu_run_ctx(ctx)
result = get_reply(sock)
diff --git a/test/py/test_device_get_region_io_fds.py b/test/py/test_device_get_region_io_fds.py
index 2179cf7..bffe508 100644
--- a/test/py/test_device_get_region_io_fds.py
+++ b/test/py/test_device_get_region_io_fds.py
@@ -40,16 +40,17 @@ sock = None
fds = []
IOEVENT_SIZE = 8
+
def test_device_get_region_io_fds_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR1_REGION_IDX, size=0x8000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -59,7 +60,7 @@ def test_device_get_region_io_fds_setup():
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x8000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -74,55 +75,60 @@ def test_device_get_region_io_fds_setup():
assert ret == 0
sock = connect_client(ctx)
- for i in range(0,6):
- tmp = eventfd(0,0)
+ for i in range(0, 6):
+ tmp = eventfd(0, 0)
fds.append(tmp)
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, tmp,
i * IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
+
def test_device_get_region_io_fds_bad_flags():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 5, flags = 1,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 5, flags=1,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_bad_count():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 5, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 5, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_buffer_too_small():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) - 1, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1)
+ argsz=len(vfio_user_region_io_fds_reply()) - 1, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_buffer_too_large():
- payload = vfio_user_region_io_fds_request(argsz = SERVER_MAX_DATA_XFER_SIZE
- + 1, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX,
- count = 1)
+ payload = vfio_user_region_io_fds_request(argsz=SERVER_MAX_DATA_XFER_SIZE
+ + 1, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ count=1)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect =
- errno.EINVAL)
def test_device_get_region_io_fds_no_fds():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR1_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR1_REGION_IDX, count=0)
ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0)
@@ -136,19 +142,20 @@ def test_device_get_region_io_fds_no_fds():
def test_device_get_region_io_fds_no_regions_setup():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR3_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR3_REGION_IDX, count=0)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
- expect = errno.EINVAL)
def test_device_get_region_io_fds_region_no_mmap():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR5_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR5_REGION_IDX, count=0)
ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
- expect = 0)
+ expect=0)
reply, ret = vfio_user_region_io_fds_reply.pop_from_buffer(ret)
@@ -157,20 +164,22 @@ def test_device_get_region_io_fds_region_no_mmap():
assert reply.flags == 0
assert reply.index == VFU_PCI_DEV_BAR5_REGION_IDX
+
def test_device_get_region_io_fds_region_out_of_range():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = 512, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=512, count=0)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect =
- errno.EINVAL)
def test_device_get_region_io_fds_fds_read_write():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 10, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 10, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -188,18 +197,19 @@ def test_device_get_region_io_fds_fds_read_write():
# Server
for i in range(0, len(newfds)):
out = os.read(newfds[i], IOEVENT_SIZE)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 10
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_full():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 6, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 6, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -214,7 +224,7 @@ def test_device_get_region_io_fds_full():
for i in range(0, reply.count):
out = os.read(newfds[ioevents[i].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
assert ioevents[i].size == IOEVENT_SIZE
assert ioevents[i].offset == 40 - (IOEVENT_SIZE * i)
@@ -223,11 +233,12 @@ def test_device_get_region_io_fds_full():
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_fds_read_write_nothing():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()), flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()), flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -237,21 +248,22 @@ def test_device_get_region_io_fds_fds_read_write_nothing():
assert reply.argsz == len(vfio_user_region_io_fds_reply()) + \
len(vfio_user_sub_region_ioeventfd()) * 6
+
def test_device_get_region_io_fds_fds_read_write_dupe_fd():
""" Test here to show that we can return mutliple sub regions with the same
- fd_index. fd_index points to the list of fds returned from the socket as
- returned by msg_fds. """
+ fd_index. fd_index points to the list of fds returned from the socket
+ as returned by msg_fds. """
- t = eventfd(0,0)
+ t = eventfd(0, 0)
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 6 *
IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 7 *
IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 8, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 8, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -271,7 +283,7 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd():
for i in range(2, 8):
out = os.read(newfds[ioevents[i].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
assert ioevents[i].size == IOEVENT_SIZE
assert ioevents[i].offset == 56 - (IOEVENT_SIZE * i)
@@ -283,30 +295,32 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd():
os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[1].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
os.write(newfds[ioevents[1].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[0].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[1].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_ioeventfd_invalid_size():
- t = eventfd(0,0)
- assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 0x8000
- -2048, 4096, 0, 0) == -1
+ t = eventfd(0, 0)
+ assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t,
+ 0x8000 - 0x800, 4096, 0, 0) == -1
os.close(t)
+
def test_device_get_region_info_cleanup():
for i in fds:
os.close(i)
diff --git a/test/py/test_device_set_irqs.py b/test/py/test_device_set_irqs.py
index a958db3..6d47778 100644
--- a/test/py/test_device_set_irqs.py
+++ b/test/py/test_device_set_irqs.py
@@ -28,21 +28,20 @@
#
from libvfio_user import *
-import ctypes as c
import errno
import os
-import sys
ctx = None
sock = None
argsz = len(vfio_irq_set())
+
def test_device_set_irqs_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -59,17 +58,20 @@ def test_device_set_irqs_setup():
sock = connect_client(ctx)
+
def test_device_set_irqs_no_irq_set():
hdr = vfio_user_header(VFIO_USER_DEVICE_SET_IRQS, size=0)
sock.send(hdr)
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_device_set_irqs_short_write():
payload = struct.pack("II", 0, 0)
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_argsz():
payload = vfio_irq_set(argsz=3, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ,
@@ -77,6 +79,7 @@ def test_device_set_irqs_bad_argsz():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_index():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_NUM_IRQS,
@@ -84,6 +87,7 @@ def test_device_set_irqs_bad_index():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_flags_MASK_and_UNMASK():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_ACTION_UNMASK, index=VFU_DEV_MSIX_IRQ,
@@ -91,6 +95,7 @@ def test_device_set_irqs_bad_flags_MASK_and_UNMASK():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_flags_DATA_NONE_and_DATA_BOOL():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE | VFIO_IRQ_SET_DATA_BOOL,
@@ -98,6 +103,7 @@ def test_device_set_irqs_bad_flags_DATA_NONE_and_DATA_BOOL():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_start_count_range():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ,
@@ -105,13 +111,15 @@ def test_device_set_irqs_bad_start_count_range():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
-def test_device_set_irqs_bad_start_count_range():
+
+def test_device_set_irqs_bad_start_count_range2():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ,
start=2049, count=1)
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_action_for_err_irq():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_ERR_IRQ,
@@ -119,6 +127,7 @@ def test_device_set_irqs_bad_action_for_err_irq():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_action_for_req_irq():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ,
@@ -126,6 +135,7 @@ def test_device_set_irqs_bad_action_for_req_irq():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_start_for_count_0():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ,
@@ -133,6 +143,7 @@ def test_device_set_irqs_bad_start_for_count_0():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_action_for_count_0():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_MASK |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ,
@@ -140,6 +151,7 @@ def test_device_set_irqs_bad_action_for_count_0():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_action_and_data_type_for_count_0():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ,
@@ -147,6 +159,7 @@ def test_device_set_irqs_bad_action_and_data_type_for_count_0():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_bad_fds_for_DATA_BOOL():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ,
@@ -161,6 +174,7 @@ def test_device_set_irqs_bad_fds_for_DATA_BOOL():
os.close(fd)
+
def test_device_set_irqs_bad_fds_for_DATA_NONE():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_MSIX_IRQ,
@@ -173,6 +187,7 @@ def test_device_set_irqs_bad_fds_for_DATA_NONE():
os.close(fd)
+
def test_device_set_irqs_bad_fds_for_count_2():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ,
@@ -185,6 +200,7 @@ def test_device_set_irqs_bad_fds_for_count_2():
os.close(fd)
+
def test_device_set_irqs_disable():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_NONE, index=VFU_DEV_REQ_IRQ,
@@ -198,6 +214,7 @@ def test_device_set_irqs_disable():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload)
+
def test_device_set_irqs_enable():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ,
@@ -207,6 +224,7 @@ def test_device_set_irqs_enable():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd])
+
def test_device_set_irqs_trigger_bool_too_small():
payload = vfio_irq_set(argsz=argsz + 1, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ,
@@ -215,6 +233,7 @@ def test_device_set_irqs_trigger_bool_too_small():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_trigger_bool_too_large():
payload = vfio_irq_set(argsz=argsz + 3, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ,
@@ -223,6 +242,7 @@ def test_device_set_irqs_trigger_bool_too_large():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, expect=errno.EINVAL)
+
def test_device_set_irqs_enable_update():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ,
@@ -232,6 +252,7 @@ def test_device_set_irqs_enable_update():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd])
+
def test_device_set_irqs_enable_trigger_none():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ,
@@ -251,6 +272,7 @@ def test_device_set_irqs_enable_trigger_none():
assert struct.unpack("Q", os.read(fd1, 8))[0] == 4
assert struct.unpack("Q", os.read(fd2, 8))[0] == 9
+
def test_device_set_irqs_enable_trigger_bool():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_EVENTFD, index=VFU_DEV_MSIX_IRQ,
@@ -271,5 +293,6 @@ def test_device_set_irqs_enable_trigger_bool():
assert struct.unpack("Q", os.read(fd1, 8))[0] == 4
assert struct.unpack("Q", os.read(fd2, 8))[0] == 9
+
def test_device_set_irqs_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_dirty_pages.py b/test/py/test_dirty_pages.py
index 193e694..a5b85dc 100644
--- a/test/py/test_dirty_pages.py
+++ b/test/py/test_dirty_pages.py
@@ -34,20 +34,23 @@ import tempfile
ctx = None
+
@vfu_dma_register_cb_t
def dma_register(ctx, info):
pass
+
@vfu_dma_unregister_cb_t
def dma_unregister(ctx, info):
pass
return 0
+
def test_dirty_pages_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -58,7 +61,7 @@ def test_dirty_pages_setup():
f = tempfile.TemporaryFile()
f.truncate(0x2000)
- mmap_areas = [ (0x1000, 0x1000) ]
+ mmap_areas = [(0x1000, 0x1000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000,
flags=VFU_REGION_FLAG_RW, mmap_areas=mmap_areas,
@@ -79,23 +82,27 @@ def test_dirty_pages_setup():
msg(ctx, sock, VFIO_USER_DMA_MAP, payload, fds=[f.fileno()])
+
def test_dirty_pages_short_write():
payload = struct.pack("I", 8)
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL)
+
def test_dirty_pages_bad_argsz():
payload = vfio_user_dirty_pages(argsz=4,
flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_START)
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL)
+
def test_dirty_pages_start_no_migration():
payload = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()),
flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_START)
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.ENOTSUP)
+
def test_dirty_pages_start_bad_flags():
#
# This is a little cheeky, after vfu_realize_ctx(), but it works at the
@@ -122,17 +129,20 @@ def start_logging():
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload)
+
def test_dirty_pages_start():
start_logging()
# should be idempotent
start_logging()
+
def test_dirty_pages_get_short_read():
payload = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()),
flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP)
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL)
+
#
# This should in fact work; update when it does.
#
@@ -147,6 +157,7 @@ def test_dirty_pages_get_sub_range():
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.ENOTSUP)
+
def test_dirty_pages_get_bad_page_size():
argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8
dirty_pages = vfio_user_dirty_pages(argsz=argsz,
@@ -158,6 +169,7 @@ def test_dirty_pages_get_bad_page_size():
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL)
+
def test_dirty_pages_get_bad_bitmap_size():
argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8
dirty_pages = vfio_user_dirty_pages(argsz=argsz,
@@ -169,6 +181,7 @@ def test_dirty_pages_get_bad_bitmap_size():
msg(ctx, sock, VFIO_USER_DIRTY_PAGES, payload, expect=errno.EINVAL)
+
def test_dirty_pages_get_short_reply():
dirty_pages = vfio_user_dirty_pages(argsz=len(vfio_user_dirty_pages()),
flags=VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP)
@@ -188,6 +201,7 @@ def test_dirty_pages_get_short_reply():
assert dirty_pages.argsz == argsz
assert dirty_pages.flags == VFIO_IOMMU_DIRTY_PAGES_FLAG_GET_BITMAP
+
def test_dirty_pages_get_unmodified():
argsz = len(vfio_user_dirty_pages()) + len(vfio_user_bitmap_range()) + 8
@@ -232,9 +246,11 @@ def get_dirty_page_bitmap():
br, result = vfio_user_bitmap_range.pop_from_buffer(result)
return struct.unpack("Q", result)[0]
+
sg3 = None
iovec3 = None
+
def test_dirty_pages_get_modified():
ret, sg1 = vfu_addr_to_sg(ctx, dma_addr=0x10000, length=0x1000)
assert ret == 1
@@ -261,7 +277,7 @@ def test_dirty_pages_get_modified():
# unmap segment, dirty bitmap should be the same
vfu_unmap_sg(ctx, sg1, iovec1)
- bitmap = get_dirty_page_bitmap()
+ bitmap = get_dirty_page_bitmap()
assert bitmap == 0b11110001
# check again, previously unmapped segment should be clean
@@ -301,6 +317,7 @@ def test_dirty_pages_stop():
# destroying the context.
stop_logging()
+
def test_dirty_pages_cleanup():
disconnect_client(ctx, sock)
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_dma_map.py b/test/py/test_dma_map.py
index e4ac2ba..f446efd 100644
--- a/test/py/test_dma_map.py
+++ b/test/py/test_dma_map.py
@@ -36,11 +36,12 @@ import errno
ctx = None
+
def test_dma_region_too_big():
global ctx
ctx = prepare_ctx_for_dma()
- assert ctx != None
+ assert ctx is not None
sock = connect_client(ctx)
@@ -53,6 +54,7 @@ def test_dma_region_too_big():
disconnect_client(ctx, sock)
+
def test_dma_region_too_many():
sock = connect_client(ctx)
@@ -63,13 +65,14 @@ def test_dma_region_too_many():
offset=0, addr=0x1000 * i, size=4096)
if i == MAX_DMA_REGIONS + 1:
- expect=errno.EINVAL
+ expect = errno.EINVAL
else:
- expect=0
+ expect = 0
msg(ctx, sock, VFIO_USER_DMA_MAP, payload, expect=expect)
disconnect_client(ctx, sock)
+
def test_dma_region_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_dma_unmap.py b/test/py/test_dma_unmap.py
index f070ede..1c7cea0 100644
--- a/test/py/test_dma_unmap.py
+++ b/test/py/test_dma_unmap.py
@@ -28,19 +28,18 @@
# DAMAGE.
#
-import ctypes
import errno
from libvfio_user import *
-import tempfile
ctx = None
sock = None
+
def test_dma_unmap_setup():
global ctx, sock
ctx = prepare_ctx_for_dma()
- assert ctx != None
+ assert ctx is not None
payload = struct.pack("II", 0, 0)
sock = connect_client(ctx)
@@ -52,15 +51,18 @@ def test_dma_unmap_setup():
msg(ctx, sock, VFIO_USER_DMA_MAP, payload)
+
def test_dma_unmap_short_write():
payload = struct.pack("II", 0, 0)
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL)
+
def test_dma_unmap_bad_argsz():
- payload = vfio_user_dma_unmap(argsz=8, flags=0x2323, addr=0x1000, size=4096)
+ vfio_user_dma_unmap(argsz=8, flags=0x2323, addr=0x1000, size=4096)
+
def test_dma_unmap_invalid_flags():
@@ -68,12 +70,14 @@ def test_dma_unmap_invalid_flags():
flags=0x4, addr=0x1000, size=4096)
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.ENOTSUP)
+
def test_dma_unmap():
payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()),
flags=0, addr=0x1000, size=4096)
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload)
+
def test_dma_unmap_all():
for i in range(0, MAX_DMA_REGIONS):
@@ -89,6 +93,7 @@ def test_dma_unmap_all():
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload)
+
def test_dma_unmap_all_invalid_addr():
payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()),
@@ -96,6 +101,7 @@ def test_dma_unmap_all_invalid_addr():
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL)
+
def test_dma_unmap_all_invalid_flags():
payload = vfio_user_dma_unmap(argsz=len(vfio_user_dma_unmap()),
@@ -104,6 +110,7 @@ def test_dma_unmap_all_invalid_flags():
msg(ctx, sock, VFIO_USER_DMA_UNMAP, payload, expect=errno.EINVAL)
+
def test_dma_unmap_cleanup():
disconnect_client(ctx, sock)
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_irq_trigger.py b/test/py/test_irq_trigger.py
index 02bc90a..39c75af 100644
--- a/test/py/test_irq_trigger.py
+++ b/test/py/test_irq_trigger.py
@@ -34,11 +34,12 @@ import errno
ctx = None
sock = None
+
def test_irq_trigger_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -51,6 +52,7 @@ def test_irq_trigger_setup():
sock = connect_client(ctx)
+
def test_irq_trigger_bad_subindex():
ret = vfu_irq_trigger(ctx, 2048)
assert ret == -1
@@ -60,11 +62,13 @@ def test_irq_trigger_bad_subindex():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_irq_trigger_no_interrupt():
ret = vfu_irq_trigger(ctx, 0)
assert ret == -1
assert c.get_errno() == errno.ENOENT
+
def test_irq_trigger():
# struct vfio_irq_set
payload = struct.pack("IIIII", 20, VFIO_IRQ_SET_ACTION_TRIGGER |
@@ -74,9 +78,10 @@ def test_irq_trigger():
msg(ctx, sock, VFIO_USER_DEVICE_SET_IRQS, payload, fds=[fd])
- ret = vfu_irq_trigger(ctx, 8)
+ vfu_irq_trigger(ctx, 8)
assert struct.unpack("Q", os.read(fd, 8))[0] == 5
+
def test_irq_trigger_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_map_unmap_sg.py b/test/py/test_map_unmap_sg.py
index 00d9b28..fa98159 100644
--- a/test/py/test_map_unmap_sg.py
+++ b/test/py/test_map_unmap_sg.py
@@ -34,11 +34,12 @@ import tempfile
ctx = None
+
def test_map_sg_with_invalid_region():
global ctx
ctx = prepare_ctx_for_dma()
- assert ctx != None
+ assert ctx is not None
sg = dma_sg_t()
iovec = iovec_t()
@@ -46,6 +47,7 @@ def test_map_sg_with_invalid_region():
assert ret == -1
assert ctypes.get_errno() == errno.EINVAL
+
def test_map_sg_without_fd():
sock = connect_client(ctx)
@@ -63,6 +65,7 @@ def test_map_sg_without_fd():
disconnect_client(ctx, sock)
+
def test_map_multiple_sge():
sock = connect_client(ctx)
regions = 4
@@ -88,6 +91,7 @@ def test_map_multiple_sge():
disconnect_client(ctx, sock)
+
def test_unmap_sg():
sock = connect_client(ctx)
regions = 4
@@ -111,6 +115,7 @@ def test_unmap_sg():
disconnect_client(ctx, sock)
+
def test_map_unmap_sg_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_migration.py b/test/py/test_migration.py
index 04c35cc..aa271f9 100644
--- a/test/py/test_migration.py
+++ b/test/py/test_migration.py
@@ -36,6 +36,7 @@ ctx = None
global trans_cb_err
trans_cb_err = 0
+
@transition_cb_t
def trans_cb(ctx, state):
global trans_cb_err
@@ -49,7 +50,7 @@ def test_migration_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000,
flags=VFU_REGION_FLAG_RW)
diff --git a/test/py/test_negotiate.py b/test/py/test_negotiate.py
index 524f59f..0541d0e 100644
--- a/test/py/test_negotiate.py
+++ b/test/py/test_negotiate.py
@@ -29,13 +29,11 @@
from libvfio_user import *
import errno
-import json
-import os
-import socket
import struct
ctx = None
+
def client_version_json(expect=0, json=''):
sock = connect_sock()
@@ -50,15 +48,17 @@ def client_version_json(expect=0, json=''):
return payload
+
def test_server_setup():
global ctx
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_realize_ctx(ctx)
assert ret == 0
+
def test_short_write():
sock = connect_sock()
hdr = vfio_user_header(VFIO_USER_VERSION, size=0)
@@ -67,6 +67,7 @@ def test_short_write():
vfu_attach_ctx(ctx, expect=errno.EINVAL)
get_reply(sock, expect=errno.EINVAL)
+
def test_long_write():
sock = connect_sock()
hdr = vfio_user_header(VFIO_USER_VERSION, size=SERVER_MAX_MSG_SIZE + 1)
@@ -76,6 +77,7 @@ def test_long_write():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_bad_command():
sock = connect_sock()
@@ -86,6 +88,7 @@ def test_bad_command():
vfu_attach_ctx(ctx, expect=errno.EINVAL)
get_reply(sock, expect=errno.EINVAL)
+
def test_invalid_major():
sock = connect_sock()
@@ -96,6 +99,7 @@ def test_invalid_major():
vfu_attach_ctx(ctx, expect=errno.EINVAL)
get_reply(sock, expect=errno.EINVAL)
+
def test_invalid_json_missing_NUL():
sock = connect_sock()
@@ -107,45 +111,55 @@ def test_invalid_json_missing_NUL():
vfu_attach_ctx(ctx, expect=errno.EINVAL)
get_reply(sock, expect=errno.EINVAL)
+
def test_invalid_json_missing_closing_brace():
client_version_json(errno.EINVAL, b"{")
+
def test_invalid_json_missing_closing_quote():
client_version_json(errno.EINVAL, b'"')
+
def test_invalid_json_bad_capabilities_object():
client_version_json(errno.EINVAL, b'{ "capabilities": "23" }')
+
def test_invalid_json_bad_max_fds():
client_version_json(errno.EINVAL,
b'{ "capabilities": { "max_msg_fds": "foo" } }')
-def test_invalid_json_bad_max_fds():
+
+def test_invalid_json_bad_max_fds2():
client_version_json(errno.EINVAL,
b'{ "capabilities": { "max_msg_fds": -1 } }')
-def test_invalid_json_bad_max_fds2():
+
+def test_invalid_json_bad_max_fds3():
client_version_json(errno.EINVAL,
b'{ "capabilities": { "max_msg_fds": %d } }' %
(VFIO_USER_CLIENT_MAX_FDS_LIMIT + 1))
+
def test_invalid_json_bad_migration_object():
client_version_json(errno.EINVAL,
b'{ "capabilities": { "migration": "23" } }')
+
def test_invalid_json_bad_pgsize():
client_version_json(errno.EINVAL, b'{ "capabilities": ' +
b'{ "migration": { "pgsize": "foo" } } }')
+
#
# FIXME: need vfu_setup_device_migration_callbacks() to be able to test this
# failure mode.
#
-def test_invalid_json_bad_pgsize():
+def test_invalid_json_bad_pgsize2():
if False:
client_version_json(errno.EINVAL,
b'{ "capabilities": { "migration": { "pgsize": 4095 } } }')
+
def test_valid_negotiate_no_json():
sock = connect_sock()
@@ -167,12 +181,14 @@ def test_valid_negotiate_no_json():
disconnect_client(ctx, sock)
+
def test_valid_negotiate_empty_json():
client_version_json(json=b'{}')
# notice client closed connection
vfu_run_ctx(ctx)
+
def test_valid_negotiate_json():
client_version_json(json=bytes(
'{ "capabilities": { "max_msg_fds": %s, "max_data_xfer_size": %u } }' %
@@ -182,5 +198,6 @@ def test_valid_negotiate_json():
# notice client closed connection
vfu_run_ctx(ctx)
+
def test_destroying():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_pci_caps.py b/test/py/test_pci_caps.py
index 5af54e3..5267246 100644
--- a/test/py/test_pci_caps.py
+++ b/test/py/test_pci_caps.py
@@ -33,11 +33,12 @@ import errno
ctx = None
+
def test_pci_cap_setup():
global ctx
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL)
assert ret == 0
@@ -46,30 +47,35 @@ def test_pci_cap_setup():
size=PCI_CFG_SPACE_SIZE, flags=VFU_REGION_FLAG_RW)
assert ret == 0
+
def test_pci_cap_bad_flags():
pos = vfu_pci_add_capability(ctx, pos=0, flags=999,
data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0))
assert pos == -1
assert c.get_errno() == errno.EINVAL
+
def test_pci_cap_no_cb():
pos = vfu_pci_add_capability(ctx, pos=0, flags=VFU_CAP_FLAG_CALLBACK,
data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0))
assert pos == -1
assert c.get_errno() == errno.EINVAL
+
def test_pci_cap_unknown_cap():
pos = vfu_pci_add_capability(ctx, pos=0, flags=0,
data=struct.pack("ccHH", b'\x81', b'\0', 0, 0))
assert pos == -1
assert c.get_errno() == errno.ENOTSUP
+
def test_pci_cap_bad_pos():
pos = vfu_pci_add_capability(ctx, pos=PCI_CFG_SPACE_SIZE, flags=0,
data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0))
assert pos == -1
assert c.get_errno() == errno.EINVAL
+
@vfu_region_access_cb_t
def pci_region_cb(ctx, buf, count, offset, is_write):
if not is_write:
@@ -77,13 +83,14 @@ def pci_region_cb(ctx, buf, count, offset, is_write):
return write_pci_cfg_space(ctx, buf, count, offset)
+
def test_pci_cap_setup_cb():
global ctx
vfu_destroy_ctx(ctx)
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL)
assert ret == 0
@@ -93,6 +100,7 @@ def test_pci_cap_setup_cb():
flags=VFU_REGION_FLAG_RW)
assert ret == 0
+
cap_offsets = (
PCI_STD_HEADER_SIZEOF,
PCI_STD_HEADER_SIZEOF + PCI_PM_SIZEOF,
@@ -103,6 +111,7 @@ cap_offsets = (
0xa0
)
+
def test_add_caps():
pos = vfu_pci_add_capability(ctx, pos=0, flags=0,
data=struct.pack("ccHH", to_byte(PCI_CAP_ID_PM), b'\0', 0, 0))
@@ -133,6 +142,7 @@ def test_add_caps():
ret = vfu_realize_ctx(ctx)
assert ret == 0
+
def test_find_caps():
offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_PM)
assert offset == cap_offsets[0]
@@ -191,39 +201,42 @@ def test_find_caps():
assert offset == 0
assert c.get_errno() == errno.ENOENT
+
def test_pci_cap_write_hdr():
sock = connect_client(ctx)
# offset of struct cap_hdr
- offset=cap_offsets[0]
- data=b'\x01'
+ offset = cap_offsets[0]
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EPERM)
disconnect_client(ctx, sock)
+
def test_pci_cap_readonly():
sock = connect_client(ctx)
# start of vendor payload
- offset=cap_offsets[1] + 2
- data=b'\x01'
+ offset = cap_offsets[1] + 2
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EPERM)
# offsetof(struct vsc, data)
- offset=cap_offsets[1] + 3
+ offset = cap_offsets[1] + 3
payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=3)
assert payload == b'abc'
disconnect_client(ctx, sock)
+
def test_pci_cap_callback():
sock = connect_client(ctx)
# offsetof(struct vsc, data)
- offset=cap_offsets[2] + 3
+ offset = cap_offsets[2] + 3
data = b"Hello world."
payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
@@ -240,60 +253,64 @@ def test_pci_cap_callback():
disconnect_client(ctx, sock)
+
def test_pci_cap_write_pmcs():
sock = connect_client(ctx)
# struct pc
- offset=cap_offsets[0] + 3
- data=b'\x01\x02'
+ offset = cap_offsets[0] + 3
+ data = b'\x01\x02'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EINVAL)
- offset=cap_offsets[0] + 2
- data=b'\x01'
+ offset = cap_offsets[0] + 2
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EINVAL)
- offset=cap_offsets[0] + 2
- data=b'\x01\x02'
+ offset = cap_offsets[0] + 2
+ data = b'\x01\x02'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.ENOTSUP)
# struct pmcs
- offset=cap_offsets[0] + 5
- data=b'\x01\x02'
+ offset = cap_offsets[0] + 5
+ data = b'\x01\x02'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EINVAL)
- offset=cap_offsets[0] + 4
- data=b'\x01'
+ offset = cap_offsets[0] + 4
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EINVAL)
offset = cap_offsets[0] + 4
- data=b'\x01\x02'
+ data = b'\x01\x02'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data)
assert get_pci_cfg_space(ctx)[offset:offset+2] == data
# pmcsr_se
- offset=cap_offsets[0] + 6
- data=b'\x01'
+ offset = cap_offsets[0] + 6
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.ENOTSUP)
# data
- offset=cap_offsets[0] + 7
- data=b'\x01'
+ offset = cap_offsets[0] + 7
+ data = b'\x01'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.ENOTSUP)
disconnect_client(ctx, sock)
+
reset_flag = -1
+
+
@c.CFUNCTYPE(c.c_int, c.c_void_p, c.c_int)
def vfu_reset_cb(ctx, reset_type):
assert reset_type == VFU_RESET_PCI_FLR or reset_type == VFU_RESET_LOST_CONN
@@ -301,18 +318,19 @@ def vfu_reset_cb(ctx, reset_type):
reset_flag = reset_type
return 0
+
def test_pci_cap_write_px():
sock = connect_client(ctx)
ret = vfu_setup_device_reset_cb(ctx, vfu_reset_cb)
assert ret == 0
- #flrc
+ # flrc
cap = struct.pack("ccHHcc52c", to_byte(PCI_CAP_ID_EXP), b'\0', 0, 0, b'\0',
b'\x10', *[b'\0' for _ in range(52)])
pos = vfu_pci_add_capability(ctx, pos=cap_offsets[5], flags=0, data=cap)
assert pos == cap_offsets[5]
- #iflr
+ # iflr
offset = cap_offsets[5] + 8
data = b'\x00\x80'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
@@ -322,6 +340,7 @@ def test_pci_cap_write_px():
disconnect_client(ctx, sock)
assert reset_flag == VFU_RESET_LOST_CONN
+
def test_pci_cap_write_msix():
# FIXME
pass
@@ -329,7 +348,8 @@ def test_pci_cap_write_msix():
def test_pci_cap_write_pxdc2():
sock = connect_client(ctx)
- offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + PCI_EXP_DEVCTL2
+ offset = (vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) +
+ PCI_EXP_DEVCTL2)
data = b'\xde\xad'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data)
@@ -341,7 +361,8 @@ def test_pci_cap_write_pxdc2():
def test_pci_cap_write_pxlc2():
sock = connect_client(ctx)
- offset = vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) + PCI_EXP_LNKCTL2
+ offset = (vfu_pci_find_capability(ctx, False, PCI_CAP_ID_EXP) +
+ PCI_EXP_LNKCTL2)
data = b'\xbe\xef'
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data)
diff --git a/test/py/test_pci_ext_caps.py b/test/py/test_pci_ext_caps.py
index ab10e12..94eda0e 100644
--- a/test/py/test_pci_ext_caps.py
+++ b/test/py/test_pci_ext_caps.py
@@ -33,9 +33,10 @@ import errno
ctx = None
+
def test_pci_ext_cap_conventional():
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx, pci_type=VFU_PCI_TYPE_CONVENTIONAL)
assert ret == 0
@@ -55,11 +56,12 @@ def test_pci_ext_cap_conventional():
vfu_destroy_ctx(ctx)
+
def test_pci_ext_cap_setup():
global ctx
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -69,6 +71,7 @@ def test_pci_ext_cap_setup():
flags=VFU_REGION_FLAG_RW)
assert ret == 0
+
def test_pci_ext_cap_unknown_cap():
cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN + 99, 0, 0, 0)
@@ -77,6 +80,7 @@ def test_pci_ext_cap_unknown_cap():
assert pos == -1
assert c.get_errno() == errno.ENOTSUP
+
def test_pci_ext_cap_bad_pos():
cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN, 0, 0, 0)
@@ -91,6 +95,7 @@ def test_pci_ext_cap_bad_pos():
assert pos == -1
assert c.get_errno() == errno.EINVAL
+
@vfu_region_access_cb_t
def pci_region_cb(ctx, buf, count, offset, is_write):
if not is_write:
@@ -98,6 +103,7 @@ def pci_region_cb(ctx, buf, count, offset, is_write):
return write_pci_cfg_space(ctx, buf, count, offset, extended=True)
+
cap_offsets = (
PCI_CFG_SPACE_SIZE,
PCI_CFG_SPACE_SIZE + PCI_EXT_CAP_DSN_SIZEOF,
@@ -107,6 +113,7 @@ cap_offsets = (
600
)
+
def test_add_ext_caps():
cap = struct.pack("HHII", PCI_EXT_CAP_ID_DSN, 0, 4, 8)
@@ -150,6 +157,7 @@ def test_add_ext_caps():
ret = vfu_realize_ctx(ctx)
assert ret == 0
+
def test_find_ext_caps():
offset = vfu_pci_find_capability(ctx, True, PCI_EXT_CAP_ID_DSN)
assert offset == cap_offsets[0]
@@ -160,7 +168,8 @@ def test_find_ext_caps():
assert cap_id == PCI_EXT_CAP_ID_DSN
assert cap_next == cap_offsets[1]
- offset = vfu_pci_find_next_capability(ctx, True, offset, PCI_EXT_CAP_ID_DSN)
+ offset = vfu_pci_find_next_capability(ctx, True, offset,
+ PCI_EXT_CAP_ID_DSN)
assert offset == 0
offset = vfu_pci_find_capability(ctx, True, PCI_EXT_CAP_ID_VNDR)
@@ -169,7 +178,8 @@ def test_find_ext_caps():
assert cap_id == PCI_EXT_CAP_ID_VNDR
assert cap_next == cap_offsets[2]
- offset = vfu_pci_find_next_capability(ctx, True, offset, PCI_EXT_CAP_ID_DSN)
+ offset = vfu_pci_find_next_capability(ctx, True, offset,
+ PCI_EXT_CAP_ID_DSN)
assert offset == 0
offset = vfu_pci_find_next_capability(ctx, True, 0, PCI_EXT_CAP_ID_VNDR)
@@ -212,6 +222,7 @@ def test_find_ext_caps():
assert offset == 0
assert c.get_errno() == errno.ENOENT
+
def test_pci_ext_cap_write_hdr():
sock = connect_client(ctx)
@@ -228,6 +239,7 @@ def test_pci_ext_cap_write_hdr():
disconnect_client(ctx, sock)
+
def test_pci_ext_cap_readonly():
sock = connect_client(ctx)
@@ -244,6 +256,7 @@ def test_pci_ext_cap_readonly():
disconnect_client(ctx, sock)
+
def test_pci_ext_cap_callback():
sock = connect_client(ctx)
@@ -265,10 +278,11 @@ def test_pci_ext_cap_callback():
disconnect_client(ctx, sock)
+
def test_pci_ext_cap_write_dsn():
sock = connect_client(ctx)
- data = struct.pack("II", 1, 2);
+ data = struct.pack("II", 1, 2)
offset = cap_offsets[0] + 4
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
count=len(data), data=data, expect=errno.EPERM)
@@ -281,10 +295,11 @@ def test_pci_ext_cap_write_dsn():
disconnect_client(ctx, sock)
+
def test_pci_ext_cap_write_vendor():
sock = connect_client(ctx)
- data = struct.pack("II", 0x1, 0x2);
+ data = struct.pack("II", 0x1, 0x2)
# start of vendor payload
offset = cap_offsets[2] + 8
write_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=offset,
@@ -297,5 +312,6 @@ def test_pci_ext_cap_write_vendor():
disconnect_client(ctx, sock)
+
def test_pci_ext_cap_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_request_errors.py b/test/py/test_request_errors.py
index fe6f29e..a734bab 100644
--- a/test/py/test_request_errors.py
+++ b/test/py/test_request_errors.py
@@ -28,21 +28,20 @@
#
from libvfio_user import *
-import ctypes as c
import errno
import os
-import sys
ctx = None
sock = None
argsz = len(vfio_irq_set())
+
def test_request_errors_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0
@@ -55,6 +54,7 @@ def test_request_errors_setup():
sock = connect_client(ctx)
+
def test_too_small():
# struct vfio_user_header
hdr = struct.pack("HHIII", 0xbad1, VFIO_USER_DEVICE_SET_IRQS,
@@ -64,6 +64,7 @@ def test_too_small():
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_too_large():
# struct vfio_user_header
hdr = struct.pack("HHIII", 0xbad1, VFIO_USER_DEVICE_SET_IRQS,
@@ -73,6 +74,7 @@ def test_too_large():
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_unsolicited_reply():
# struct vfio_user_header
hdr = struct.pack("HHIII", 0xbad2, VFIO_USER_DEVICE_SET_IRQS,
@@ -82,6 +84,7 @@ def test_unsolicited_reply():
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_bad_command():
hdr = vfio_user_header(VFIO_USER_MAX, size=1)
@@ -89,12 +92,14 @@ def test_bad_command():
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_no_payload():
hdr = vfio_user_header(VFIO_USER_DEVICE_SET_IRQS, size=0)
sock.send(hdr)
vfu_run_ctx(ctx)
get_reply(sock, expect=errno.EINVAL)
+
def test_bad_request_closes_fds():
payload = vfio_irq_set(argsz=argsz, flags=VFIO_IRQ_SET_ACTION_TRIGGER |
VFIO_IRQ_SET_DATA_BOOL, index=VFU_DEV_MSIX_IRQ,
@@ -120,5 +125,6 @@ def test_bad_request_closes_fds():
os.close(fd1)
os.close(fd2)
+
def test_request_errors_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_setup_region.py b/test/py/test_setup_region.py
index 0e1ff7f..ac6dc03 100644
--- a/test/py/test_setup_region.py
+++ b/test/py/test_setup_region.py
@@ -34,50 +34,56 @@ import tempfile
ctx = None
+
def test_device_set_irqs_setup():
global ctx
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
+
def test_setup_region_bad_flags():
- ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000,
- flags=0x400)
+ ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ size=0x10000, flags=0x400)
assert ret == -1
assert c.get_errno() == errno.EINVAL
- ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000,
- flags=0)
+ ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ size=0x10000, flags=0)
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_setup_region_bad_mmap_areas():
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
- ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000,
+ ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ size=0x10000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
mmap_areas=mmap_areas, nr_mmap_areas=0,
fd=f.fileno())
assert ret == -1
assert c.get_errno() == errno.EINVAL
- ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000,
+ ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ size=0x10000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
mmap_areas=None, nr_mmap_areas=1, fd=f.fileno())
assert ret == -1
assert c.get_errno() == errno.EINVAL
- ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x10000,
+ ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ size=0x10000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
mmap_areas=mmap_areas, fd=-1)
assert ret == -1
assert c.get_errno() == errno.EINVAL
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x5000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -85,6 +91,7 @@ def test_setup_region_bad_mmap_areas():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_setup_region_bad_index():
ret = vfu_setup_region(ctx, index=-2, size=0x10000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM))
@@ -96,12 +103,14 @@ def test_setup_region_bad_index():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_setup_region_bad_pci():
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_CFG_REGION_IDX, size=0x1000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM))
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_setup_region_bad_migr():
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=512,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM))
@@ -117,7 +126,7 @@ def test_setup_region_bad_migr():
assert ret == -1
assert c.get_errno() == errno.EINVAL
- mmap_areas = [ (0x0, 0x1000), (0x1000, 0x1000) ]
+ mmap_areas = [(0x0, 0x1000), (0x1000, 0x1000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_MIGR_REGION_IDX, size=0x2000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -125,6 +134,7 @@ def test_setup_region_bad_migr():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
def test_setup_region_cfg_always_cb_nocb():
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_CFG_REGION_IDX,
size=PCI_CFG_SPACE_EXP_SIZE, cb=None,
@@ -133,6 +143,7 @@ def test_setup_region_cfg_always_cb_nocb():
assert ret == -1
assert c.get_errno() == errno.EINVAL
+
@vfu_region_access_cb_t
def pci_cfg_region_cb(ctx, buf, count, offset, is_write):
if not is_write:
@@ -141,6 +152,7 @@ def pci_cfg_region_cb(ctx, buf, count, offset, is_write):
return count
+
def test_setup_region_cfg_always_cb():
global ctx
@@ -155,10 +167,12 @@ def test_setup_region_cfg_always_cb():
sock = connect_client(ctx)
- payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX, offset=0, count=2)
+ payload = read_region(ctx, sock, VFU_PCI_DEV_CFG_REGION_IDX,
+ offset=0, count=2)
assert payload == b'\xcc\xcc'
disconnect_client(ctx, sock)
+
def test_setup_region_cleanup():
vfu_destroy_ctx(ctx)
diff --git a/test/py/test_vfu_create_ctx.py b/test/py/test_vfu_create_ctx.py
index 439ff69..cb1448f 100644
--- a/test/py/test_vfu_create_ctx.py
+++ b/test/py/test_vfu_create_ctx.py
@@ -31,24 +31,28 @@ from libvfio_user import *
import ctypes as c
import errno
+
def test_vfu_create_ctx_bad_trans():
ctx = vfu_create_ctx(trans=VFU_TRANS_SOCK + 1)
- assert ctx == None
+ assert ctx is None
assert c.get_errno() == errno.ENOTSUP
+
def test_vfu_create_ctx_bad_flags():
ctx = vfu_create_ctx(flags=999)
- assert ctx == None
+ assert ctx is None
assert c.get_errno() == errno.EINVAL
+
def test_vfu_create_ctx_bad_dev_type():
ctx = vfu_create_ctx(dev_type=VFU_DEV_TYPE_PCI + 4)
- assert ctx == None
+ assert ctx is None
assert c.get_errno() == errno.ENOTSUP
+
def test_vfu_create_ctx_default():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_realize_ctx(ctx)
assert ret == 0
diff --git a/test/py/test_vfu_realize_ctx.py b/test/py/test_vfu_realize_ctx.py
index 9b863cb..de949aa 100644
--- a/test/py/test_vfu_realize_ctx.py
+++ b/test/py/test_vfu_realize_ctx.py
@@ -31,9 +31,10 @@ import ctypes as c
import errno
from libvfio_user import *
+
def test_vfu_realize_ctx_twice():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_realize_ctx(ctx)
assert ret == 0
@@ -43,9 +44,10 @@ def test_vfu_realize_ctx_twice():
vfu_destroy_ctx(ctx)
+
def test_vfu_unrealized_ctx():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_run_ctx(ctx)
assert ret == -1
@@ -53,18 +55,20 @@ def test_vfu_unrealized_ctx():
vfu_destroy_ctx(ctx)
+
def test_vfu_realize_ctx_default():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_realize_ctx(ctx)
assert ret == 0
vfu_destroy_ctx(ctx)
+
def test_vfu_realize_ctx_pci_bars():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR0_REGION_IDX, size=4096,
flags=VFU_REGION_FLAG_RW)
@@ -83,9 +87,10 @@ def test_vfu_realize_ctx_pci_bars():
vfu_destroy_ctx(ctx)
+
def test_vfu_realize_ctx_irqs():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_setup_device_nr_irqs(ctx, VFU_DEV_INTX_IRQ, 1)
assert ret == 0
@@ -99,9 +104,10 @@ def test_vfu_realize_ctx_irqs():
vfu_destroy_ctx(ctx)
+
def test_vfu_realize_ctx_caps():
ctx = vfu_create_ctx()
- assert ctx != None
+ assert ctx is not None
ret = vfu_pci_init(ctx)
assert ret == 0