assistant_stream_ce package
- class assistant_stream_ce.AssistantStreamResponse(stream, stream_encoder)
Bases:
StreamingResponse
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
stream_encoder (StreamEncoder)
- async assistant_stream_ce.create_run(callback, *, state=None)
- Parameters:
- Return type:
AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None]
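The signature above pairs a callback with an async generator of chunks. A minimal sketch of how such a pattern can work, assuming a queue sits between the callback and the generator: `SketchController`, `TextChunk`, and `sketch_create_run` are hypothetical stand-ins written for illustration, not the library's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Awaitable, Callable


@dataclass
class TextChunk:
    """Hypothetical stand-in for a text-delta chunk."""
    text_delta: str
    type: str = "text-delta"


class SketchController:
    """Hypothetical stand-in for a run controller: pushes chunks onto a queue."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    def append_text(self, text_delta: str) -> None:
        self._queue.put_nowait(TextChunk(text_delta))


async def sketch_create_run(
    callback: Callable[[SketchController], Awaitable[None]],
) -> AsyncGenerator[TextChunk, None]:
    """Run the callback in a task and yield the chunks it produces, in order."""
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # sentinel marking the end of the run

    async def runner() -> None:
        try:
            await callback(SketchController(queue))
        finally:
            queue.put_nowait(done)

    task = asyncio.create_task(runner())
    while True:
        item = await queue.get()
        if item is done:
            break
        yield item
    await task  # surface any exception raised by the callback


async def demo() -> list:
    async def callback(controller: SketchController) -> None:
        controller.append_text("Hello")
        controller.append_text(", world")

    return [chunk.text_delta async for chunk in sketch_create_run(callback)]


collected = asyncio.run(demo())
```

In this sketch the consumer never sees the queue; it only iterates the generator, which is the shape a streaming response object would need.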
- class assistant_stream_ce.RunController(queue, state_data, parent_id=None)
Bases:
object
- Parameters:
parent_id (str | None)
- with_parent_id(parent_id)
Create a new RunController instance with the specified parent_id.
- Parameters:
parent_id (str)
- Return type:
RunController
- append_text(text_delta)
Append a text delta to the stream.
- Parameters:
text_delta (str)
- Return type:
None
- append_reasoning(reasoning_delta)
Append a reasoning delta to the stream.
- Parameters:
reasoning_delta (str)
- Return type:
None
- async add_tool_call(tool_name, tool_call_id=None)
Add a tool call to the stream.
- Parameters:
- Return type:
- add_stream(stream)
Append a substream to the main stream.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
None
- add_error(error)
Emit an error to the main stream.
- Parameters:
error (str)
- Return type:
None
- property state
Access the state proxy object for making state updates.
This property provides a proxy object that allows navigating to any path in the state, reading values, and setting values, which will trigger the appropriate state update operation.
If the state is None, this property returns None directly. You can set the root state directly by assigning to this property.
Example
controller.state = {"user": {"name": "John"}, "messages": "Hello"}  # Sets the entire state
controller.state["user"]["name"] = "Bob"  # Sets the value at path ["user", "name"]
name = controller.state["user"]["name"]  # Gets the value at path ["user", "name"]
controller.state["messages"] += " world"  # Appends text at path ["messages"]
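To illustrate how dictionary-style access could turn into state update operations, here is a self-contained sketch. `SketchStateProxy` is hypothetical, and both the operation dictionaries (`"set"` and `"append-text"` with a `path` and `value`) and the prefix check used to detect `+=` are assumptions made for the example; the library may detect appends differently.

```python
from typing import Any, Dict, List, Optional


class SketchStateProxy:
    """Hypothetical proxy: records path-based operations and applies them locally."""

    def __init__(
        self,
        root: Dict[str, Any],
        ops: List[dict],
        path: Optional[List[str]] = None,
    ) -> None:
        self._root = root
        self._ops = ops
        self._path = path or []

    def _resolve(self) -> Any:
        # Walk from the root to the node this proxy points at.
        node = self._root
        for key in self._path:
            node = node[key]
        return node

    def __getitem__(self, key: str) -> Any:
        value = self._resolve()[key]
        if isinstance(value, dict):
            # Nested dicts get their own proxy so deeper writes are tracked too.
            return SketchStateProxy(self._root, self._ops, self._path + [key])
        return value

    def __setitem__(self, key: str, value: Any) -> None:
        path = self._path + [key]
        current = self._resolve().get(key)
        is_append = (
            isinstance(current, str)
            and isinstance(value, str)
            and len(value) > len(current)
            and value.startswith(current)
        )
        if is_append:
            # Assumption: only the new suffix is sent for text appends.
            delta = value[len(current):]
            self._ops.append({"type": "append-text", "path": path, "value": delta})
        else:
            self._ops.append({"type": "set", "path": path, "value": value})
        self._resolve()[key] = value


state = {"user": {"name": "John"}, "messages": "Hello"}
ops: List[dict] = []
proxy = SketchStateProxy(state, ops)

proxy["user"]["name"] = "Bob"    # recorded as a "set" at ["user", "name"]
name = proxy["user"]["name"]     # plain read, no operation recorded
proxy["messages"] += " world"    # recorded as "append-text" at ["messages"]
```

Sending only the appended suffix keeps streamed updates small when a long text field grows token by token.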
- assistant_stream_ce.append_langgraph_event(state, _namespace, type, payload)
Append a LangGraph event to the state object.
- assistant_stream_ce.get_tool_call_subgraph_state(controller, namespace, subgraph_node, default_state, *, artifact_field_name=None, tool_name=None)
Get the state for a tool call subgraph by traversing the namespace and checking for subgraph nodes. Ensures there’s a ToolMessage as the last message and returns its artifact field value.
- Parameters:
controller (RunController) – The run controller managing the state
subgraph_node (str | List[str] | Callable[[List[str]], bool]) – Node name(s) to check against, or a function that checks node names
namespace (Tuple[str, ...]) – Tuple of strings in format ‘node_name:task_id’
artifact_field_name (str | None) – Optional field name to extract from artifact
default_state (Dict[str, Any]) – Default state to use if artifact field is None
- Returns:
The artifact field value from the ToolMessage. If the last message is already a ToolMessage, returns its artifact field. If it’s an AI message with tool calls, creates a ToolMessage and returns the appropriate artifact field value.
- Return type:
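The `namespace` and `subgraph_node` parameters described above can be illustrated with a small matching helper. This is a sketch under assumptions: `node_names` and `matches_subgraph` are hypothetical names, and "any node in the namespace matches" is a guess at the semantics, not the function's documented behavior.

```python
from typing import Callable, List, Tuple, Union

# Mirrors the documented parameter type: a name, a list of names, or a predicate.
NodeMatcher = Union[str, List[str], Callable[[List[str]], bool]]


def node_names(namespace: Tuple[str, ...]) -> List[str]:
    """Strip the task id from each 'node_name:task_id' entry."""
    return [entry.split(":", 1)[0] for entry in namespace]


def matches_subgraph(namespace: Tuple[str, ...], subgraph_node: NodeMatcher) -> bool:
    """Return True when the namespace refers to one of the wanted nodes (assumed rule)."""
    names = node_names(namespace)
    if callable(subgraph_node):
        return bool(subgraph_node(names))
    wanted = [subgraph_node] if isinstance(subgraph_node, str) else list(subgraph_node)
    return any(name in wanted for name in names)


namespace = ("agent:1a2b", "tools:3c4d")
names = node_names(namespace)
by_name = matches_subgraph(namespace, "tools")
by_list = matches_subgraph(namespace, ["search"])
by_predicate = matches_subgraph(namespace, lambda ns: ns[-1] == "tools")
```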
- class assistant_stream_ce.create_run.RunController(queue, state_data, parent_id=None)
Bases:
object
- Parameters:
parent_id (str | None)
- with_parent_id(parent_id)
Create a new RunController instance with the specified parent_id.
- Parameters:
parent_id (str)
- Return type:
RunController
- append_text(text_delta)
Append a text delta to the stream.
- Parameters:
text_delta (str)
- Return type:
None
- append_reasoning(reasoning_delta)
Append a reasoning delta to the stream.
- Parameters:
reasoning_delta (str)
- Return type:
None
- async add_tool_call(tool_name, tool_call_id=None)
Add a tool call to the stream.
- Parameters:
- Return type:
- add_stream(stream)
Append a substream to the main stream.
- Parameters:
stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])
- Return type:
None
- add_error(error)
Emit an error to the main stream.
- Parameters:
error (str)
- Return type:
None
- property state
Access the state proxy object for making state updates.
This property provides a proxy object that allows navigating to any path in the state, reading values, and setting values, which will trigger the appropriate state update operation.
If the state is None, this property returns None directly. You can set the root state directly by assigning to this property.
Example
controller.state = {"user": {"name": "John"}, "messages": "Hello"}  # Sets the entire state
controller.state["user"]["name"] = "Bob"  # Sets the value at path ["user", "name"]
name = controller.state["user"]["name"]  # Gets the value at path ["user", "name"]
controller.state["messages"] += " world"  # Appends text at path ["messages"]
- async assistant_stream_ce.create_run.create_run(callback, *, state=None)
- Parameters:
- Return type:
AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None]
- class assistant_stream_ce.assistant_stream_chunk.TextDeltaChunk(text_delta: str, type: str = 'text-delta', parent_id: str | None = None)
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.ReasoningDeltaChunk(reasoning_delta: str, type: str = 'reasoning-delta', parent_id: str | None = None)
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.ToolCallBeginChunk(tool_call_id: str, tool_name: str, type: str = 'tool-call-begin', parent_id: str | None = None)
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.ToolCallDeltaChunk(tool_call_id: str, args_text_delta: str, type: str = 'tool-call-delta')
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.ToolResultChunk(tool_call_id: str, result: Any, artifact: Any | None = None, is_error: bool = False, type: str = 'tool-result')
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.DataChunk(data: Any, type: str = 'data')
Bases:
object
- class assistant_stream_ce.assistant_stream_chunk.ErrorChunk(error: str, type: str = 'error')
Bases:
object
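Each chunk class above carries a `type` field whose default identifies the chunk kind. The sketch below re-declares two of them locally as dataclasses, using the field names and defaults from the signatures above, to show how a consumer might dispatch on `type`. Whether the real classes are dataclasses is an assumption, and `describe` is a hypothetical helper.

```python
from dataclasses import asdict, dataclass
from typing import Optional, Union


@dataclass
class TextDeltaChunk:
    """Local re-declaration for illustration; fields follow the documented signature."""
    text_delta: str
    type: str = "text-delta"
    parent_id: Optional[str] = None


@dataclass
class ErrorChunk:
    """Local re-declaration for illustration; fields follow the documented signature."""
    error: str
    type: str = "error"


def describe(chunk: Union[TextDeltaChunk, ErrorChunk]) -> str:
    """Hypothetical consumer that branches on the chunk's type tag."""
    if chunk.type == "text-delta":
        return f"text: {chunk.text_delta}"
    if chunk.type == "error":
        return f"error: {chunk.error}"
    return f"unhandled chunk type: {chunk.type}"


text_chunk = TextDeltaChunk("Hi")
error_chunk = ErrorChunk("boom")
as_dict = asdict(text_chunk)
summaries = [describe(text_chunk), describe(error_chunk)]
```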
- class assistant_stream_ce.assistant_stream_chunk.ObjectStreamAppendTextOperation
Bases:
TypedDict
- class assistant_stream_ce.assistant_stream_chunk.UpdateStateChunk(operations: List[assistant_stream_ce.assistant_stream_chunk.ObjectStreamSetOperation | assistant_stream_ce.assistant_stream_chunk.ObjectStreamAppendTextOperation], type: str = 'update-state')
Bases:
object
- Parameters:
operations (List[ObjectStreamSetOperation | ObjectStreamAppendTextOperation])
type (str)
- operations: List[ObjectStreamSetOperation | ObjectStreamAppendTextOperation]
- class assistant_stream_ce.assistant_stream_chunk.SourceChunk(id: str, url: str, source_type: str = 'url', title: str | None = None, type: str = 'source', parent_id: str | None = None)
Bases:
object
- Parameters:
- class assistant_stream_ce.state_manager.StateManager(put_chunk_callback, state_data=None)
Bases:
object
Manages state operations with efficient batching and local updates.
- Parameters:
put_chunk_callback (Callable[[UpdateStateChunk], None])
state_data (Any | None)
- property state: Any
Access the state proxy object for making state updates.
If state is None, returns None directly instead of a proxy. Otherwise returns a proxy object for the state.
- add_operations(operations)
Add operations to pending batch and apply locally.
- Parameters:
operations (List[ObjectStreamSetOperation | ObjectStreamAppendTextOperation])
- Return type:
None
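"Add operations to pending batch and apply locally" can be pictured with the following sketch. `SketchStateManager` is hypothetical, the operation dictionaries are an assumed shape, and the explicit `flush` method is an assumption added for the example; the source does not say how or when the library emits a batch.

```python
from typing import Any, Callable, Dict, List, Optional


class SketchStateManager:
    """Hypothetical manager: applies operations locally and batches them for sending."""

    def __init__(
        self,
        put_chunk_callback: Callable[[Dict[str, Any]], None],
        state_data: Optional[Any] = None,
    ) -> None:
        self._put_chunk = put_chunk_callback
        self._state = state_data
        self._pending: List[Dict[str, Any]] = []

    @property
    def state(self) -> Any:
        return self._state

    def add_operations(self, operations: List[Dict[str, Any]]) -> None:
        # Apply each operation to the local copy, then queue it for the next batch.
        for op in operations:
            self._apply(op)
        self._pending.extend(operations)

    def flush(self) -> None:
        # Assumption: all pending operations go out as one update-state chunk.
        if self._pending:
            self._put_chunk({"type": "update-state", "operations": self._pending})
            self._pending = []

    def _apply(self, op: Dict[str, Any]) -> None:
        path = op["path"]
        if not path:
            self._state = op["value"]  # an empty path replaces the root
            return
        node = self._state
        for key in path[:-1]:
            node = node[key]
        if op["type"] == "append-text":
            node[path[-1]] += op["value"]
        else:
            node[path[-1]] = op["value"]


emitted: List[Dict[str, Any]] = []
manager = SketchStateManager(emitted.append, {"messages": "Hello"})
manager.add_operations([
    {"type": "append-text", "path": ["messages"], "value": " world"},
    {"type": "set", "path": ["done"], "value": True},
])
manager.flush()
```

Batching this way means two local updates produce a single chunk on the stream, while reads from `manager.state` see the changes immediately.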
- class assistant_stream_ce.state_proxy.StateProxy(state_manager, path=None)
Bases:
object
Proxy object for state access and updates using dictionary-style access.
Example
state_proxy["user"]["name"] = "John"
name = state_proxy["user"]["name"]
state_proxy["messages"] += "Hello"
state_proxy["items"].append("item")
- Parameters:
state_manager (StateManager)
- extend(iterable)
Extend a list with items from an iterable.
- Parameters:
iterable (Any)
- Return type:
None