aboutsummaryrefslogtreecommitdiff
path: root/test/py/test_device_get_region_io_fds.py
diff options
context:
space:
mode:
authorJohn Levon <levon@movementarian.org>2021-10-22 13:56:38 +0100
committerGitHub <noreply@github.com>2021-10-22 13:56:38 +0100
commit7fd786fa7f0023e7bf74618c6ebc3999f5736bc6 (patch)
tree79a152071fe31d3f6bd0b5e1fbecabc310e3c895 /test/py/test_device_get_region_io_fds.py
parent3111d8ec3ed5557b64434778fb2596908235bfa2 (diff)
downloadlibvfio-user-7fd786fa7f0023e7bf74618c6ebc3999f5736bc6.zip
libvfio-user-7fd786fa7f0023e7bf74618c6ebc3999f5736bc6.tar.gz
libvfio-user-7fd786fa7f0023e7bf74618c6ebc3999f5736bc6.tar.bz2
run python code through flake8 (#613)
Aside from general style goodness, this found a couple of accidental re-definitions, so it's worth taking the pain now. Also, only run rstlint as part of pre-push. Signed-off-by: John Levon <john.levon@nutanix.com> Reviewed-by: Swapnil Ingle <swapnil.ingle@nutanix.com>
Diffstat (limited to 'test/py/test_device_get_region_io_fds.py')
-rw-r--r--test/py/test_device_get_region_io_fds.py124
1 files changed, 69 insertions, 55 deletions
diff --git a/test/py/test_device_get_region_io_fds.py b/test/py/test_device_get_region_io_fds.py
index 2179cf7..bffe508 100644
--- a/test/py/test_device_get_region_io_fds.py
+++ b/test/py/test_device_get_region_io_fds.py
@@ -40,16 +40,17 @@ sock = None
fds = []
IOEVENT_SIZE = 8
+
def test_device_get_region_io_fds_setup():
global ctx, sock
ctx = vfu_create_ctx(flags=LIBVFIO_USER_FLAG_ATTACH_NB)
- assert ctx != None
+ assert ctx is not None
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR1_REGION_IDX, size=0x8000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -59,7 +60,7 @@ def test_device_get_region_io_fds_setup():
f = tempfile.TemporaryFile()
f.truncate(65536)
- mmap_areas = [ (0x2000, 0x1000), (0x4000, 0x2000) ]
+ mmap_areas = [(0x2000, 0x1000), (0x4000, 0x2000)]
ret = vfu_setup_region(ctx, index=VFU_PCI_DEV_BAR2_REGION_IDX, size=0x8000,
flags=(VFU_REGION_FLAG_RW | VFU_REGION_FLAG_MEM),
@@ -74,55 +75,60 @@ def test_device_get_region_io_fds_setup():
assert ret == 0
sock = connect_client(ctx)
- for i in range(0,6):
- tmp = eventfd(0,0)
+ for i in range(0, 6):
+ tmp = eventfd(0, 0)
fds.append(tmp)
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, tmp,
i * IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
+
def test_device_get_region_io_fds_bad_flags():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 5, flags = 1,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 5, flags=1,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_bad_count():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 5, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 5, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_buffer_too_small():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) - 1, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 1)
+ argsz=len(vfio_user_region_io_fds_reply()) - 1, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=1)
msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
expect=errno.EINVAL)
+
def test_device_get_region_io_fds_buffer_too_large():
- payload = vfio_user_region_io_fds_request(argsz = SERVER_MAX_DATA_XFER_SIZE
- + 1, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX,
- count = 1)
+ payload = vfio_user_region_io_fds_request(argsz=SERVER_MAX_DATA_XFER_SIZE
+ + 1, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX,
+ count=1)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect =
- errno.EINVAL)
def test_device_get_region_io_fds_no_fds():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR1_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR1_REGION_IDX, count=0)
ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect=0)
@@ -136,19 +142,20 @@ def test_device_get_region_io_fds_no_fds():
def test_device_get_region_io_fds_no_regions_setup():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR3_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR3_REGION_IDX, count=0)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
- expect = errno.EINVAL)
def test_device_get_region_io_fds_region_no_mmap():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = VFU_PCI_DEV_BAR5_REGION_IDX, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=VFU_PCI_DEV_BAR5_REGION_IDX, count=0)
ret = msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
- expect = 0)
+ expect=0)
reply, ret = vfio_user_region_io_fds_reply.pop_from_buffer(ret)
@@ -157,20 +164,22 @@ def test_device_get_region_io_fds_region_no_mmap():
assert reply.flags == 0
assert reply.index == VFU_PCI_DEV_BAR5_REGION_IDX
+
def test_device_get_region_io_fds_region_out_of_range():
- payload = vfio_user_region_io_fds_request(argsz = 512, flags = 0,
- index = 512, count = 0)
+ payload = vfio_user_region_io_fds_request(argsz=512, flags=0,
+ index=512, count=0)
+
+ msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload,
+ expect=errno.EINVAL)
- msg(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS, payload, expect =
- errno.EINVAL)
def test_device_get_region_io_fds_fds_read_write():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 10, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 10, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -188,18 +197,19 @@ def test_device_get_region_io_fds_fds_read_write():
# Server
for i in range(0, len(newfds)):
out = os.read(newfds[i], IOEVENT_SIZE)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 10
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_full():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 6, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 6, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -214,7 +224,7 @@ def test_device_get_region_io_fds_full():
for i in range(0, reply.count):
out = os.read(newfds[ioevents[i].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
assert ioevents[i].size == IOEVENT_SIZE
assert ioevents[i].offset == 40 - (IOEVENT_SIZE * i)
@@ -223,11 +233,12 @@ def test_device_get_region_io_fds_full():
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_fds_read_write_nothing():
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()), flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()), flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -237,21 +248,22 @@ def test_device_get_region_io_fds_fds_read_write_nothing():
assert reply.argsz == len(vfio_user_region_io_fds_reply()) + \
len(vfio_user_sub_region_ioeventfd()) * 6
+
def test_device_get_region_io_fds_fds_read_write_dupe_fd():
""" Test here to show that we can return mutliple sub regions with the same
- fd_index. fd_index points to the list of fds returned from the socket as
- returned by msg_fds. """
+ fd_index. fd_index points to the list of fds returned from the socket
+ as returned by msg_fds. """
- t = eventfd(0,0)
+ t = eventfd(0, 0)
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 6 *
IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 7 *
IOEVENT_SIZE, IOEVENT_SIZE, 0, 0) != -1
payload = vfio_user_region_io_fds_request(
- argsz = len(vfio_user_region_io_fds_reply()) +
- len(vfio_user_sub_region_ioeventfd()) * 8, flags = 0,
- index = VFU_PCI_DEV_BAR2_REGION_IDX, count = 0)
+ argsz=len(vfio_user_region_io_fds_reply()) +
+ len(vfio_user_sub_region_ioeventfd()) * 8, flags=0,
+ index=VFU_PCI_DEV_BAR2_REGION_IDX, count=0)
newfds, ret = msg_fds(ctx, sock, VFIO_USER_DEVICE_GET_REGION_IO_FDS,
payload, expect=0)
@@ -271,7 +283,7 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd():
for i in range(2, 8):
out = os.read(newfds[ioevents[i].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
assert ioevents[i].size == IOEVENT_SIZE
assert ioevents[i].offset == 56 - (IOEVENT_SIZE * i)
@@ -283,30 +295,32 @@ def test_device_get_region_io_fds_fds_read_write_dupe_fd():
os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[1].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
os.write(newfds[ioevents[1].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[0].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
os.write(newfds[ioevents[0].fd_index], c.c_ulonglong(1))
out = os.read(newfds[ioevents[1].fd_index], ioevent.size)
- [out] = struct.unpack("@Q",out)
+ [out] = struct.unpack("@Q", out)
assert out == 1
for i in newfds:
os.close(i)
+
def test_device_get_region_io_fds_ioeventfd_invalid_size():
- t = eventfd(0,0)
- assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t, 0x8000
- -2048, 4096, 0, 0) == -1
+ t = eventfd(0, 0)
+ assert vfu_create_ioeventfd(ctx, VFU_PCI_DEV_BAR2_REGION_IDX, t,
+ 0x8000 - 0x800, 4096, 0, 0) == -1
os.close(t)
+
def test_device_get_region_info_cleanup():
for i in fds:
os.close(i)