Tutorial: Using BovineClient

BovineClient wraps the functionality necessary to use ActivityPub Client To Server as a client. As with BovineActor, one needs to pass a configuration object when initializing it. There are two variants. For Moo-Auth-1, it has the form

config = {
    "host": host_of_your_activitypub_server,
    "private_key": multicodec_ed25519_key,
}

where host is the hostname of the ActivityPub Server and private_key is a multicodec-encoded Ed25519 key whose corresponding did:key has been added to the Actor. The second variant uses HTTP Signatures, where the configuration has the form

config = {
    "account_url": actor_id,
    "public_key_url": url_of_your_public_key,
    "private_key": pem_encoded_rsa_private_key,
}

Here, account_url is the URL of your actor on the ActivityPub Server, public_key_url is the location of the public key as used in the keyId of the HTTP Signature, and private_key is the PEM-encoded RSA private key corresponding to the public key at public_key_url.
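The two variants can be told apart by the keys they carry. The following is a minimal sketch, not part of bovine: auth_variant is a hypothetical helper, and the hostnames and key values are placeholders. It only checks that a config dict contains the keys described above.

```python
# Hypothetical helper, not part of bovine: classify a config dict by the
# keys described in this tutorial.
MOO_AUTH_KEYS = {"host", "private_key"}
HTTP_SIGNATURE_KEYS = {"account_url", "public_key_url", "private_key"}


def auth_variant(config: dict) -> str:
    """Return which authentication variant a config dict describes."""
    keys = set(config)
    # Check HTTP Signatures first, since it requires more keys.
    if HTTP_SIGNATURE_KEYS <= keys:
        return "http-signature"
    if MOO_AUTH_KEYS <= keys:
        return "moo-auth-1"
    raise ValueError("config matches neither variant")


# Placeholder values for illustration only.
moo_config = {
    "host": "example.org",
    "private_key": "<multicodec ed25519 key>",
}
http_config = {
    "account_url": "https://example.org/actor",
    "public_key_url": "https://example.org/actor#main-key",
    "private_key": "<PEM encoded RSA private key>",
}

print(auth_variant(moo_config))
print(auth_variant(http_config))
```

A check like this can catch a missing key before the config is handed to BovineClient.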

Then one can create a BovineClient in one of the following four ways:

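from bovine import BovineClient

# 1. As a context manager, from a config dict
async with BovineClient(config) as client:
    await do_something(client)

# 2. As a context manager, from a config file
async with BovineClient.from_file("config.toml") as client:
    await do_something(client)

# 3. Explicit initialization, from a config dict
client = BovineClient(config)
await client.init()

# 4. Explicit initialization, from a config file, reusing an existing session
client = BovineClient.from_file("config.toml")
await client.init(session=session)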

For from_file, the config file is parsed and used as the config object. The session parameter is optional and can be used to pass an aiohttp.ClientSession to the BovineClient.

Making a post

BovineClient contains two factories to create ActivityStreams Objects and ActivityStreams Activities. One can obtain them by running

activity_factory, object_factory = client.factories

The simplest usage example is a Create activity wrapping a Note:

note = object_factory.note(content="Hello").as_public().build()
create = activity_factory.create(note).build()

The result should be equivalent to the following JSON:

{
    "@context": "https://www.w3.org/ns/activitystreams",
    "type": "Create",
    "actor": "https://domain/actor",
    "object": {
        "attributedTo": "https://domain/actor",
        "type": "Note",
        "content": "Hello",
        "to": "as:Public",
        "cc": "https://domain/followers_collection"
    },
    "to": "as:Public",
    "cc": "https://domain/followers_collection"
}
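To make the structure of this JSON concrete, the following sketch assembles an equivalent document from plain dicts, without bovine. The names build_note and wrap_in_create are hypothetical, and the factories may work differently internally. The point is only that the Create repeats the actor and the addressing of the Note it wraps.

```python
import json

# Placeholder URLs taken from the JSON shown in this tutorial.
ACTOR = "https://domain/actor"
FOLLOWERS = "https://domain/followers_collection"


def build_note(content: str) -> dict:
    """Build a public Note addressed to the public and cc'd to followers."""
    return {
        "attributedTo": ACTOR,
        "type": "Note",
        "content": content,
        "to": "as:Public",
        "cc": FOLLOWERS,
    }


def wrap_in_create(obj: dict) -> dict:
    """Wrap an object in a Create that copies its author and addressing."""
    return {
        "@context": "https://www.w3.org/ns/activitystreams",
        "type": "Create",
        "actor": obj["attributedTo"],
        "object": obj,
        "to": obj["to"],
        "cc": obj["cc"],
    }


create = wrap_in_create(build_note("Hello"))
print(json.dumps(create, indent=4))
```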

We can now send this activity to our outbox using

response = await client.send_to_outbox(create)
activity_location = response.headers['location']

The note should appear in the public feed of your Actor. The new id of the activity is stored in the returned location header. To retrieve the id of the corresponding object, you will need to proxy the activity via

activity = await client.proxy_element(activity_location)

The id can then be extracted from the object.
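A minimal sketch of that extraction step follows. It assumes the proxied activity is returned as a plain dict; extract_object_id is a hypothetical helper, and the ids in the sample are made up. ActivityStreams allows the object to be either embedded or referenced by its id, so both cases are handled.

```python
def extract_object_id(activity: dict):
    """Return the id of the activity's object, or None if there is none."""
    obj = activity.get("object")
    if isinstance(obj, dict):
        # Embedded object: the id is a property of the nested dict.
        return obj.get("id")
    # Otherwise the object is already an id string (or missing entirely).
    return obj


# Sample data with made-up ids, shaped like the Create shown earlier.
sample_activity = {
    "type": "Create",
    "id": "https://domain/activities/1",
    "object": {"type": "Note", "id": "https://domain/objects/1"},
}

print(extract_object_id(sample_activity))
```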

The inbox and outbox

By running

inbox = await client.inbox()
outbox = await client.outbox()

one can obtain CollectionHelper objects. These are meant to make it easier to interact with collection objects. In the simplest use case, one can use

await inbox.next_item()

to get the items from the inbox one after the other. It is also possible to print a summary of all elements that have been fetched from the inbox using await inbox.summary(). Finally, it is possible to iterate over the inbox via

async for item in inbox.iterate(max_number=3):
    print(item)
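The iteration pattern can be tried out without a server. In the sketch below, FakeCollection is a stand-in written for this example, not the real CollectionHelper; it only imitates an iterate method that accepts max_number. It also assumes that items are dicts with a "type" key.

```python
import asyncio


class FakeCollection:
    """Stand-in for a CollectionHelper so the sketch runs without a server."""

    def __init__(self, items):
        self._items = items

    async def iterate(self, max_number=10):
        # Yield at most max_number items, one after the other.
        for item in self._items[:max_number]:
            yield item


async def collect_types(collection, max_number):
    """Gather the type of each item produced by collection.iterate."""
    types = []
    async for item in collection.iterate(max_number=max_number):
        types.append(item.get("type"))
    return types


fake_inbox = FakeCollection(
    [{"type": "Create"}, {"type": "Like"}, {"type": "Follow"}, {"type": "Announce"}]
)
first_three = asyncio.run(collect_types(fake_inbox, max_number=3))
print(first_three)
```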

Proxying elements

await client.proxy_element(object_id)

client.proxy_element sends a request to the actor’s server for the object. The server then answers this request either from its object store or by fetching the object itself. The cache behavior is up to the server. Depending on how the proxyUrl of an Actor evolves, more options might be added here.

Event Source

The event source provides a way to receive updates from the server whenever a new element is added to the inbox or outbox. It will be specified in a forthcoming FEP, and its use is demonstrated in examples/sse.py. The basic usage is

import json

event_source = await client.event_source()
async for event in event_source:
    if event and event.data:
        data = json.loads(event.data)

The event source does not reconnect automatically, so if you plan on writing long-running applications, you will need to implement reconnection yourself. mechanical_bull uses the event source in this way.
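One possible shape for such a reconnect loop is sketched below. It is not taken from bovine or mechanical_bull. The names listen_forever, connect, handle, retry_delay and max_reconnects are hypothetical, and the set of exceptions treated as a dropped connection is an assumption that may need adjusting for the HTTP library in use.

```python
import asyncio
import json


async def listen_forever(connect, handle, retry_delay=5.0, max_reconnects=None):
    """Consume events and reconnect whenever the stream ends or fails.

    connect: coroutine function without arguments that returns an event source.
    handle: callback invoked with the decoded JSON payload of each event.
    max_reconnects: stop after this many reconnects (None means never stop).
    """
    reconnects = 0
    while True:
        try:
            event_source = await connect()
            async for event in event_source:
                if event and event.data:
                    handle(json.loads(event.data))
        except (OSError, asyncio.TimeoutError) as error:
            # Assumption: network failures surface as OSError or a timeout.
            print(f"event source failed: {error!r}")
        if max_reconnects is not None and reconnects >= max_reconnects:
            return
        reconnects += 1
        # Wait a little so a failing server is not hammered with requests.
        await asyncio.sleep(retry_delay)
```

With a client as created above, this might be called as await listen_forever(client.event_source, print). A fixed delay keeps the sketch short; a growing delay between attempts is a common refinement.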