... so that the script can know its uncompressed size and calculate the appropriate buffer size.
92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
import ctypes
|
|
import functools
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Absolute directory containing this script; paths to sibling build
# artifacts (such as the lzfx shared library) are resolved relative to it.
HERE = Path(__file__).resolve().parent
|
|
|
|
|
|
@functools.lru_cache(maxsize=None)
def _liblzfx():
    """Load the lzfx shared library and return its ctypes handle.

    The library is looked up at ``../source/Objects/host/lzfx/liblzfx.so``
    relative to the directory of this script (``HERE``). The result is
    cached, so repeated calls return the same handle.
    """
    return ctypes.cdll.LoadLibrary(
        os.path.join(HERE, "../source/Objects/host/lzfx/liblzfx.so")
    )
|
|
|
|
|
|
@functools.lru_cache(maxsize=None)
def _fn_lzfx_compress():
    """Return the ``lzfx_compress`` C function with its prototype declared.

    The function is taken from the library returned by ``_liblzfx()`` and the
    result is cached, so the prototype is configured only once.

    C declaration::

        /* Buffer-to buffer compression.

           Supply pre-allocated input and output buffers via ibuf and obuf, and
           their size in bytes via ilen and olen.  Buffers may not overlap.

           On success, the function returns a non-negative value and the argument
           olen contains the compressed size in bytes.  On failure, a negative
           value is returned and olen is not modified.
        */
        int lzfx_compress(const void* ibuf, unsigned int ilen,
                          void* obuf, unsigned int *olen);
    """
    fn = _liblzfx().lzfx_compress
    # ctypes only honours the plural attribute ``argtypes``; assigning to a
    # misspelled ``argtype`` is silently ignored and leaves the call without
    # any argument checking or conversion.
    fn.argtypes = [
        ctypes.c_char_p,  # ibuf
        ctypes.c_uint,  # ilen
        ctypes.c_char_p,  # obuf
        ctypes.POINTER(ctypes.c_uint),  # olen (in/out)
    ]
    fn.restype = ctypes.c_int
    return fn
|
|
|
|
|
|
def compress(data: bytes) -> bytes:
    """Compress ``data`` with lzfx and return the compressed bytes.

    An output buffer 8 bytes larger than the input is allocated and handed to
    the C ``lzfx_compress`` function together with its capacity.

    Raises:
        LzfxError: if ``lzfx_compress`` returns a negative status code.
    """
    lzfx_compress = _fn_lzfx_compress()

    src_len = len(data)
    # NOTE(review): the 8-byte headroom assumes the compressed output never
    # exceeds the input by more than 8 bytes -- confirm against lzfx's
    # worst-case expansion for incompressible input.
    capacity = src_len + 8

    dst = ctypes.create_string_buffer(capacity)
    # Passed by reference: holds the buffer capacity going in; per the C
    # function's documentation it contains the compressed size on success.
    dst_len = ctypes.c_uint(capacity)

    status = lzfx_compress(data, src_len, dst, ctypes.byref(dst_len))
    if status < 0:
        raise LzfxError(status)

    return bytes(dst[: dst_len.value])  # type: ignore
|
|
|
|
|
|
class LzfxError(Exception):
    """Exception raised for lzfx compression or decompression error.

    ``str()`` of the exception reports the decoded error name, the numeric
    code and the explanation; ``args`` holds the numeric code only.

    Attributes:
        error_code -- The source error code, which is a negative integer
        error_name -- The constant name of the error
        message -- explanation of the error
    """

    # Mirrors the error constants of the C library:
    #   define LZFX_ESIZE      -1  /* Output buffer too small */
    #   define LZFX_ECORRUPT   -2  /* Invalid data for decompression */
    #   define LZFX_EARGS      -3  /* Arguments invalid (NULL) */
    _KNOWN_ERRORS = {
        -1: ("LZFX_ESIZE", "Output buffer too small"),
        -2: ("LZFX_ECORRUPT", "Invalid data for decompression"),
        -3: ("LZFX_EARGS", "Arguments invalid (NULL)"),
    }

    def __init__(self, error_code):
        # Initialise the base class explicitly; ``args`` stays (error_code,).
        super().__init__(error_code)
        self.error_code = error_code
        # Codes not listed above fall back to a generic description.
        self.error_name, self.message = self._KNOWN_ERRORS.get(
            error_code, ("UNKNOWN", "Unknown error")
        )

    def __str__(self):
        # Without this, tracebacks would show only the bare integer code.
        return "{} ({}): {}".format(
            self.error_name, self.error_code, self.message
        )
|