aboutsummaryrefslogtreecommitdiff
path: root/python/tests/bro_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/bro_test.py')
-rw-r--r--python/tests/bro_test.py120
1 files changed, 120 insertions, 0 deletions
diff --git a/python/tests/bro_test.py b/python/tests/bro_test.py
new file mode 100644
index 0000000..16ed7ef
--- /dev/null
+++ b/python/tests/bro_test.py
@@ -0,0 +1,120 @@
+# Copyright 2016 The Brotli Authors. All rights reserved.
+#
+# Distributed under MIT license.
+# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
+
+import subprocess
+import unittest
+
+import _test_utils
+import brotli
+
+PYTHON = _test_utils.PYTHON
+BRO = _test_utils.BRO
+TEST_ENV = _test_utils.TEST_ENV
+
+
+def _get_original_name(test_data):
+ return test_data.split('.compressed')[0]
+
+
+class TestBroDecompress(_test_utils.TestCase):
+
+ def _check_decompression(self, test_data):
+ # Verify decompression matches the original.
+ temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
+ original = _get_original_name(test_data)
+ self.assertFilesMatch(temp_uncompressed, original)
+
+ def _decompress_file(self, test_data):
+ temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
+ args = [PYTHON, BRO, '-f', '-d', '-i', test_data, '-o',
+ temp_uncompressed]
+ subprocess.check_call(args, env=TEST_ENV)
+
+ def _decompress_pipe(self, test_data):
+ temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
+ args = [PYTHON, BRO, '-d']
+ with open(temp_uncompressed, 'wb') as out_file:
+ with open(test_data, 'rb') as in_file:
+ subprocess.check_call(
+ args, stdin=in_file, stdout=out_file, env=TEST_ENV)
+
+ def _test_decompress_file(self, test_data):
+ self._decompress_file(test_data)
+ self._check_decompression(test_data)
+
+ def _test_decompress_pipe(self, test_data):
+ self._decompress_pipe(test_data)
+ self._check_decompression(test_data)
+
+
+_test_utils.generate_test_methods(TestBroDecompress, for_decompression=True)
+
+
+class TestBroCompress(_test_utils.TestCase):
+
+ VARIANTS = {'quality': (1, 6, 9, 11), 'lgwin': (10, 15, 20, 24)}
+
+ def _check_decompression(self, test_data, **kwargs):
+ # Write decompression to temp file and verify it matches the original.
+ temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
+ temp_compressed = _test_utils.get_temp_compressed_name(test_data)
+ original = test_data
+ args = [PYTHON, BRO, '-f', '-d']
+ if 'dictionary' in kwargs:
+ args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
+ args.extend(['-i', temp_compressed, '-o', temp_uncompressed])
+ subprocess.check_call(args, env=TEST_ENV)
+ self.assertFilesMatch(temp_uncompressed, original)
+
+ def _compress_file(self, test_data, **kwargs):
+ temp_compressed = _test_utils.get_temp_compressed_name(test_data)
+ args = [PYTHON, BRO, '-f']
+ if 'quality' in kwargs:
+ args.extend(['-q', str(kwargs['quality'])])
+ if 'lgwin' in kwargs:
+ args.extend(['--lgwin', str(kwargs['lgwin'])])
+ if 'dictionary' in kwargs:
+ args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
+ args.extend(['-i', test_data, '-o', temp_compressed])
+ subprocess.check_call(args, env=TEST_ENV)
+
+ def _compress_pipe(self, test_data, **kwargs):
+ temp_compressed = _test_utils.get_temp_compressed_name(test_data)
+ args = [PYTHON, BRO]
+ if 'quality' in kwargs:
+ args.extend(['-q', str(kwargs['quality'])])
+ if 'lgwin' in kwargs:
+ args.extend(['--lgwin', str(kwargs['lgwin'])])
+ if 'dictionary' in kwargs:
+ args.extend(['--custom-dictionary', str(kwargs['dictionary'])])
+ with open(temp_compressed, 'wb') as out_file:
+ with open(test_data, 'rb') as in_file:
+ subprocess.check_call(
+ args, stdin=in_file, stdout=out_file, env=TEST_ENV)
+
+ def _test_compress_file(self, test_data, **kwargs):
+ self._compress_file(test_data, **kwargs)
+ self._check_decompression(test_data)
+
+ def _test_compress_pipe(self, test_data, **kwargs):
+ self._compress_pipe(test_data, **kwargs)
+ self._check_decompression(test_data)
+
+ def _test_compress_file_custom_dictionary(self, test_data, **kwargs):
+ kwargs['dictionary'] = test_data
+ self._compress_file(test_data, **kwargs)
+ self._check_decompression(test_data, **kwargs)
+
+ def _test_compress_pipe_custom_dictionary(self, test_data, **kwargs):
+ kwargs['dictionary'] = test_data
+ self._compress_pipe(test_data, **kwargs)
+ self._check_decompression(test_data, **kwargs)
+
+
+_test_utils.generate_test_methods(
+ TestBroCompress, variants=TestBroCompress.VARIANTS)
+
+if __name__ == '__main__':
+ unittest.main()