serialization
- class assistant_stream_ce.serialization.assistant_stream_response.AssistantStreamResponse(stream, stream_encoder)
Bases: StreamingResponse
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
stream_encoder (StreamEncoder)
- class assistant_stream_ce.serialization.stream_encoder.StreamEncoder
Bases: ABC
Abstract base class for stream encoders, requiring an implementation of encode_stream.
- abstractmethod async encode_stream(stream)
Encode the stream of AssistantStreamChunk into a specific format. This method must be implemented by subclasses.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
AsyncGenerator[str, None]
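As a rough illustration of the contract above, the sketch below defines a local stand-in for the abstract base class and a hypothetical subclass, LineEncoder, that yields one string per chunk. The stand-in StreamEncoder, the LineEncoder name, and the use of plain strings as chunks are assumptions made so the example runs without the library installed; the real chunk types are the ones listed under Parameters.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator


class StreamEncoder(ABC):
    """Local stand-in mirroring the documented interface (not the library class)."""

    @abstractmethod
    async def encode_stream(
        self, stream: AsyncGenerator[Any, None]
    ) -> AsyncGenerator[str, None]:
        """Yield encoded strings for the chunks in the stream."""
        raise NotImplementedError
        yield ""  # unreachable; its presence makes this an async generator function


class LineEncoder(StreamEncoder):
    """Hypothetical encoder: one newline-terminated string per chunk."""

    async def encode_stream(self, stream):
        async for chunk in stream:
            yield f"{chunk}\n"


async def fake_chunks():
    # Plain strings stand in for the real chunk objects.
    for piece in ("Hello", "world"):
        yield piece


async def collect():
    # Drain the encoder's async generator into a list.
    return [line async for line in LineEncoder().encode_stream(fake_chunks())]


encoded = asyncio.run(collect())
```

Because encode_stream is abstract, instantiating the stand-in StreamEncoder directly raises TypeError; only subclasses that override the method can be created.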
- class assistant_stream_ce.serialization.data_stream.StateProxyJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: JSONEncoder
Custom JSON encoder that can handle StateProxy objects.
- default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
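The same pattern can be applied to proxy-like objects. The sketch below is only an illustration of overriding default on json.JSONEncoder: FakeStateProxy, its unwrap method, and ProxyAwareEncoder are hypothetical names, and the way the library's StateProxy actually exposes its underlying value is not documented here.

```python
import json


class FakeStateProxy:
    """Hypothetical wrapper around a plain value; not the library's StateProxy."""

    def __init__(self, value):
        self._value = value

    def unwrap(self):
        return self._value


class ProxyAwareEncoder(json.JSONEncoder):
    """Unwraps FakeStateProxy objects and defers everything else to the base class."""

    def default(self, obj):
        if isinstance(obj, FakeStateProxy):
            # Return the wrapped value; the encoder then serializes it as usual.
            return obj.unwrap()
        # Let the base class raise TypeError for unsupported types.
        return super().default(obj)


payload = {"state": FakeStateProxy({"count": 1, "items": ["a", "b"]})}
encoded = json.dumps(payload, cls=ProxyAwareEncoder)
```

Passing the encoder through the cls argument of json.dumps keeps the call site unchanged while extending which objects can be serialized.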
- class assistant_stream_ce.serialization.data_stream.DataStreamEncoder
Bases: StreamEncoder
- encode_chunk(chunk)
- Parameters:
chunk (TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk)
- Return type:
- async encode_stream(stream)
Encode the stream of AssistantStreamChunk into a specific format. This method must be implemented by subclasses.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
AsyncGenerator[str, None]
- class assistant_stream_ce.serialization.data_stream.DataStreamResponse(stream)
Bases: AssistantStreamResponse
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- class assistant_stream_ce.serialization.assistant_transport.StateProxyJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Bases: JSONEncoder
Custom JSON encoder that can handle StateProxy objects.
- default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
- class assistant_stream_ce.serialization.assistant_transport.AssistantTransportEncoder
Bases: StreamEncoder
AssistantTransportEncoder encodes AssistantStreamChunks into SSE format and emits [DONE] when the stream completes.
- async encode_stream(stream)
Encode the stream of AssistantStreamChunk into a specific format. This method must be implemented by subclasses.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
AsyncGenerator[str, None]
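The behaviour described for this encoder (one SSE event per chunk, then a terminating [DONE]) can be sketched with the standard library alone. This is a minimal sketch, not the library's implementation: the encode_sse function, the dict-based chunks, and their field names are assumptions, and the exact payload the encoder writes for each chunk type is not specified on this page.

```python
import asyncio
import json


async def encode_sse(stream):
    """Yield one SSE 'data:' event per chunk, then a final [DONE] event."""
    async for chunk in stream:
        # An SSE event is a 'data:' line followed by a blank line.
        yield f"data: {json.dumps(chunk)}\n\n"
    # Signal completion once the source stream is exhausted.
    yield "data: [DONE]\n\n"


async def fake_chunks():
    # Dicts stand in for the real chunk objects; field names are illustrative.
    yield {"type": "text-delta", "text_delta": "Hi"}
    yield {"type": "text-delta", "text_delta": "!"}


async def collect():
    return [event async for event in encode_sse(fake_chunks())]


events = asyncio.run(collect())
```

A client reading this stream can treat the [DONE] event as the end-of-stream marker and stop parsing JSON at that point.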
- class assistant_stream_ce.serialization.assistant_transport.AssistantTransportResponse(stream)
Bases: AssistantStreamResponse
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- class assistant_stream_ce.serialization.openai_stream.OpenAIStreamEncoder(model='assistant_stream', system_fingerprint='fp_0000000000')
Bases: StreamEncoder
- encode_chunk(chunk)
Encodes the chunk into OpenAI’s SSE format.
- Parameters:
chunk (TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk)
- Return type:
- async encode_stream(stream)
Asynchronously encodes chunks into SSE-formatted strings.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
AsyncGenerator[str, None]
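For orientation, the sketch below builds a single SSE event shaped like an OpenAI chat completion chunk, using the default model and system_fingerprint values from the constructor signature above. The openai_text_event helper and the completion_id value are hypothetical, and how the encoder maps each chunk type onto this payload is an assumption; only the general payload shape follows OpenAI's public streaming format.

```python
import json
import time


def openai_text_event(
    text,
    model="assistant_stream",
    system_fingerprint="fp_0000000000",
    completion_id="chatcmpl-example",  # hypothetical identifier
):
    """Build one SSE event carrying a text delta in chat.completion.chunk form."""
    payload = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": system_fingerprint,
        "choices": [
            {"index": 0, "delta": {"content": text}, "finish_reason": None}
        ],
    }
    # SSE framing: a 'data:' line followed by a blank line.
    return f"data: {json.dumps(payload)}\n\n"


event = openai_text_event("Hello")
```

Clients written against OpenAI's streaming API typically read choices[0].delta.content from each event, which is why the text delta is placed there in this sketch.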
- class assistant_stream_ce.serialization.openai_stream.OpenAIStreamResponse(stream)
Bases: AssistantStreamResponse
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])