assistant_stream_ce package

class assistant_stream_ce.AssistantStreamResponse(stream, stream_encoder)[source]

Bases: StreamingResponse

Parameters:
async assistant_stream_ce.create_run(callback, *, state=None)[source]
Parameters:
Return type:

AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None]

class assistant_stream_ce.RunController(queue, state_data, parent_id=None)[source]

Bases: object

Parameters:

parent_id (str | None)

with_parent_id(parent_id)[source]

Create a new RunController instance with the specified parent_id.

Parameters:

parent_id (str)

Return type:

RunController

append_text(text_delta)[source]

Append a text delta to the stream.

Parameters:

text_delta (str)

Return type:

None

append_reasoning(reasoning_delta)[source]

Append a reasoning delta to the stream.

Parameters:

reasoning_delta (str)

Return type:

None

async add_tool_call(tool_name, tool_call_id=None)[source]

Add a tool call to the stream.

Parameters:
  • tool_name (str)

  • tool_call_id (str)

Return type:

ToolCallController

add_tool_result(tool_call_id, result)[source]

Add a tool result to the stream.

Parameters:
  • tool_call_id (str)

  • result (Any)

Return type:

None

add_stream(stream)[source]

Append a substream to the main stream.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

None

add_data(data)[source]

Emit an event to the main stream.

Parameters:

data (Any)

Return type:

None

add_error(error)[source]

Emit an error to the main stream.

Parameters:

error (str)

Return type:

None

add_source(id, url, title=None)[source]

Add a source to the stream.

Parameters:
Return type:

None

property state

Access the state proxy object for making state updates.

This property provides a proxy object that allows navigating to any path in the state, reading values, and setting values, which will trigger the appropriate state update operation.

If the state is None, this property returns None directly. You can set the root state directly by assigning to this property.

Example

controller.state = {“user”: {“name”: “John”},”messages”: “Hello”} # Sets the entire state controller.state[“user”][“name”] = “Bob” # Sets the value at path [“user”, “name”] name = controller.state[“user”][“name”] # Gets the value at path [“user”, “name”] controller.state[“messages”] += “ world” # Appends text at path [“messages”]

assistant_stream_ce.append_langgraph_event(state, _namespace, type, payload)[source]

Append a LangGraph event to the state object.

Parameters:
  • state (Dict[str, Any]) – The state dictionary to update

  • _namespace (str) – Event namespace (currently unused)

  • type (str) – Event type (‘messages’ or ‘updates’)

  • payload (Any) – Event payload containing the data to append

Return type:

None

assistant_stream_ce.get_tool_call_subgraph_state(controller, namespace, subgraph_node, default_state, *, artifact_field_name=None, tool_name=None)[source]

Get the state for a tool call subgraph by traversing the namespace and checking for subgraph nodes. Ensures there’s a ToolMessage as the last message and returns its artifact field value.

Parameters:
  • controller (RunController) – The run controller managing the state

  • subgraph_node (str | List[str] | Callable[[List[str]], bool]) – Node name(s) to check against, or a function that checks node names

  • namespace (Tuple[str, ...]) – Tuple of strings in format ‘node_name:task_id’

  • artifact_field_name (str | None) – Optional field name to extract from artifact

  • default_state (Dict[str, Any]) – Default state to use if artifact field is None

  • tool_name (str | List[str] | None)

Returns:

The artifact field value from the ToolMessage. If the last message is already a ToolMessage, returns its artifact field. If it’s an AI message with tool calls, creates a ToolMessage and returns the appropriate artifact field value.

Return type:

Dict[str, Any]

class assistant_stream_ce.create_run.RunController(queue, state_data, parent_id=None)[source]

Bases: object

Parameters:

parent_id (str | None)

with_parent_id(parent_id)[source]

Create a new RunController instance with the specified parent_id.

Parameters:

parent_id (str)

Return type:

RunController

append_text(text_delta)[source]

Append a text delta to the stream.

Parameters:

text_delta (str)

Return type:

None

append_reasoning(reasoning_delta)[source]

Append a reasoning delta to the stream.

Parameters:

reasoning_delta (str)

Return type:

None

async add_tool_call(tool_name, tool_call_id=None)[source]

Add a tool call to the stream.

Parameters:
  • tool_name (str)

  • tool_call_id (str)

Return type:

ToolCallController

add_tool_result(tool_call_id, result)[source]

Add a tool result to the stream.

Parameters:
  • tool_call_id (str)

  • result (Any)

Return type:

None

add_stream(stream)[source]

Append a substream to the main stream.

Parameters:

stream (AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None])

Return type:

None

add_data(data)[source]

Emit an event to the main stream.

Parameters:

data (Any)

Return type:

None

add_error(error)[source]

Emit an error to the main stream.

Parameters:

error (str)

Return type:

None

add_source(id, url, title=None)[source]

Add a source to the stream.

Parameters:
Return type:

None

property state

Access the state proxy object for making state updates.

This property provides a proxy object that allows navigating to any path in the state, reading values, and setting values, which will trigger the appropriate state update operation.

If the state is None, this property returns None directly. You can set the root state directly by assigning to this property.

Example

controller.state = {“user”: {“name”: “John”},”messages”: “Hello”} # Sets the entire state controller.state[“user”][“name”] = “Bob” # Sets the value at path [“user”, “name”] name = controller.state[“user”][“name”] # Gets the value at path [“user”, “name”] controller.state[“messages”] += “ world” # Appends text at path [“messages”]

async assistant_stream_ce.create_run.create_run(callback, *, state=None)[source]
Parameters:
Return type:

AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None]

class assistant_stream_ce.assistant_stream_chunk.TextDeltaChunk(text_delta: str, type: str = 'text-delta', parent_id: str | None = None)[source]

Bases: object

Parameters:
  • text_delta (str)

  • type (str)

  • parent_id (str | None)

text_delta: str
type: str = 'text-delta'
parent_id: str | None = None
class assistant_stream_ce.assistant_stream_chunk.ReasoningDeltaChunk(reasoning_delta: str, type: str = 'reasoning-delta', parent_id: str | None = None)[source]

Bases: object

Parameters:
  • reasoning_delta (str)

  • type (str)

  • parent_id (str | None)

reasoning_delta: str
type: str = 'reasoning-delta'
parent_id: str | None = None
class assistant_stream_ce.assistant_stream_chunk.ToolCallBeginChunk(tool_call_id: str, tool_name: str, type: str = 'tool-call-begin', parent_id: str | None = None)[source]

Bases: object

Parameters:
  • tool_call_id (str)

  • tool_name (str)

  • type (str)

  • parent_id (str | None)

tool_call_id: str
tool_name: str
type: str = 'tool-call-begin'
parent_id: str | None = None
class assistant_stream_ce.assistant_stream_chunk.ToolCallDeltaChunk(tool_call_id: str, args_text_delta: str, type: str = 'tool-call-delta')[source]

Bases: object

Parameters:
  • tool_call_id (str)

  • args_text_delta (str)

  • type (str)

tool_call_id: str
args_text_delta: str
type: str = 'tool-call-delta'
class assistant_stream_ce.assistant_stream_chunk.ToolResultChunk(tool_call_id: str, result: Any, artifact: Any | None = None, is_error: bool = False, type: str = 'tool-result')[source]

Bases: object

Parameters:
  • tool_call_id (str)

  • result (Any)

  • artifact (Any | None)

  • is_error (bool)

  • type (str)

tool_call_id: str
result: Any
artifact: Any | None = None
is_error: bool = False
type: str = 'tool-result'
class assistant_stream_ce.assistant_stream_chunk.DataChunk(data: Any, type: str = 'data')[source]

Bases: object

Parameters:
data: Any
type: str = 'data'
class assistant_stream_ce.assistant_stream_chunk.ErrorChunk(error: str, type: str = 'error')[source]

Bases: object

Parameters:
error: str
type: str = 'error'
class assistant_stream_ce.assistant_stream_chunk.ObjectStreamSetOperation[source]

Bases: TypedDict

path: List[str]
value: Any
type: Literal['set']
class assistant_stream_ce.assistant_stream_chunk.ObjectStreamAppendTextOperation[source]

Bases: TypedDict

path: List[str]
value: str
type: Literal['append-text']
class assistant_stream_ce.assistant_stream_chunk.UpdateStateChunk(operations: List[assistant_stream_ce.assistant_stream_chunk.ObjectStreamSetOperation | assistant_stream_ce.assistant_stream_chunk.ObjectStreamAppendTextOperation], type: str = 'update-state')[source]

Bases: object

Parameters:
operations: List[ObjectStreamSetOperation | ObjectStreamAppendTextOperation]
type: str = 'update-state'
class assistant_stream_ce.assistant_stream_chunk.SourceChunk(id: str, url: str, source_type: str = 'url', title: str | None = None, type: str = 'source', parent_id: str | None = None)[source]

Bases: object

Parameters:
  • id (str)

  • url (str)

  • source_type (str)

  • title (str | None)

  • type (str)

  • parent_id (str | None)

id: str
url: str
source_type: str = 'url'
title: str | None = None
type: str = 'source'
parent_id: str | None = None
class assistant_stream_ce.state_manager.StateManager(put_chunk_callback, state_data=None)[source]

Bases: object

Manages state operations with efficient batching and local updates.

Parameters:
property state: Any

Access the state proxy object for making state updates.

If state is None, returns None directly instead of a proxy. Otherwise returns a proxy object for the state.

property state_data: Dict[str, Any]

Current state data.

add_operations(operations)[source]

Add operations to pending batch and apply locally.

Parameters:

operations (List[ObjectStreamSetOperation | ObjectStreamAppendTextOperation])

Return type:

None

flush()[source]

Explicitly flush any pending operations.

This should be called before the run completes to ensure all state updates are sent.

Return type:

None

get_value_at_path(path)[source]

Get value at path, raising KeyError for invalid paths.

Parameters:

path (List[str])

Return type:

Any

class assistant_stream_ce.state_proxy.StateProxy(state_manager, path=None)[source]

Bases: object

Proxy object for state access and updates using dictionary-style access.

Example

state_proxy[“user”][“name”] = “John” name = state_proxy[“user”][“name”] state_proxy[“messages”] += “Hello” state_proxy[“items”].append(“item”)

Parameters:
append(item)[source]

Append an item to a list.

Parameters:

item (Any)

Return type:

None

extend(iterable)[source]

Extend a list with items from an iterable.

Parameters:

iterable (Any)

Return type:

None

clear()[source]

Clear a list or dictionary.

Return type:

None

get(key, default=None)[source]

Get dictionary value with default.

Parameters:
Return type:

Any

keys()[source]

Dictionary keys view.

values()[source]

Dictionary values view.

items()[source]

Dictionary items view.

setdefault(key, default=None)[source]

Set default value if key doesn’t exist.

insert(index, item)[source]

Not supported - would require sending entire list.

Parameters:
Return type:

None

pop(*args)[source]

Not supported - would require sending entire collection.

remove(item)[source]

Not supported - would require sending entire list.

Parameters:

item (Any)

Return type:

None

update(*args, **kwargs)[source]

Not supported - would require sending entire dictionary.

popitem()[source]

Not supported - would require sending entire dictionary.