Source code for assistant_stream_ce.serialization.openai_stream
from assistant_stream_ce.assistant_stream_chunk import AssistantStreamChunk
import json
import time
import string
import random
from typing import AsyncGenerator
from assistant_stream_ce.serialization.assistant_stream_response import (
AssistantStreamResponse,
)
from assistant_stream_ce.serialization.stream_encoder import StreamEncoder
[docs]
def generate_openai_style_id():
prefix = "chatcmpl-"
characters = string.ascii_letters + string.digits
random_id = "".join(random.choices(characters, k=24))
return prefix + random_id
[docs]
class OpenAIStreamEncoder(StreamEncoder):
def __init__(self, model="assistant_stream", system_fingerprint="fp_0000000000"):
self.id = generate_openai_style_id()
self.model = model
self.system_fingerprint = system_fingerprint
pass
def _create_chunk(self, delta={}, finish_reason=None):
response = {
"id": self.id,
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": self.model,
"system_fingerprint": self.system_fingerprint,
"choices": [
{
"index": 0,
"delta": delta,
"logprobs": None,
"finish_reason": finish_reason,
}
],
}
return f"data: {json.dumps(response, ensure_ascii=False)}\n\n"
[docs]
def encode_chunk(self, chunk: AssistantStreamChunk) -> str:
"""
Encodes the chunk into OpenAI's SSE format.
"""
if chunk.type == "text-delta":
# Construct the delta for text content
return self._create_chunk({"content": chunk.text_delta})
else:
# Handle unknown chunk types gracefully
return ""
[docs]
async def encode_stream(
self, stream: AsyncGenerator[AssistantStreamChunk, None]
) -> AsyncGenerator[str, None]:
"""
Asynchronously encodes chunks into SSE-formatted strings.
"""
async for chunk in stream:
encoded_chunk = self.encode_chunk(chunk)
if encoded_chunk:
yield encoded_chunk
yield self._create_chunk(finish_reason="stop")
yield "data: [DONE]\n\n"
[docs]
class OpenAIStreamResponse(AssistantStreamResponse):
def __init__(
self,
stream: AsyncGenerator[AssistantStreamChunk, None],
):
"""
Initializes the response with the OpenAI SSE encoder.
"""
super().__init__(stream, OpenAIStreamEncoder())