modules¶
- class assistant_stream_ce.modules.tool_call.ToolCallController(queue, tool_name, tool_call_id, parent_id=None)[source]¶
Bases:
object- append_args_text(args_text_delta)[source]¶
Append an args text delta to the stream.
- Parameters:
args_text_delta (str)
- Return type:
None
- set_result(result)[source]¶
Set the result of the tool call.
Deprecated: Use set_response() instead.
- Parameters:
result (Any)
- Return type:
None
- async assistant_stream_ce.modules.tool_call.create_tool_call(tool_name, tool_call_id, parent_id=None)[source]¶
- Parameters:
- Return type:
tuple[AsyncGenerator[TextDeltaChunk | ReasoningDeltaChunk | ToolCallBeginChunk | ToolCallDeltaChunk | ToolResultChunk | DataChunk | ErrorChunk | UpdateStateChunk | SourceChunk, None], ToolCallController]
- assistant_stream_ce.modules.langgraph.append_langgraph_event(state, _namespace, type, payload)[source]¶
Append a LangGraph event to the state object.
- assistant_stream_ce.modules.langgraph.get_tool_call_subgraph_state(controller, namespace, subgraph_node, default_state, *, artifact_field_name=None, tool_name=None)[source]¶
Get the state for a tool call subgraph by traversing the namespace and checking for subgraph nodes. Ensures there’s a ToolMessage as the last message and returns its artifact field value.
- Parameters:
controller (RunController) – The run controller managing the state
subgraph_node (str | List[str] | Callable[[List[str]], bool]) – Node name(s) to check against, or a function that checks node names
namespace (Tuple[str, ...]) – Tuple of strings in format ‘node_name:task_id’
artifact_field_name (str | None) – Optional field name to extract from artifact
default_state (Dict[str, Any]) – Default state to use if artifact field is None
- Returns:
The artifact field value from the ToolMessage. If the last message is already a ToolMessage, returns its artifact field. If it’s an AI message with tool calls, creates a ToolMessage and returns the appropriate artifact field value.
- Return type: