Source code for assistant_stream_ce.assistant_stream_chunk
from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union
# Define the data classes for different chunk types
[docs]
@dataclass
class TextDeltaChunk:
text_delta: str
type: str = "text-delta"
parent_id: Optional[str] = None
[docs]
@dataclass
class ReasoningDeltaChunk:
reasoning_delta: str
type: str = "reasoning-delta"
parent_id: Optional[str] = None
[docs]
@dataclass
class DataChunk:
data: Any
type: str = "data"
[docs]
@dataclass
class ErrorChunk:
error: str
type: str = "error"
# Define ObjectStream operation types as TypedDict
[docs]
class ObjectStreamSetOperation(TypedDict):
path: List[str]
value: Any
type: Literal["set"]
[docs]
class ObjectStreamAppendTextOperation(TypedDict):
path: List[str]
value: str
type: Literal["append-text"]
ObjectStreamOperation = Union[ObjectStreamSetOperation, ObjectStreamAppendTextOperation]
[docs]
@dataclass
class UpdateStateChunk:
operations: List[ObjectStreamOperation]
type: str = "update-state"
[docs]
@dataclass
class SourceChunk:
id: str
url: str
source_type: str = "url"
title: Optional[str] = None
type: str = "source"
parent_id: Optional[str] = None
# Define the union type for AssistantStreamChunk
AssistantStreamChunk = Union[
TextDeltaChunk,
ReasoningDeltaChunk,
ToolCallBeginChunk,
ToolCallDeltaChunk,
ToolResultChunk,
DataChunk,
ErrorChunk,
UpdateStateChunk,
SourceChunk,
]