# # Copyright (c) 2021 Nutanix Inc. All rights reserved. # # Authors: Swapnil Ingle # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Nutanix nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH # DAMAGE. # import ctypes import errno from libvfio_user import * import tempfile ctx = None def test_dma_sg_size(): size = dma_sg_size() assert size == len(dma_sg_t()) def test_sgl_get_with_invalid_region(): global ctx ctx = prepare_ctx_for_dma() assert ctx is not None sg = dma_sg_t() iovec = iovec_t() ret = vfu_sgl_get(ctx, sg, iovec) assert ret == -1 assert ctypes.get_errno() == errno.EINVAL def test_sgl_get_without_fd(): client = connect_client(ctx) payload = vfio_user_dma_map(argsz=len(vfio_user_dma_map()), flags=(VFIO_USER_F_DMA_REGION_READ | VFIO_USER_F_DMA_REGION_WRITE), offset=0, addr=0x1000, size=4096) msg(ctx, client.sock, VFIO_USER_DMA_MAP, payload) sg = dma_sg_t() iovec = iovec_t() sg.region = 0 ret = vfu_sgl_get(ctx, sg, iovec) assert ret == -1 assert ctypes.get_errno() == errno.EFAULT client.disconnect(ctx) def test_get_multiple_sge(): client = connect_client(ctx) regions = 4 f = tempfile.TemporaryFile() f.truncate(0x1000 * regions) for i in range(1, regions): payload = vfio_user_dma_map(argsz=len(vfio_user_dma_map()), flags=(VFIO_USER_F_DMA_REGION_READ | VFIO_USER_F_DMA_REGION_WRITE), offset=0, addr=0x1000 * i, size=4096) msg(ctx, client.sock, VFIO_USER_DMA_MAP, payload, fds=[f.fileno()]) ret, sg = vfu_addr_to_sgl(ctx, dma_addr=0x1000, length=4096 * 3, max_nr_sgs=3, prot=mmap.PROT_READ) assert ret == 3 iovec = (iovec_t * 3)() ret = vfu_sgl_get(ctx, sg, iovec, cnt=3) assert ret == 0 assert iovec[0].iov_len == 4096 assert iovec[1].iov_len == 4096 assert iovec[2].iov_len == 4096 client.disconnect(ctx) def test_sgl_put(): client = connect_client(ctx) regions = 4 f = tempfile.TemporaryFile() f.truncate(0x1000 * regions) for i in range(1, regions): payload = vfio_user_dma_map(argsz=len(vfio_user_dma_map()), flags=(VFIO_USER_F_DMA_REGION_READ | VFIO_USER_F_DMA_REGION_WRITE), offset=0, addr=0x1000 * i, size=4096) msg(ctx, client.sock, VFIO_USER_DMA_MAP, payload, fds=[f.fileno()]) ret, sg = vfu_addr_to_sgl(ctx, dma_addr=0x1000, length=4096 * 3, max_nr_sgs=3, prot=mmap.PROT_READ) assert ret == 3 iovec = (iovec_t * 3)() ret = vfu_sgl_get(ctx, sg, iovec, cnt=3) assert ret == 0 vfu_sgl_put(ctx, sg, iovec, cnt=3) client.disconnect(ctx) def test_sgl_get_put_cleanup(): vfu_destroy_ctx(ctx) # ex: set tabstop=4 shiftwidth=4 softtabstop=4 expandtab: