LZ77 Compression

A little LZ77-ish compression algorithm I made in a day because I thought it'd be fun.
It scores around 50% compression on HTML.

Note: Since this runs on WASM Python, it is slow for inputs larger than a few MB.

Source Code

from __future__ import annotations
import struct
from dataclasses import dataclass
import typing as t

# Offsets and lengths are stored as single unsigned bytes ("B") in the record
# headers below, hence the 0xFF ceilings.
MAX_LOOKBEHIND_OFFSET = 0xFF  # how far behind the cursor a match is searched for
MIN_LOOKBEHIND_SIZE = 5  # length of the seed pattern find_lookbehind() looks up
MAX_LOOKBEHIND_SIZE = 0xFF  # cap on the length of a single back-reference
MAX_DATA_SIZE = 0xFF  # cap on the literal bytes placed under one data header

# Back-reference record: kind char, offset byte, length byte.
LOOKBEHIND_KIND = b"L"
LOOKBEHIND_KIND_BYTE = ord(LOOKBEHIND_KIND)
LOOKBEHIND_FORAMT = ">cBB"
LOOKBEHIND_SIZE = struct.Struct(LOOKBEHIND_FORAMT).size

# Literal-run header: kind char, payload length byte; the raw bytes follow it.
DATAHEADER_KIND = b"D"
DATAHEADER_KIND_BYTE = ord(DATAHEADER_KIND)
DATAHEADER_FORAMT = ">cB"
DATAHEADER_SIZE = struct.Struct(DATAHEADER_FORAMT).size

@dataclass(slots=True, frozen=True)
class StreamSegment:
    cursor: int
    data: bytes


@dataclass(slots=True, frozen=True)
class LookBehind:
    length: int
    offset: int

def compress(data: bytes) -> bytes:
    """Compress *data* in one shot and return the encoded bytes.

    Drains compress_stream() and concatenates the payload of every segment
    it produces.
    """
    pieces = [segment.data for segment in compress_stream(data)]
    return b"".join(pieces)

def compress_stream(data: bytes) -> t.Iterable[StreamSegment]:
    """Lazily compress *data*, yielding the encoded stream piece by piece.

    Two kinds of records are produced:

    * a literal run: a ``DATAHEADER_FORAMT`` header (kind ``D`` plus a
      one-byte length) in one segment, followed by the raw bytes in the next;
    * a back-reference: one ``LOOKBEHIND_FORAMT`` header (kind ``L``, offset,
      length) describing a run found by find_lookbehind().

    Concatenating the ``data`` of every yielded segment gives the compressed
    stream. Each segment's ``cursor`` is the read position in *data* at the
    moment the segment was produced.

    Args:
        data: The bytes to compress.

    Yields:
        StreamSegment objects whose ``data`` is always ``bytes``.
    """
    total = len(data)
    cursor = 0
    literals = bytearray()

    def flush_literals() -> t.Iterator[StreamSegment]:
        # Emit the pending literal run (header, then payload) and reset it.
        nonlocal literals
        if not literals:
            return

        header = struct.pack(DATAHEADER_FORAMT, DATAHEADER_KIND, len(literals))
        yield StreamSegment(cursor, header)
        # Snapshot as bytes so the frozen segment matches its declared field
        # type and stays hashable.
        yield StreamSegment(cursor, bytes(literals))
        literals = bytearray()

    while cursor < total:
        # No search inside the first MIN_LOOKBEHIND_SIZE bytes, nor once
        # MIN_LOOKBEHIND_SIZE or fewer bytes remain; those are always literals.
        searchable = MIN_LOOKBEHIND_SIZE <= cursor < total - MIN_LOOKBEHIND_SIZE
        match = find_lookbehind(data, cursor) if searchable else None

        if match is not None:
            yield from flush_literals()
            header = struct.pack(
                LOOKBEHIND_FORAMT, LOOKBEHIND_KIND, match.offset, match.length
            )
            yield StreamSegment(cursor, header)
            cursor += match.length
            continue

        # The length byte of a data header cannot exceed MAX_DATA_SIZE, so a
        # full run is flushed first. The current byte is then consumed in the
        # same iteration, which avoids repeating the search for this cursor.
        if len(literals) >= MAX_DATA_SIZE:
            yield from flush_literals()
        literals.append(data[cursor])
        cursor += 1

    yield from flush_literals()


def find_lookbehind(data: bytes, cursor: int) -> LookBehind | None:
    """Look for an earlier copy of the bytes starting at *cursor*.

    The ``MIN_LOOKBEHIND_SIZE`` bytes at *cursor* are searched for within the
    ``MAX_LOOKBEHIND_OFFSET`` bytes that precede it. The first occurrence found
    is then extended one byte at a time while both runs keep matching, without
    letting the earlier run reach *cursor* and without exceeding
    ``MAX_LOOKBEHIND_SIZE``.

    Args:
        data: The complete input being compressed.
        cursor: Index in *data* where the candidate run starts.

    Returns:
        A LookBehind whose ``offset`` is the gap between the end of the
        earlier run and *cursor*, or ``None`` when no match exists or fewer
        than ``MIN_LOOKBEHIND_SIZE`` bytes remain at *cursor*.
    """
    remaining = len(data) - cursor
    if remaining < MIN_LOOKBEHIND_SIZE:
        # A shorter (or empty) pattern would "match" with a length that runs
        # past the end of the data.
        return None

    # Clamp at 0: bytes.find() treats a negative start like a slice index
    # (relative to the end), which would empty or misplace the window while
    # cursor is still smaller than MAX_LOOKBEHIND_OFFSET.
    window_start = max(0, cursor - MAX_LOOKBEHIND_OFFSET)
    pattern = data[cursor:cursor + MIN_LOOKBEHIND_SIZE]
    index = data.find(pattern, window_start, cursor)

    if index == -1:
        return None

    # The run may grow until it would touch the cursor, hit the size cap, or
    # run out of input.
    limit = min(MAX_LOOKBEHIND_SIZE, cursor - index, remaining)
    length = MIN_LOOKBEHIND_SIZE
    # The first `length` bytes are already known to match, so only the next
    # byte of each run needs comparing.
    while length < limit and data[index + length] == data[cursor + length]:
        length += 1

    offset = cursor - (index + length)
    return LookBehind(length, offset)


def decompress(data: bytes) -> bytes:
    """Decompress *data* in one shot and return the restored bytes.

    Drains decompress_stream() and concatenates the payload of every segment
    it produces.
    """
    pieces = [segment.data for segment in decompress_stream(data)]
    return b"".join(pieces)

def decompress_stream(data: bytes) -> t.Iterable[StreamSegment]:
    """Lazily decode a compressed stream, yielding the restored bytes in pieces.

    The stream is a sequence of records, each introduced by a kind byte:

    * ``D`` (``DATAHEADER_FORAMT``): a one-byte length followed by that many
      literal bytes, which are copied to the output;
    * ``L`` (``LOOKBEHIND_FORAMT``): an offset and a length selecting a run of
      already-decoded bytes that ends ``offset`` bytes before the current end
      of the output; the run is appended again.

    Concatenating the ``data`` of every yielded segment gives the decompressed
    output. Each segment's ``cursor`` is the read position in *data* just
    after the record header.

    Args:
        data: The compressed stream.

    Yields:
        StreamSegment objects whose ``data`` is ``bytes``.

    Raises:
        ValueError: On an unknown kind byte, a literal run cut short by the
            end of *data*, or a back-reference that points before the start
            of the decoded output.
        struct.error: If *data* ends in the middle of a record header.
    """
    cursor = 0
    total = len(data)
    # Only the most recent MAX_LOOKBEHIND_OFFSET decoded bytes are kept; it is
    # trimmed after every record so it stays bounded.
    history = bytearray()

    while cursor < total:
        kind = data[cursor]

        if kind == DATAHEADER_KIND_BYTE:
            _, length = struct.unpack_from(DATAHEADER_FORAMT, data, cursor)
            cursor += DATAHEADER_SIZE

            fragment = bytes(data[cursor:cursor + length])
            if len(fragment) != length:
                raise ValueError("truncated data segment")
            history += fragment
            yield StreamSegment(cursor, fragment)
            cursor += length
        elif kind == LOOKBEHIND_KIND_BYTE:
            _, offset, length = struct.unpack_from(LOOKBEHIND_FORAMT, data, cursor)
            cursor += LOOKBEHIND_SIZE

            end = len(history) - offset
            start = end - length
            if start < 0:
                # A negative index would silently wrap around and copy
                # unrelated bytes.
                raise ValueError("lookbehind points before the start of the output")
            fragment = bytes(history[start:end])
            history += fragment
            yield StreamSegment(cursor, fragment)
        else:
            raise ValueError(f"invalid kind: {chr(kind)}")

        # Drop everything older than the window; a no-op while history is short.
        del history[:-MAX_LOOKBEHIND_OFFSET]