Skip to content

bovine.crypto

This module includes wrappers for creating cryptographic identities and the functionality to verify signatures of http requests. BovineClient and BovineActor take care of making properly signed requests.

bovine.crypto

build_validate_http_signature

build_validate_http_signature(
    key_retriever: Callable[
        [str],
        Awaitable[
            Tuple[str | None, str | None]
            | CryptographicIdentifier
            | None
        ],
    ],
    skip_digest_check: bool = False,
)

Creates a validate_signature function. validate_signature takes the request as parameter and returns the owner if the http signature is valid. If you do not wish to use quart (or a compatible framework), you should use build_validate_http_signature_raw.

Example for the key_retriever argument.

from bovine.crypto.types import CryptographicIdentifier

async def retrieve(key_id):
    # The key id is used directly as the URL to fetch; the response body is
    # parsed as JSON.
    async with aiohttp.ClientSession() as session:
        response = await session.get(key_id)
        data = await response.json()
        # Use the "publicKey" member when the document has one, otherwise
        # hand over the whole document.
        return CryptographicIdentifier.from_publickey(
            data.get("publicKey", data)
        )

validator = build_validate_http_signature(retrieve)

validator then accepts a werkzeug.wrappers.Request object as its argument.

Parameters:

Name Type Description Default
key_retriever Callable[[str], Awaitable[Tuple[str | None, str | None] | CryptographicIdentifier | None]]

A coroutine that, given a key id, returns the corresponding CryptographicIdentifier or a tuple public_key, owner. Here public_key is assumed to be PEM encoded and owner is a URI. In the Fediverse use case, owner will be the actor id.

required
skip_digest_check bool

Set to true to skip digest check

False

Returns:

Type Description
Source code in bovine/bovine/crypto/__init__.py
def build_validate_http_signature(
    key_retriever: Callable[
        [str], Awaitable[Tuple[str | None, str | None] | CryptographicIdentifier | None]
    ],
    skip_digest_check: bool = False,
):
    """Builds the `validate_signature` coroutine for request objects.

    The returned coroutine takes the request as parameter and returns the owner
    if the http signature is valid. If you do not wish to use
    [quart](https://quart.palletsprojects.com/en/latest/) (or a compatible
    framework), you should use
    [build_validate_http_signature_raw][bovine.crypto.build_validate_http_signature_raw].

    Example for the `key_retriever` argument.

    ```python
    from bovine.crypto.types import CryptographicIdentifier

    async def retrieve(key_id):
        async with aiohttp.ClientSession() as session:
            response = await session.get(key_id)
            data = await response.json()
            return CryptographicIdentifier.from_publickey(
                data.get("publicKey", data)
            )

    validator = build_validate_http_signature(retrieve)
    ```

    `validator` then accepts a `werkzeug.wrappers.Request` object as its argument.


    :param key_retriever:
        A coroutine that, given a key id, returns the corresponding
        CryptographicIdentifier or a tuple `public_key, owner`. Here
        `public_key` is assumed to be PEM encoded and owner is a URI. In the
        Fediverse use case, owner will be the actor id.
    :param skip_digest_check: Set to true to skip digest check

    :return: The coroutine [SignatureChecker.validate_signature_request][bovine.crypto.signature_checker.SignatureChecker.validate_signature_request]
    """

    # Both arguments are forwarded unchanged; the bound method of the freshly
    # created checker is what callers receive.
    return SignatureChecker(
        key_retriever, skip_digest_check=skip_digest_check
    ).validate_signature_request

build_validate_http_signature_raw

build_validate_http_signature_raw(
    key_retriever: Callable[
        [str],
        Awaitable[
            Tuple[str | None, str | None]
            | CryptographicIdentifier
            | None
        ],
    ],
    skip_digest_check: bool = False,
)

Creates a validate_signature function. validate_signature takes (method, url, headers, body) as parameters and returns the owner if the http signature is valid. Otherwise it behaves like build_validate_http_signature.

Parameters:

Name Type Description Default
skip_digest_check bool

Set to true to skip digest check

False

Returns:

Type Description
Source code in bovine/bovine/crypto/__init__.py
def build_validate_http_signature_raw(
    key_retriever: Callable[
        [str], Awaitable[Tuple[str | None, str | None] | CryptographicIdentifier | None]
    ],
    skip_digest_check: bool = False,
):
    """Builds the `validate_signature` coroutine for raw request data.

    The returned coroutine takes `(method, url, headers, body)` as parameters
    and returns the owner if the http signature is valid.
    Otherwise it behaves like `build_validate_http_signature`.

    :param key_retriever:
        Coroutine resolving a key id to the key material; it is passed
        unchanged to `SignatureChecker`, exactly as in
        `build_validate_http_signature`.
    :param skip_digest_check: Set to true to skip digest check

    :return: The coroutine [SignatureChecker.validate_signature][bovine.crypto.signature_checker.SignatureChecker.validate_signature]
    """

    # Same construction as build_validate_http_signature; only the bound
    # method that is handed back differs.
    return SignatureChecker(
        key_retriever, skip_digest_check=skip_digest_check
    ).validate_signature

generate_ed25519_private_key

generate_ed25519_private_key() -> str

Returns a multicodec/multibase encoded ed25519 private key

Source code in bovine/bovine/crypto/__init__.py
def generate_ed25519_private_key() -> str:
    """Generates a new Ed25519 private key.

    :return: the key, multicodec/multibase encoded
    """
    return private_key_to_base58(ed25519.Ed25519PrivateKey.generate())

generate_rsa_public_private_key

generate_rsa_public_private_key() -> Tuple[str, str]

Generates a new pair of RSA public and private keys.

Returns:

Type Description
Tuple[str, str]

pem encoded public and private key

Source code in bovine/bovine/crypto/__init__.py
def generate_rsa_public_private_key() -> Tuple[str, str]:
    """Creates a fresh 2048 bit RSA key pair (public exponent 65537).

    :returns: the pem encoded public key followed by the pem encoded,
        unencrypted PKCS8 private key
    """

    key_pair = rsa.generate_private_key(public_exponent=65537, key_size=2048)

    # Serialising the two halves is independent work; the public half is
    # exported first because it is also returned first.
    public_pem = key_pair.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    private_pem = key_pair.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption(),
    )

    return public_pem.decode("utf-8"), private_pem.decode("utf-8")

private_key_to_did_key

private_key_to_did_key(private_key_str: str) -> str

Computes public key in did key form of Ed25519 private key

Parameters:

Name Type Description Default
private_key_str str

multibase/multicodec encoded Ed25519 private key

required

Returns:

Type Description
str

did:key

Source code in bovine/bovine/crypto/__init__.py
def private_key_to_did_key(private_key_str: str) -> str:
    """Derives the did:key of the public key belonging to an Ed25519 private key.

    :param private_key_str: multibase/multicodec encoded Ed25519 private key

    :return: did:key
    """
    return public_key_to_did_key(
        multibase_to_private_key(private_key_str).public_key()
    )

validate_moo_auth_signature async

validate_moo_auth_signature(
    request, domain
) -> Tuple[Optional[str], Optional[str]]

Validates the Moo-Auth-1 signature of the request (see https://blog.mymath.rocks/2023-03-15/BIN1_Moo_Authentication_and_Authoriation). Returns the did-key if the signature is valid.

Parameters:

Name Type Description Default
request

The request to validate the signature for.

required
domain

The domain the request is made to.

required

Returns:

Type Description
Tuple[Optional[str], Optional[str]]

On success, the did key and domain; on failure, None, None. When no domain is passed, the did key and None are returned.

Source code in bovine/bovine/crypto/__init__.py
async def validate_moo_auth_signature(
    request, domain
) -> Tuple[Optional[str], Optional[str]]:
    """Validates the [Moo-Auth-1](https://blog.mymath.rocks/2023-03-15/BIN1_Moo_Authentication_and_Authoriation) signature of the request.

    The signed fields are `(request-target)`, `host` and `date`; for every
    method other than GET the `digest` header is checked against the body and
    signed as well.

    :param request:
        The request to validate the signature for. Its `headers`, `method`,
        `path` and awaitable `get_data()` are used.
    :param domain:
        The domain the request is made to; it must equal the `host` header.

    :returns:
        `(did key, None)` when the signature verifies, `(None, None)` when the
        digest header does not match the body or the signature is invalid.
    :raises ValueError:
        If `domain` differs from the `host` header or the `date` header is
        rejected by `bovine.utils.check_max_offset_now`.
    """  # noqa: E501
    headers = request.headers

    # The first 11 characters of the authorization header are dropped --
    # presumably the "Moo-Auth-1 " scheme prefix (TODO confirm).
    did_key = headers["authorization"][11:]
    moo_signature = headers["x-moo-signature"]

    request_date = bovine.utils.parse_gmt(headers["date"])
    if domain != headers["host"]:
        raise ValueError("Invalid host name")
    if not bovine.utils.check_max_offset_now(request_date):
        raise ValueError("Invalid date offset")

    is_get = request.method.lower() == "get"

    if not is_get:
        # A body whose digest does not match the header fails validation
        # before any signature work is done.
        body = await request.get_data()
        if content_digest_sha256(body) != headers["digest"]:
            return None, None

    # Every non-GET method is signed with the "post" request target.
    target_prefix = "get " if is_get else "post "
    to_verify = (
        HttpSignature()
        .with_field("(request-target)", target_prefix + request.path)
        .with_field("host", headers["host"])
        .with_field("date", headers["date"])
    )
    if not is_get:
        to_verify = to_verify.with_field("digest", headers["digest"])

    if not to_verify.ed25519_verify(did_key, moo_signature):
        return None, None
    return did_key, None

bovine.crypto.digest

This package contains helpers for dealing with digest headers in the various Fediverse settings.

See RFC 9530 Digest Fields for the current relevant RFC.

digest_multibase

digest_multibase(obj: bytes) -> str

Implements the multibase multihash digest; see the multihash specification (https://github.com/multiformats/multihash). This was proposed for use in the Fediverse in FEP-ef61: Portable Objects.

>>> digest_multibase(b"multihash")
'zQmYtUc4iTCbbfVSDNKvtQqrfyezPPnFvE33wFmutw9PBBk'
Source code in bovine/bovine/crypto/digest.py
def digest_multibase(obj: bytes) -> str:
    """
    Computes the multibase encoded [multihash](https://github.com/multiformats/multihash)
    digest of `obj` using sha256. This was proposed for use in the Fediverse in
    [FEP-ef61: Portable Objects](https://codeberg.org/fediverse/fep/src/branch/main/fep/ef61/fep-ef61.md).

    ```pycon
    >>> digest_multibase(b"multihash")
    'zQmYtUc4iTCbbfVSDNKvtQqrfyezPPnFvE33wFmutw9PBBk'

    ```
    """
    # Two byte header placed in front of the raw digest; 0x20 equals the
    # 32 byte length of a sha256 digest.
    multihash_header = b"\x12\x20"
    raw_digest = hashlib.sha256(obj).digest()

    return multibase_58btc_encode(multihash_header + raw_digest)

validate_digest

validate_digest(headers: dict, body: bytes) -> bool

Validates the digest. First checks the digest header then the content-digest header.

>>> validate_digest({"digest": "sha-256=Kch/yJ/aOjLud24QANj5EK/SfmpAubIsE9BbRcaT5D4="},
...     b'mooo')
True

>>> validate_digest({"digest": "SHA-256=Kch/yJ/aOjLud24QANj5EK/SfmpAubIsE9BbRcaT5D4="},
...     b'mooo')
True

>>> validate_digest({"content-digest": 'sha-256=:R9+ukoir89XSJSq/sL1qyWYmN9ZG5t+dXSdLwzbierw=:'},
...     b'moo')
True

Parameters:

Name Type Description Default
headers dict

The headers of the request

required
body bytes

The body of the request, currently a warning is raised if body is of type str

required

Returns:

Type Description
bool

True if digest is present and valid

Source code in bovine/bovine/crypto/digest.py
def validate_digest(headers: dict, body: bytes) -> bool:
    """Checks the body against the digest announced in the headers.

    The `digest` header takes precedence; the `content-digest` header is
    only consulted when no `digest` header is present.

    ```pycon
    >>> validate_digest({"digest": "sha-256=Kch/yJ/aOjLud24QANj5EK/SfmpAubIsE9BbRcaT5D4="},
    ...     b'mooo')
    True

    >>> validate_digest({"digest": "SHA-256=Kch/yJ/aOjLud24QANj5EK/SfmpAubIsE9BbRcaT5D4="},
    ...     b'mooo')
    True

    >>> validate_digest({"content-digest": 'sha-256=:R9+ukoir89XSJSq/sL1qyWYmN9ZG5t+dXSdLwzbierw=:'},
    ...     b'moo')
    True

    ```

    :param headers: The headers of the request
    :param body: The body of the request
    :return: True if digest is present and valid
    """

    if "digest" in headers:
        announced = headers["digest"]
        # Only the first four characters are lowercased, so "SHA-256=..."
        # compares equal to "sha-256=..." while the base64 part is untouched.
        normalized = announced[:4].lower() + announced[4:]
        if normalized == content_digest_sha256(body):
            return True
        logger.warning("Different digest")
        return False

    if "content-digest" not in headers:
        return False

    raw_header = headers["content-digest"]
    try:
        parsed = http_sf.parse(raw_header.encode("utf-8"), tltype="dict")
    except Exception as e:
        logger.warning(
            "Failed to parse header %s with %s",
            raw_header,
            repr(e),
        )
        return False

    # Checked in this order: sha-256 first, then sha-512.
    hashers = {"sha-256": hashlib.sha256, "sha-512": hashlib.sha512}

    # Unknown algorithms are only reported; they neither pass nor fail the check.
    if set(parsed.keys()) - set(hashers):
        logger.warning("Got unsupported hash method in %s", raw_header)

    # Every supported digest that is present has to match, and at least one
    # has to be present for the result to be True.
    matched = False
    for algorithm, hasher in hashers.items():
        if algorithm not in parsed:
            continue
        if parsed[algorithm][0] != hasher(body).digest():
            return False
        matched = True

    return matched

bovine.crypto.multibase

MultiCodec

Bases: Enum

The used Multicodec prefixes

Source code in bovine/bovine/crypto/multibase.py
class MultiCodec(Enum):
    """Multicodec prefixes of the supported key types"""

    # Public keys
    Ed25519Public = b"\xed\x01"
    EcP256Public = b"\x80\x24"
    EcP384Public = b"\x81\x24"
    RsaPublic = b"\x85\x24"

    # Private keys
    Ed25519Private = b"\x80\x26"
    EcP256Private = b"\x86\x26"

multibase_58btc_encode

multibase_58btc_encode(data: bytes) -> str

Encodes data in base 58 using the bitcoin alphabet and adds the prefix z

Source code in bovine/bovine/crypto/multibase.py
def multibase_58btc_encode(data: bytes) -> str:
    """Returns `data` encoded in base 58 (bitcoin alphabet),
    prefixed with `z`"""
    encoded = based58.b58encode(data).decode("utf-8")
    return "z" + encoded

multibase_decode

multibase_decode(data: str) -> bytes

Decodes the string data using the multibase algorithm

Parameters:

Name Type Description Default
data str

The string to decode

required

Returns:

Type Description
bytes

The bytes

Source code in bovine/bovine/crypto/multibase.py
def multibase_decode(data: str) -> bytes:
    """Decodes the string data using the multibase algorithm

    Only the `z` prefix (base 58 with the bitcoin alphabet) is supported.

    :param data: The string to decode
    :return: The bytes
    :raises ValueError: if `data` is empty or does not start with `z`
    """
    # An empty string has no prefix character; report it like any other
    # undecodable input instead of failing with an IndexError.
    if not data:
        raise ValueError("Cannot decode an empty multibase string")

    # Slicing instead of indexing keeps the prefix check free of lookup errors.
    if data[:1] == "z":
        return based58.b58decode(data[1:].encode("utf-8"))

    raise ValueError(f"{data} encoded in unknown format")