aboutsummaryrefslogtreecommitdiff
path: root/tests/functional/qemu_test/uncompress.py
blob: 76dcf223856172a66890659287481af08f5db1f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Utilities for python-based QEMU tests
#
# Copyright 2024 Red Hat, Inc.
#
# Authors:
#  Thomas Huth <thuth@redhat.com>

import gzip
import lzma
import os
import stat
import shutil
from urllib.parse import urlparse
from subprocess import check_call, CalledProcessError

from .asset import Asset


def gzip_uncompress(gz_path, output_path):
    """
    Decompress the gzip file @gz_path into @output_path.

    @params gz_path: filename of the gzip-compressed input
    @params output_path: filename to write the decompressed data to

    Does nothing if @output_path already exists. If writing or
    decompressing fails, any partially written @output_path is
    deleted and the original error is re-raised.
    """
    if os.path.exists(output_path):
        return
    with gzip.open(gz_path, 'rb') as gz_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(gz_in, raw_out)
        except BaseException:
            # BaseException (not just Exception) so that an interrupt
            # also discards the truncated output; otherwise the
            # exists() check above would accept it on the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # open() itself failed, so nothing was created; do not
                # let the cleanup mask the real error.
                pass
            raise

def lzma_uncompress(xz_path, output_path):
    """
    Decompress the xz/lzma file @xz_path into @output_path.

    @params xz_path: filename of the xz-compressed input
    @params output_path: filename to write the decompressed data to

    Does nothing if @output_path already exists. If writing or
    decompressing fails, any partially written @output_path is
    deleted and the original error is re-raised.
    """
    if os.path.exists(output_path):
        return
    with lzma.open(xz_path, 'rb') as lzma_in:
        try:
            with open(output_path, 'wb') as raw_out:
                shutil.copyfileobj(lzma_in, raw_out)
        except BaseException:
            # BaseException (not just Exception) so that an interrupt
            # also discards the truncated output; otherwise the
            # exists() check above would accept it on the next call.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                # open() itself failed, so nothing was created; do not
                # let the cleanup mask the real error.
                pass
            raise


def zstd_uncompress(zstd_path, output_path):
    """
    Decompress the zstd file @zstd_path into @output_path by running
    the external 'zstd' command.

    @params zstd_path: filename of the zstd-compressed input
    @params output_path: filename to write the decompressed data to

    Does nothing if @output_path already exists. If the 'zstd' command
    exits with a failure status, any output it left behind is deleted
    and an Exception describing the failure is raised. Errors from
    launching the command (e.g. it is not installed) propagate
    unchanged.
    """
    if os.path.exists(output_path):
        return

    try:
        check_call(['zstd', "-f", "-d", zstd_path,
                    "-o", output_path])
    except CalledProcessError as e:
        # zstd can fail before it ever creates the output file; a
        # missing file here must not replace the real error below.
        try:
            os.remove(output_path)
        except FileNotFoundError:
            pass
        raise Exception(
            f"Unable to decompress zstd file {zstd_path} with {e}") from e

    # zstd copies source archive permissions for the output
    # file, so must make this writable for QEMU
    os.chmod(output_path, stat.S_IRUSR | stat.S_IWUSR)


def uncompress(compressed, uncompressed, format=None):
    """
    Uncompresses @compressed into @uncompressed

    @params compressed: filename or Asset to uncompress
    @params uncompressed: filename to uncompress into
    @params format: optional compression format ("xz", "gz" or "zstd")

    If @format is None, heuristics will be applied to guess the format
    from the filename or Asset URL. @format must be non-None if
    @compressed is anything other than a filename or an Asset.

    @compressed is converted with str() to obtain the file to read.
    NOTE(review): this presumes str() of an Asset yields its local
    file path -- confirm against the Asset class.

    Raises an Exception if @format is not a known format.

    Returns the fully qualified path to the uncompressed file
    """
    if format is None:
        format = guess_uncompress_format(compressed)

    if format == "xz":
        lzma_uncompress(str(compressed), uncompressed)
    elif format == "gz":
        gzip_uncompress(str(compressed), uncompressed)
    elif format == "zstd":
        zstd_uncompress(str(compressed), uncompressed)
    else:
        raise Exception(f"Unknown compression format {format}")

    # Honour the documented contract; previously nothing was returned.
    return os.path.abspath(uncompressed)

def guess_uncompress_format(compressed):
    """
    Guess the format of @compressed, raising an exception if
    no format can be determined

    @params compressed: filename (str or os.PathLike) or Asset to guess

    The guess is based purely on the filename extension; for an Asset
    the path component of its URL is used.

    Returns one of "xz", "gz" or "zstd"
    """
    if isinstance(compressed, str):
        path = compressed
    elif isinstance(compressed, Asset):
        path = urlparse(compressed.url).path
    elif isinstance(compressed, os.PathLike):
        # e.g. pathlib.Path
        path = os.fspath(compressed)
    else:
        raise Exception(f"Unable to guess compression format for {compressed}")

    ext = os.path.splitext(path)[1]
    if ext == ".xz":
        return "xz"
    elif ext == ".gz":
        return "gz"
    elif ext in (".zstd", ".zst"):
        return "zstd"
    else:
        raise Exception(f"Unknown compression format for {compressed}")