Welcome to assistant-stream

assistant-stream is a small library for building typed assistant streaming responses in Python. It is designed to work well with:

At the core is assistant_stream_ce.create_run.create_run(), which turns an async callback into an async generator that yields structured stream chunks.

Quick example

A minimal FastAPI endpoint streaming assistant output:

from fastapi import FastAPI
from assistant_stream_ce import create_run
from assistant_stream_ce.serialization.data_stream import DataStreamResponse

app = FastAPI()

@app.get("/chat")
async def chat():
    async def run(controller):
        controller.append_text("Hello ")
        controller.append_text("world!")

    return DataStreamResponse(create_run(run))

Next steps: