serialization

class assistant_stream_ce.serialization.assistant_stream_response.AssistantStreamResponse(stream, stream_encoder)

Bases: StreamingResponse

Parameters:

stream

stream_encoder

class assistant_stream_ce.serialization.stream_encoder.StreamEncoder

Bases: ABC

Abstract base class for stream encoders. Subclasses must implement both get_media_type and encode_stream.

abstractmethod get_media_type()

Returns the MIME type of the stream.

Return type:

str

abstractmethod async encode_stream(stream)

Encode the stream of AssistantStreamChunk into a specific format. This method must be implemented by subclasses.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

AsyncGenerator[str, None]
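The two abstract methods above define the whole encoder contract. The following is a minimal sketch of a custom encoder, not the library's own code: `StreamEncoder` is re-declared locally as a stand-in that mirrors the documented interface, plain dicts stand in for the chunk classes, and `NdjsonEncoder`, its media type, and the chunk field names are hypothetical.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import AsyncGenerator, List


class StreamEncoder(ABC):
    """Local stand-in mirroring the documented interface (assumption)."""

    @abstractmethod
    def get_media_type(self) -> str:
        """Return the MIME type of the stream."""

    @abstractmethod
    def encode_stream(self, stream: AsyncGenerator) -> AsyncGenerator[str, None]:
        """Return an async generator of encoded strings."""


class NdjsonEncoder(StreamEncoder):
    """Hypothetical encoder that emits one JSON object per line."""

    def get_media_type(self) -> str:
        return "application/x-ndjson"

    async def encode_stream(self, stream):
        # Serialize each incoming chunk as soon as it arrives.
        async for chunk in stream:
            yield json.dumps(chunk) + "\n"


async def demo_chunks():
    # Plain dicts stand in for the library's chunk objects.
    yield {"type": "text-delta", "text_delta": "Hel"}
    yield {"type": "text-delta", "text_delta": "lo"}


async def collect(encoder: StreamEncoder, stream) -> List[str]:
    # Drain the encoder into a list so the output can be inspected.
    return [line async for line in encoder.encode_stream(stream)]


if __name__ == "__main__":
    for line in asyncio.run(collect(NdjsonEncoder(), demo_chunks())):
        print(line, end="")
```

Because both methods are abstract, instantiating the base class directly raises a TypeError; only a subclass that defines both can be constructed.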

class assistant_stream_ce.serialization.data_stream.StateProxyJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Custom JSON encoder that can handle StateProxy objects.

default(obj)

Implement this method in a subclass such that it returns a serializable object for obj, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)

Parameters:

obj (Any)

Return type:

Any
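To illustrate how a default override can make a proxy object serializable, here is a hedged sketch. `FakeStateProxy` and its `unwrap` method are hypothetical stand-ins, since the real StateProxy interface is not described here; only `json.JSONEncoder` and `json.dumps(..., cls=...)` are standard-library behavior.

```python
import json
from typing import Any


class FakeStateProxy:
    """Hypothetical stand-in for StateProxy: wraps a plain value."""

    def __init__(self, value: Any) -> None:
        self._value = value

    def unwrap(self) -> Any:
        # Assumed accessor; the real StateProxy API may differ.
        return self._value


class ProxyAwareEncoder(json.JSONEncoder):
    """Sketch of an encoder that unwraps proxy objects before encoding."""

    def default(self, obj: Any) -> Any:
        if isinstance(obj, FakeStateProxy):
            # Return the wrapped value; json then encodes it normally.
            return obj.unwrap()
        # Let the base class raise TypeError for unsupported types.
        return super().default(obj)


payload = {"state": FakeStateProxy({"count": 1, "tags": ["a"]})}
encoded = json.dumps(payload, cls=ProxyAwareEncoder)
```

The encoder is passed through the `cls` argument, so callers keep using `json.dumps` and the proxy handling stays in one place.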

class assistant_stream_ce.serialization.data_stream.DataStreamEncoder

Bases: StreamEncoder

encode_chunk(chunk)

Parameters:

chunk (TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk)

Return type:

str

get_media_type()

Returns the MIME type of the stream.

Return type:

str

async encode_stream(stream)

Encode the stream of AssistantStreamChunk objects into the data stream format. This is the concrete implementation of the abstract StreamEncoder.encode_stream.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

AsyncGenerator[str, None]

class assistant_stream_ce.serialization.data_stream.DataStreamResponse(stream)

Bases: AssistantStreamResponse

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

class assistant_stream_ce.serialization.assistant_transport.StateProxyJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)

Bases: JSONEncoder

Custom JSON encoder that can handle StateProxy objects.

default(obj)

Implement this method in a subclass such that it returns a serializable object for obj, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)

Parameters:

obj (Any)

Return type:

Any

class assistant_stream_ce.serialization.assistant_transport.AssistantTransportEncoder

Bases: StreamEncoder

AssistantTransportEncoder encodes AssistantStreamChunks into SSE format and emits [DONE] when the stream completes.

get_media_type()

Returns the MIME type of the stream.

Return type:

str

async encode_stream(stream)

Encode the stream of AssistantStreamChunk objects into SSE format, emitting [DONE] when the stream completes.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

AsyncGenerator[str, None]
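The SSE-plus-[DONE] pattern described for this encoder can be sketched as follows. This is an illustration under assumptions, not the library's implementation: the helper name `encode_sse`, the dict-shaped chunks, and the exact JSON payload are hypothetical. Only the `data: ...` line followed by a blank line is standard Server-Sent Events framing, and `text/event-stream` is the standard SSE media type.

```python
import asyncio
import json
from typing import AsyncGenerator, AsyncIterator, List


async def encode_sse(stream: AsyncIterator[dict]) -> AsyncGenerator[str, None]:
    """Frame each chunk as an SSE event, then emit a [DONE] sentinel."""
    async for chunk in stream:
        # One event per chunk: a "data:" line terminated by a blank line.
        yield f"data: {json.dumps(chunk)}\n\n"
    # Signal completion so clients know no further events will arrive.
    yield "data: [DONE]\n\n"


async def demo_chunks():
    # Plain dicts stand in for the library's chunk objects (assumption).
    yield {"type": "text-delta", "text_delta": "Hi"}
    yield {"type": "text-delta", "text_delta": " there"}


async def demo() -> List[str]:
    # Collect every emitted event, including the final sentinel.
    return [event async for event in encode_sse(demo_chunks())]


if __name__ == "__main__":
    for event in asyncio.run(demo()):
        print(repr(event))
```

Emitting the sentinel after the loop, rather than inside it, means it is sent exactly once and only when the source stream is exhausted.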

class assistant_stream_ce.serialization.assistant_transport.AssistantTransportResponse(stream)

Bases: AssistantStreamResponse

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

assistant_stream_ce.serialization.openai_stream.generate_openai_style_id()

class assistant_stream_ce.serialization.openai_stream.OpenAIStreamEncoder(model='assistant_stream', system_fingerprint='fp_0000000000')

Bases: StreamEncoder

get_media_type()

Returns the MIME type of the stream.

Return type:

str

encode_chunk(chunk)

Encodes the chunk into OpenAI’s SSE format.

Parameters:

chunk (TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk)

Return type:

str

async encode_stream(stream)

Asynchronously encodes chunks into SSE-formatted strings.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

AsyncGenerator[str, None]
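For orientation, the sketch below shows what an OpenAI-style SSE event for a text delta commonly looks like. It is an assumption-laden illustration, not this encoder's actual output: `make_openai_text_chunk` and `make_completion_id` are hypothetical helpers, and the payload follows the publicly documented `chat.completion.chunk` shape. The `model` and `system_fingerprint` defaults are taken from the constructor signature above.

```python
import json
import time
import uuid


def make_completion_id() -> str:
    # Hypothetical ID scheme; generate_openai_style_id may differ.
    return "chatcmpl-" + uuid.uuid4().hex


def make_openai_text_chunk(
    text: str,
    *,
    completion_id: str,
    model: str = "assistant_stream",
    system_fingerprint: str = "fp_0000000000",
) -> str:
    """Build one SSE event carrying a chat.completion.chunk text delta."""
    payload = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": system_fingerprint,
        "choices": [
            {"index": 0, "delta": {"content": text}, "finish_reason": None}
        ],
    }
    # SSE framing: a "data:" line followed by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


if __name__ == "__main__":
    print(make_openai_text_chunk("Hello", completion_id=make_completion_id()))
```

Reusing one completion ID for every chunk of a response lets a client group the deltas that belong to the same message.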

class assistant_stream_ce.serialization.openai_stream.OpenAIStreamResponse(stream)

Bases: AssistantStreamResponse

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])