Source code for assistant_stream_ce.serialization.stream_encoder

from abc import ABC, abstractmethod
from typing import AsyncGenerator
from assistant_stream_ce.assistant_stream_chunk import AssistantStreamChunk


class StreamEncoder(ABC):
    """Abstract base class for stream encoders.

    Subclasses must implement both `get_media_type` and `encode_stream`;
    the class cannot be instantiated until every abstract method is
    overridden.
    """

    @abstractmethod
    def get_media_type(self) -> str:
        """Return the MIME type of the stream."""

    @abstractmethod
    async def encode_stream(
        self, stream: AsyncGenerator[AssistantStreamChunk, None]
    ) -> AsyncGenerator[str, None]:
        """Encode the stream of AssistantStreamChunk into a specific format.

        This method must be implemented by subclasses. Implementations are
        expected to be async generators, i.e. consumed with
        ``async for piece in encoder.encode_stream(stream)``.

        Args:
            stream: Async generator producing the chunks to encode.

        Yields:
            The encoded string pieces. The base implementation yields
            nothing.
        """
        # An `async def` without a `yield` is a coroutine function, which
        # contradicts the declared AsyncGenerator return type and the
        # signature of overriding async generators. The unreachable `yield`
        # makes this an async generator function without producing items.
        if False:  # pragma: no cover
            yield ""