LZ77 Compression
A little LZ77-ish compression algorithm I made in a day because I thought it'd be fun.
It achieves around 50% compression on HTML.
Note: since this runs on WASM Python, it is not fast for anything over a few MB.
Source Code
from __future__ import annotations
import struct
from dataclasses import dataclass
import typing as t
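# Stream format: a compressed stream is a flat sequence of records.
#   "D" record: ">cB" header (kind, literal run length) followed by that many literal bytes.
#   "L" record: ">cBB" header (kind, offset, length) meaning "copy `length` bytes
#       that end `offset` bytes before the current end of the decoded output".
# Offsets, lengths, and literal run sizes are single bytes, hence the 255 caps below.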
MAX_LOOKBEHIND_OFFSET = (2 ** 8) - 1
MIN_LOOKBEHIND_SIZE = 5
MAX_LOOKBEHIND_SIZE = (2 ** 8) - 1
MAX_DATA_SIZE = (2 ** 8) - 1
LOOKBEHIND_KIND = b"L"
LOOKBEHIND_KIND_BYTE = LOOKBEHIND_KIND[0]
LOOKBEHIND_FORMAT = ">cBB"
LOOKBEHIND_SIZE = struct.calcsize(LOOKBEHIND_FORMAT)
DATAHEADER_KIND = b"D"
DATAHEADER_KIND_BYTE = DATAHEADER_KIND[0]
DATAHEADER_FORMAT = ">cB"
DATAHEADER_SIZE = struct.calcsize(DATAHEADER_FORMAT)
@dataclass(slots=True, frozen=True)
class StreamSegment:
cursor: int
data: bytes
@dataclass(slots=True, frozen=True)
class LookBehind:
length: int
offset: int
def compress(data: bytes) -> bytes:
return b"".join(x.data for x in compress_stream(data))
def compress_stream(data: bytes) -> t.Iterable[StreamSegment]:
cursor = 0
text_buffer = bytearray()
def pop_text_buffer():
nonlocal text_buffer
if len(text_buffer) == 0:
return
        header = struct.pack(DATAHEADER_FORMAT, DATAHEADER_KIND, len(text_buffer))
yield StreamSegment(cursor, header)
yield StreamSegment(cursor, text_buffer)
text_buffer = bytearray()
at_start = True
at_end = False
length = len(data)
while cursor < length:
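        # never emit lookbehinds right at the start (nothing to point back to) or
        # near the end (not enough bytes left for a minimum-length match)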
at_start = at_start and cursor < MIN_LOOKBEHIND_SIZE
at_end = at_end or cursor + MIN_LOOKBEHIND_SIZE >= length
if at_start or at_end:
lookbehind = None
else:
lookbehind = find_lookbehind(data, cursor)
if lookbehind:
yield from pop_text_buffer()
            header = struct.pack(LOOKBEHIND_FORMAT, LOOKBEHIND_KIND, lookbehind.offset, lookbehind.length)
yield StreamSegment(cursor, header)
cursor += lookbehind.length
elif len(text_buffer) >= MAX_DATA_SIZE:
yield from pop_text_buffer()
else:
text_buffer.append(data[cursor])
cursor += 1
yield from pop_text_buffer()
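# Search the window behind the cursor for a minimum-length match, then greedily
# extend it as far as the data, the cursor, and the one-byte length field allow.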
def find_lookbehind(data: bytes, cursor: int) -> LookBehind | None:
# inline variables for minor performance bump
    index = data.find(
        data[cursor:cursor + MIN_LOOKBEHIND_SIZE],  # pattern to match
        max(0, cursor - MAX_LOOKBEHIND_OFFSET),  # start of the search window (clamped: a negative start would be read as an index from the end)
        cursor,  # end of the search window
    )
if index == -1:
return None
length = MIN_LOOKBEHIND_SIZE
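    # extend the match one byte at a time until it stops matching, reaches the
    # cursor, or hits the maximum encodable length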
while True:
if index + length >= cursor:
break
if length >= MAX_LOOKBEHIND_SIZE:
break
        if data[index: index + length + 1] != data[cursor: cursor + length + 1]:
            break
length += 1
offset = cursor - (index + length)
return LookBehind(length, offset)
def decompress(data: bytes) -> bytes:
return b"".join(x.data for x in decompress_stream(data))
def decompress_stream(data: bytes) -> t.Iterable[StreamSegment]:
cursor = 0
output = bytearray()
while cursor < len(data):
kind = data[cursor]
if kind == DATAHEADER_KIND_BYTE:
            header_bytes = data[cursor: cursor + DATAHEADER_SIZE]
            [_, length] = struct.unpack(DATAHEADER_FORMAT, header_bytes)
cursor += DATAHEADER_SIZE
fragment_data = data[cursor: cursor + length]
output.extend(fragment_data)
yield StreamSegment(cursor, fragment_data)
cursor += length
elif kind == LOOKBEHIND_KIND_BYTE:
            header_bytes = data[cursor: cursor + LOOKBEHIND_SIZE]
            [_, offset, length] = struct.unpack(LOOKBEHIND_FORMAT, header_bytes)
cursor += LOOKBEHIND_SIZE
end = len(output) - offset
start = end - length
fragment_data = output[start:end]
output.extend(fragment_data)
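            # trim decoded history to the furthest distance a future lookbehind can reference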
output[:] = output[-MAX_LOOKBEHIND_OFFSET:]
yield StreamSegment(cursor, fragment_data)
else:
raise ValueError(f"invalid kind: {chr(kind)}")
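Example Usage
A quick round-trip sanity check against the code above (the sample input is just an illustration):

sample = b"<html><body>" + b"<p>hello world</p>" * 200 + b"</body></html>"
packed = compress(sample)
assert decompress(packed) == sample
print(f"{len(sample)} bytes -> {len(packed)} bytes ({len(packed) / len(sample):.0%})")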