aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test/archive.py
blob: c803fdaf6dca42114acd607105e2be56965c919a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Utilities for python-based QEMU tests
#
# Copyright 2024 Red Hat, Inc.
#
# Authors:
#  Thomas Huth <thuth@redhat.com>

import os
from subprocess import check_call, run, DEVNULL
import tarfile
from urllib.parse import urlparse
import zipfile

from .asset import Asset


def tar_extract(archive, dest_dir, member=None):
    with tarfile.open(archive) as tf:
        if hasattr(tarfile, 'data_filter'):
            tf.extraction_filter = getattr(tarfile, 'data_filter',
                                           (lambda member, path: member))
        if member:
            tf.extract(member=member, path=dest_dir)
        else:
            tf.extractall(path=dest_dir)

def cpio_extract(archive, output_path):
    cwd = os.getcwd()
    os.chdir(output_path)
    # Not passing 'check=True' as cpio exits with non-zero
    # status if the archive contains any device nodes :-(
    if type(archive) == str:
        run(['cpio', '-i', '-F', archive],
            stdout=DEVNULL, stderr=DEVNULL)
    else:
        run(['cpio', '-i'],
            input=archive.read(),
            stdout=DEVNULL, stderr=DEVNULL)
    os.chdir(cwd)

def zip_extract(archive, dest_dir, member=None):
    with zipfile.ZipFile(archive, 'r') as zf:
        if member:
            zf.extract(member=member, path=dest_dir)
        else:
            zf.extractall(path=dest_dir)

def deb_extract(archive, dest_dir, member=None):
    cwd = os.getcwd()
    os.chdir(dest_dir)
    try:
        proc = run(['ar', 't', archive],
                   check=True, capture_output=True, encoding='utf8')
        file_path = proc.stdout.split()[2]
        check_call(['ar', 'x', archive, file_path],
                   stdout=DEVNULL, stderr=DEVNULL)
        tar_extract(file_path, dest_dir, member)
    finally:
        os.chdir(cwd)

'''
@params archive: filename, Asset, or file-like object to extract
@params dest_dir: target directory to extract into
@params member: optional member file to limit extraction to

Extracts @archive into @dest_dir. All files are extracted
unless @member specifies a limit.

If @format is None, heuristics will be applied to guess the format
from the filename or Asset URL. @format must be non-None if @archive
is a file-like object.
'''
def archive_extract(archive, dest_dir, format=None, member=None):
    if format is None:
        format = guess_archive_format(archive)
    if type(archive) == Asset:
        archive = str(archive)

    if format == "tar":
        tar_extract(archive, dest_dir, member)
    elif format == "zip":
        zip_extract(archive, dest_dir, member)
    elif format == "cpio":
        if member is not None:
            raise Exception("Unable to filter cpio extraction")
        cpio_extract(archive, dest_dir)
    elif format == "deb":
        if type(archive) != str:
            raise Exception("Unable to use file-like object with deb archives")
        deb_extract(archive, dest_dir, "./" + member)
    else:
        raise Exception(f"Unknown archive format {format}")

'''
@params archive: filename, or Asset to guess

Guess the format of @compressed, raising an exception if
no format can be determined
'''
def guess_archive_format(archive):
    if type(archive) == Asset:
        archive = urlparse(archive.url).path
    elif type(archive) != str:
        raise Exception(f"Unable to guess archive format for {archive}")

    if ".tar." in archive or archive.endswith("tgz"):
        return "tar"
    elif archive.endswith(".zip"):
        return "zip"
    elif archive.endswith(".cpio"):
        return "cpio"
    elif archive.endswith(".deb") or archive.endswith(".udeb"):
        return "deb"
    else:
        raise Exception(f"Unknown archive format for {archive}")