Source code for assistant_stream_ce.assistant_stream_chunk

from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, TypedDict, Union


# Define the data classes for different chunk types
[docs] @dataclass class TextDeltaChunk: text_delta: str type: str = "text-delta" parent_id: Optional[str] = None
[docs] @dataclass class ReasoningDeltaChunk: reasoning_delta: str type: str = "reasoning-delta" parent_id: Optional[str] = None
[docs] @dataclass class ToolCallBeginChunk: tool_call_id: str tool_name: str type: str = "tool-call-begin" parent_id: Optional[str] = None
[docs] @dataclass class ToolCallDeltaChunk: tool_call_id: str args_text_delta: str type: str = "tool-call-delta"
[docs] @dataclass class ToolResultChunk: tool_call_id: str result: Any artifact: Any | None = None is_error: bool = False type: str = "tool-result"
[docs] @dataclass class DataChunk: data: Any type: str = "data"
[docs] @dataclass class ErrorChunk: error: str type: str = "error"
# Define ObjectStream operation types as TypedDict
[docs] class ObjectStreamSetOperation(TypedDict): path: List[str] value: Any type: Literal["set"]
[docs] class ObjectStreamAppendTextOperation(TypedDict): path: List[str] value: str type: Literal["append-text"]
ObjectStreamOperation = Union[ObjectStreamSetOperation, ObjectStreamAppendTextOperation]
[docs] @dataclass class UpdateStateChunk: operations: List[ObjectStreamOperation] type: str = "update-state"
[docs] @dataclass class SourceChunk: id: str url: str source_type: str = "url" title: Optional[str] = None type: str = "source" parent_id: Optional[str] = None
# Define the union type for AssistantStreamChunk AssistantStreamChunk = Union[ TextDeltaChunk, ReasoningDeltaChunk, ToolCallBeginChunk, ToolCallDeltaChunk, ToolResultChunk, DataChunk, ErrorChunk, UpdateStateChunk, SourceChunk, ]