Source code for assistant_stream_ce.serialization.stream_encoder
from abc import ABC, abstractmethod
from typing import AsyncGenerator
from assistant_stream_ce.assistant_stream_chunk import AssistantStreamChunk
[docs]
class StreamEncoder(ABC):
"""
Abstract base class for stream encoders, requiring an implementation of `encode_stream`.
"""
[docs]
@abstractmethod
async def encode_stream(
self, stream: AsyncGenerator[AssistantStreamChunk, None]
) -> AsyncGenerator[str, None]:
"""
Encode the stream of AssistantStreamChunk into a specific format.
This method must be implemented by subclasses.
"""
pass