Source code for assistant_stream_ce.serialization.assistant_transport
from assistant_stream_ce.assistant_stream_chunk import AssistantStreamChunk
from assistant_stream_ce.serialization.assistant_stream_response import (
AssistantStreamResponse,
)
from assistant_stream_ce.serialization.stream_encoder import StreamEncoder
from assistant_stream_ce.state_proxy import StateProxy
from typing import AsyncGenerator, Any
import json
[docs]
class StateProxyJSONEncoder(json.JSONEncoder):
"""Custom JSON encoder that can handle StateProxy objects."""
[docs]
def default(self, obj: Any) -> Any:
if isinstance(obj, StateProxy):
return obj._get_value()
return super().default(obj)
[docs]
class AssistantTransportEncoder(StreamEncoder):
"""
AssistantTransportEncoder encodes AssistantStreamChunks into SSE format
and emits [DONE] when the stream completes.
"""
def _chunk_to_dict(self, chunk: AssistantStreamChunk) -> dict[str, Any]:
"""Convert a chunk to a JSON-serializable dictionary."""
chunk_dict = {"type": chunk.type}
# Add all attributes from the chunk
for key, value in vars(chunk).items():
if key != "type": # Already added
chunk_dict[self._snake_to_camel(key)] = value
return chunk_dict
def _snake_to_camel(self, snake_str: str) -> str:
"""Convert snake_case to camelCase."""
components = snake_str.split("_")
return components[0] + "".join(x.title() for x in components[1:])
[docs]
async def encode_stream(
self, stream: AsyncGenerator[AssistantStreamChunk, None]
) -> AsyncGenerator[str, None]:
async for chunk in stream:
chunk_dict = self._chunk_to_dict(chunk)
chunk_json = json.dumps(chunk_dict, cls=StateProxyJSONEncoder)
yield f"data: {chunk_json}\n\n"
# Emit [DONE] marker when stream completes
yield "data: [DONE]\n\n"
[docs]
class AssistantTransportResponse(AssistantStreamResponse):
def __init__(
self,
stream: AsyncGenerator[AssistantStreamChunk, None],
):
super().__init__(stream, AssistantTransportEncoder())