Skip to content

Core

llm_expose.core.content_parts

Utilities for OpenAI-compatible multimodal message content parts.

build_local_attachment_descriptor(path, *, kind, include_path, attachment_ref=None)

Build a normalized descriptor for a local attachment path.

Source code in llm_expose/core/content_parts.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def build_local_attachment_descriptor(
    path: str | Path,
    *,
    kind: str,
    include_path: bool,
    attachment_ref: str | None = None,
) -> dict[str, Any]:
    """Build a normalized descriptor for a local attachment path.

    Args:
        path: Local filesystem path; ``~`` is expanded and the path resolved.
        kind: Attachment kind label stored verbatim in the descriptor.
        include_path: When True, include the resolved absolute path string.
        attachment_ref: Optional opaque reference stored verbatim.

    Returns:
        Descriptor dict with kind, source_type, media_type, filename,
        size_bytes (None when the file is unreadable/missing), path, and
        attachment_ref keys.
    """
    file_path = Path(path).expanduser().resolve()
    media_type, _ = mimetypes.guess_type(str(file_path))
    # Stat once inside try/except instead of exists()+stat(): avoids the
    # TOCTOU race where the file disappears between the two calls.
    try:
        size_bytes: int | None = file_path.stat().st_size
    except OSError:
        size_bytes = None
    return {
        "kind": kind,
        "source_type": "local_path",
        "media_type": media_type,
        "filename": file_path.name,
        "size_bytes": size_bytes,
        "path": str(file_path) if include_path else None,
        "attachment_ref": attachment_ref,
    }

build_user_content(text, *, image_urls=None, image_detail='auto')

Build user content string or multimodal content list.

Source code in llm_expose/core/content_parts.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def build_user_content(
    text: str | None,
    *,
    image_urls: list[str] | None = None,
    image_detail: str = "auto",
) -> str | list[dict[str, Any]]:
    """Build user content string or multimodal content list."""
    cleaned_text = (text or "").strip()
    kept_urls = [u for u in (image_urls or []) if u and u.strip()]

    # Text-only messages stay plain strings for maximum compatibility.
    if not kept_urls:
        return cleaned_text

    blocks: list[dict[str, Any]] = (
        [{"type": "text", "text": cleaned_text}] if cleaned_text else []
    )
    blocks.extend(
        {"type": "image_url", "image_url": {"url": u, "detail": image_detail}}
        for u in kept_urls
    )
    return blocks

content_has_images(content)

Return True when content contains at least one image block.

Source code in llm_expose/core/content_parts.py
113
114
115
116
117
118
119
120
121
122
123
124
def content_has_images(content: Any) -> bool:
    """Return True when content contains at least one image block."""
    if not isinstance(content, list):
        return False
    # A valid image block is a dict with type == "image_url" whose payload
    # is itself a dict carrying a string "url".
    return any(
        isinstance(part, dict)
        and part.get("type") == "image_url"
        and isinstance(part.get("image_url"), dict)
        and isinstance(part["image_url"].get("url"), str)
        for part in content
    )

extract_image_urls(content)

Extract image URLs from OpenAI-style multimodal content parts.

Source code in llm_expose/core/content_parts.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def extract_image_urls(content: Any) -> list[str]:
    """Extract image URLs from OpenAI-style multimodal content parts."""
    if not isinstance(content, list):
        return []

    found: list[str] = []
    for block in content:
        if isinstance(block, dict) and block.get("type") == "image_url":
            payload = block.get("image_url")
            if isinstance(payload, dict):
                url = payload.get("url")
                # Keep only non-empty string URLs.
                if isinstance(url, str) and url:
                    found.append(url)
    return found

extract_invocation_attachments(content)

Extract normalized attachment descriptors from user content blocks.

Source code in llm_expose/core/content_parts.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def extract_invocation_attachments(content: Any) -> list[dict[str, Any]]:
    """Extract normalized attachment descriptors from user content blocks."""
    if not isinstance(content, list):
        return []

    results: list[dict[str, Any]] = []
    for position, block in enumerate(content):
        if not isinstance(block, dict):
            continue
        if block.get("type") != "image_url":
            continue
        payload = block.get("image_url")
        if not isinstance(payload, dict):
            continue
        url = payload.get("url")
        if not (isinstance(url, str) and url):
            continue

        media_type, size_bytes = _parse_data_url(url)
        is_data = url.startswith("data:")
        entry: dict[str, Any] = {
            "kind": "image",
            "source_type": "data_url" if is_data else "url",
            "media_type": media_type,
            "filename": None,
            "size_bytes": size_bytes,
            "invocation_index": position,
        }
        # Data URLs are stored under "data_url"; remote links under "url".
        entry["data_url" if is_data else "url"] = url
        results.append(entry)

    return results

file_to_data_url(path)

Convert a local file into a data URL for image_url blocks.

Source code in llm_expose/core/content_parts.py
199
200
201
202
203
204
205
206
207
def file_to_data_url(path: str | Path) -> str:
    """Convert a local file into a data URL for image_url blocks."""
    source = Path(path)
    payload = source.read_bytes()
    guessed, _ = mimetypes.guess_type(str(source))
    # Default to JPEG when the extension gives no hint.
    mime = guessed or "image/jpeg"
    encoded_body = base64.b64encode(payload).decode("ascii")
    return f"data:{mime};base64,{encoded_body}"

messages_have_images(messages)

Return True when any message includes image blocks.

Source code in llm_expose/core/content_parts.py
127
128
129
130
131
132
def messages_have_images(messages: list[Message]) -> bool:
    """Return True when any message includes image blocks."""
    # Short-circuits on the first message containing an image part.
    return any(content_has_images(msg.get("content")) for msg in messages)

normalize_mcp_content(content)

Normalize MCP content blocks into OpenAI-compatible text/image_url parts.

Source code in llm_expose/core/content_parts.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def normalize_mcp_content(content: Any) -> list[dict[str, Any]]:
    """Normalize MCP content blocks into OpenAI-compatible text/image_url parts."""
    if not isinstance(content, list):
        return []

    parts: list[dict[str, Any]] = []

    for block in content:
        # Non-dict items are coerced to text; empty strings are dropped.
        if not isinstance(block, dict):
            rendered = str(block)
            if rendered:
                parts.append({"type": "text", "text": rendered})
            continue

        block_type = block.get("type")

        if block_type == "text":
            value = block.get("text")
            if isinstance(value, str) and value:
                parts.append({"type": "text", "text": value})
        elif block_type == "image_url":
            # Already OpenAI-shaped; re-emit with a guaranteed detail field.
            payload = block.get("image_url")
            if isinstance(payload, dict) and isinstance(payload.get("url"), str):
                parts.append(
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": payload["url"],
                            "detail": payload.get("detail", "auto"),
                        },
                    }
                )
        elif block_type == "image":
            src = block.get("source")
            if isinstance(src, dict):
                src_type = src.get("type")
                if src_type == "url":
                    link = src.get("url")
                    if isinstance(link, str) and link:
                        parts.append(
                            {
                                "type": "image_url",
                                "image_url": {"url": link, "detail": "auto"},
                            }
                        )
                elif src_type in {"base64", "data"}:
                    # MCP image sources commonly carry inline base64 payloads.
                    blob = src.get("data")
                    if isinstance(blob, str) and blob:
                        mime = (
                            src.get("media_type")
                            or src.get("mime_type")
                            or "image/png"
                        )
                        parts.append(
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:{mime};base64,{blob}",
                                    "detail": "auto",
                                },
                            }
                        )

    return parts

strip_image_parts(messages)

Return a copy of messages with all image parts removed.

Returns:

Type Description
tuple[list[Message], int]

A tuple of (new_messages, stripped_count).

Source code in llm_expose/core/content_parts.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def strip_image_parts(messages: list[Message]) -> tuple[list[Message], int]:
    """Return a copy of messages with all image parts removed.

    Returns:
        A tuple of (new_messages, stripped_count).
    """
    removed = 0
    result: list[Message] = []

    for original in messages:
        message = dict(original)
        content = message.get("content")
        if isinstance(content, list):
            survivors: list[Any] = []
            for block in content:
                is_image = (
                    isinstance(block, dict)
                    and block.get("type") == "image_url"
                    and isinstance(block.get("image_url"), dict)
                    and isinstance(block["image_url"].get("url"), str)
                )
                if is_image:
                    removed += 1
                else:
                    survivors.append(block)
            # An all-image message collapses to an empty string, not [].
            message["content"] = survivors if survivors else ""
        result.append(message)

    return result, removed

llm_expose.core.builtin_mcp

Builtin in-process MCP tools for llm-expose.

BuiltinMCPClient

Minimal in-process MCP-like client for builtin llm-expose tools.

Source code in llm_expose/core/builtin_mcp.py
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
class BuiltinMCPClient:
    """Minimal in-process MCP-like client for builtin llm-expose tools."""

    def __init__(self, server_name: str) -> None:
        self._server_name = server_name
        builtin_tools = (
            _GetInvocationContextTool(),
            _GetPairingIdsTool(),
            _GetInvocationAttachmentsTool(),
            _SendTextMessageTool(),
            _SendFileMessageTool(),
            _SendImageMessageTool(),
        )
        # Keyed by tool name for O(1) dispatch in call_tool_with_context.
        self._tools: dict[str, _BuiltinTool] = {t.name: t for t in builtin_tools}

    async def __aenter__(self) -> BuiltinMCPClient:
        # No startup work; builtin tools live in-process.
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Nothing to tear down.
        return None

    async def list_tools(self) -> list[dict[str, Any]]:
        """Describe every builtin tool in MCP listing format."""
        specs: list[dict[str, Any]] = []
        for tool in self._tools.values():
            specs.append(
                {
                    "name": tool.name,
                    "description": tool.description,
                    "inputSchema": tool.input_schema,
                }
            )
        return specs

    async def list_prompts(self) -> list[dict[str, Any]]:
        """Builtin servers expose no prompts."""
        return []

    async def get_prompt(self, prompt_name: str) -> dict[str, Any]:
        """Always raises: builtin servers have no prompts to fetch."""
        raise ValueError(
            f"Builtin MCP server '{self._server_name}' has no prompt named '{prompt_name}'."
        )

    async def call_tool(
        self, tool_name: str, arguments: dict[str, Any]
    ) -> dict[str, Any]:
        """Execute a builtin tool without execution context."""
        return await self.call_tool_with_context(
            tool_name,
            arguments,
            execution_context=None,
        )

    async def call_tool_with_context(
        self,
        tool_name: str,
        arguments: dict[str, Any],
        *,
        execution_context: ToolExecutionContext | None,
    ) -> dict[str, Any]:
        """Execute a builtin tool, forwarding optional execution metadata."""
        selected = self._tools.get(tool_name)
        if selected is None:
            raise ValueError(f"Unknown builtin MCP tool '{tool_name}'.")
        return await selected.execute(arguments, execution_context=execution_context)

ToolExecutionContext dataclass

Execution metadata passed to builtin MCP tools.

Source code in llm_expose/core/builtin_mcp.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@dataclass(slots=True)
class ToolExecutionContext:
    """Execution metadata passed to builtin MCP tools."""

    execution_mode: Literal["chat", "one-shot"]
    channel_id: str
    channel_name: str | None = None
    subject_id: str | None = None
    subject_kind: Literal["user", "group", "chat", "unknown"] = "unknown"
    initiator_user_id: str | None = None
    platform: str | None = None
    chat_type: str | None = None
    attachments: list[dict[str, Any]] = field(default_factory=list)
    attachment_paths_by_ref: dict[str, str] = field(default_factory=dict)
    sender: _MessageSenderProtocol | None = None
    invoked_at: str = field(default_factory=lambda: datetime.now(UTC).isoformat())

    def to_public_dict(self) -> dict[str, Any]:
        """Return a JSON-serializable public view of the execution context."""
        user_id = self.subject_id if self.subject_kind == "user" else None
        group_id = self.subject_id if self.subject_kind == "group" else None
        return {
            "execution_mode": self.execution_mode,
            "channel_id": self.channel_id,
            "channel_name": self.channel_name,
            "subject_id": self.subject_id,
            "subject_kind": self.subject_kind,
            "user_id": user_id,
            "group_id": group_id,
            "initiator_user_id": self.initiator_user_id,
            "platform": self.platform,
            "chat_type": self.chat_type,
            "attachments": list(self.attachments),
            "invoked_at": self.invoked_at,
        }

to_public_dict()

Return a JSON-serializable public view of the execution context.

Source code in llm_expose/core/builtin_mcp.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def to_public_dict(self) -> dict[str, Any]:
    """Return a JSON-serializable public view of the execution context."""
    # Convenience aliases derived from subject_kind.
    is_user = self.subject_kind == "user"
    is_group = self.subject_kind == "group"
    public: dict[str, Any] = {
        "execution_mode": self.execution_mode,
        "channel_id": self.channel_id,
        "channel_name": self.channel_name,
        "subject_id": self.subject_id,
        "subject_kind": self.subject_kind,
        "user_id": self.subject_id if is_user else None,
        "group_id": self.subject_id if is_group else None,
        "initiator_user_id": self.initiator_user_id,
        "platform": self.platform,
        "chat_type": self.chat_type,
        # Shallow copy so callers can't mutate our attachment list.
        "attachments": list(self.attachments),
        "invoked_at": self.invoked_at,
    }
    return public

llm_expose.core.tool_aware_completion

Reusable tool-aware LLM completion handler (auto-execute mode).

ToolAwareCompletion

Provider-agnostic tool-aware completion with automatic tool execution.

This handler executes all tool calls automatically without approval. For approval-based workflows, use Orchestrator's approval handlers.

Usage

async with ToolAwareCompletion(...) as handler:
    response = await handler.complete(messages)

Source code in llm_expose/core/tool_aware_completion.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class ToolAwareCompletion:
    """Provider-agnostic tool-aware completion with automatic tool execution.

    This handler executes all tool calls automatically without approval.
    For approval-based workflows, use Orchestrator's approval handlers.

    Usage:
        async with ToolAwareCompletion(...) as handler:
            response = await handler.complete(messages)
    """

    def __init__(
        self,
        provider: BaseProvider,
        mcp_config: MCPConfig | None = None,
        mcp_runtime: MCPRuntimeManager | None = None,
        requested_servers: list[str] | None = None,
        timeout_seconds: int = 30,
    ):
        """Initialize tool-aware completion handler.

        Args:
            provider: LLM provider (LiteLLM, etc.)
            mcp_config: MCP configuration (creates new runtime if provided)
            mcp_runtime: Existing runtime (reuse if provided, e.g., from Orchestrator)
            requested_servers: Filter to these server names (used with mcp_config)
            timeout_seconds: Per-tool-call execution timeout in seconds

        Note: Provide EITHER mcp_config OR mcp_runtime, not both.
        """
        self._provider = provider
        self._timeout_seconds = timeout_seconds
        # We only manage the runtime lifecycle (init/shutdown) when we created it.
        self._owns_runtime = mcp_runtime is None

        if mcp_runtime is not None:
            # Reuse existing runtime (Orchestrator pattern).
            # NOTE: mcp_runtime silently takes precedence if mcp_config was also passed.
            self._mcp_runtime = mcp_runtime
        elif mcp_config is not None:
            # Create new runtime (message command pattern)
            if requested_servers:
                # Filter config to requested servers
                filtered_config = MCPConfig(
                    servers=[
                        s for s in mcp_config.servers if s.name in requested_servers
                    ],
                    settings=mcp_config.settings,
                )
                self._mcp_runtime = MCPRuntimeManager(filtered_config)
            else:
                self._mcp_runtime = MCPRuntimeManager(mcp_config)
        else:
            raise ValueError("Must provide either mcp_config or mcp_runtime")

    async def __aenter__(self) -> ToolAwareCompletion:
        """Context manager entry: initialize runtime if we own it."""
        if self._owns_runtime:
            await self._mcp_runtime.initialize()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit: shutdown runtime if we own it."""
        if self._owns_runtime:
            await self._mcp_runtime.shutdown()

    async def complete(
        self,
        messages: list[Message],
        *,
        execution_context: ToolExecutionContext | None = None,
        max_rounds: int = 8,
    ) -> str:
        """Execute tool-aware completion loop (auto-execute all tools).

        Args:
            messages: Conversation history (OpenAI format)
            execution_context: Optional metadata forwarded to builtin MCP tools
            max_rounds: Maximum tool execution rounds

        Returns:
            Final assistant response text, or a fallback notice when
            max_rounds is exhausted
        """
        # Clone messages to avoid mutation
        history = messages[:]
        tools = self._mcp_runtime.tools

        for _round_num in range(max_rounds):
            # Call LLM with tools
            assistant_message = await self._provider_complete_message(
                history,
                tools=tools,
                tool_choice="auto",
            )
            history.append(assistant_message)

            # Check for tool calls
            tool_calls = assistant_message.get("tool_calls") or []
            if not tool_calls:
                # No more tools—return final response
                content = assistant_message.get("content") or ""
                return str(content)

            # Execute all tool calls
            await self._execute_tool_calls(
                history,
                tool_calls,
                execution_context=execution_context,
            )

        # Max rounds exceeded
        fallback = (
            "Tool execution exceeded maximum rounds; stopping to avoid infinite loop."
        )
        history.append({"role": "assistant", "content": fallback})
        return fallback

    async def _provider_complete_message(
        self,
        history: list[Message],
        *,
        tools: list[ToolSpec],
        tool_choice: str,
    ) -> Message:
        """Call provider with tools, returning Message dict.

        Mirrors Orchestrator._provider_complete_message pattern:
        - Try complete_with_message() first (returns Message with tool_calls)
        - Fallback to complete() if not available
        """
        complete_with_message = getattr(self._provider, "complete_with_message", None)
        if callable(complete_with_message):
            maybe_message = complete_with_message(
                history,
                tools=tools,
                tool_choice=tool_choice,
            )
            # NOTE(review): a non-awaitable return value is discarded and we
            # fall through to the plain complete() path below — confirm intended.
            if inspect.isawaitable(maybe_message):
                message = await maybe_message
                if isinstance(message, dict):
                    return message
                # Pydantic-style objects: serialize to a plain dict.
                if hasattr(message, "model_dump"):
                    dumped = message.model_dump(exclude_none=True)
                    if isinstance(dumped, dict):
                        return dumped
                # Last resort: pull the common attributes off the object.
                return {
                    "role": getattr(message, "role", "assistant"),
                    "content": getattr(message, "content", ""),
                    "tool_calls": getattr(message, "tool_calls", None),
                }

        # Fallback to simple complete()
        content = await self._provider.complete(
            history,
            tools=tools,
            tool_choice=tool_choice,
        )
        return {"role": "assistant", "content": content}

    async def _execute_tool_calls(
        self,
        history: list[Message],
        tool_calls: list[Any],
        *,
        execution_context: ToolExecutionContext | None = None,
    ) -> None:
        """Execute tool calls and append results to history.

        Mirrors Orchestrator._execute_tool_calls pattern. Failures and
        timeouts are reported back to the model as tool output rather than
        raised, so the completion loop can continue.
        """
        for call in tool_calls:
            call_id = self._tool_call_id(call)
            try:
                tool_result = await asyncio.wait_for(
                    self._mcp_runtime.execute_tool_call(
                        call,
                        execution_context=execution_context,
                    ),
                    timeout=self._timeout_seconds,
                )
            except TimeoutError:
                tool_result = f"MCP tool execution timed out after {self._timeout_seconds} seconds."
            except Exception as exc:
                # Broad catch is deliberate: a failing tool must not abort the loop.
                tool_result = f"MCP tool execution failed: {exc}"

            history.append(
                {
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": tool_result,
                }
            )

    @staticmethod
    def _tool_call_id(tool_call: Any) -> str:
        """Extract tool call ID (OpenAI format).

        Mirrors Orchestrator._tool_call_id static method.
        """
        if isinstance(tool_call, dict):
            return str(tool_call.get("id") or "unknown_tool_call")
        return str(getattr(tool_call, "id", "unknown_tool_call"))

__aenter__() async

Context manager entry: initialize runtime if we own it.

Source code in llm_expose/core/tool_aware_completion.py
71
72
73
74
75
async def __aenter__(self) -> ToolAwareCompletion:
    """Context manager entry: initialize runtime if we own it."""
    if self._owns_runtime:
        # Shared runtimes are initialized by their owner, not by this handler.
        await self._mcp_runtime.initialize()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Context manager exit: shutdown runtime if we own it.

Source code in llm_expose/core/tool_aware_completion.py
77
78
79
80
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
    """Context manager exit: shutdown runtime if we own it."""
    if self._owns_runtime:
        # Shared runtimes are shut down by their owner, not by this handler.
        await self._mcp_runtime.shutdown()

__init__(provider, mcp_config=None, mcp_runtime=None, requested_servers=None, timeout_seconds=30)

Initialize tool-aware completion handler.

Parameters:

Name Type Description Default
provider BaseProvider

LLM provider (LiteLLM, etc.)

required
mcp_config MCPConfig | None

MCP configuration (creates new runtime if provided)

None
mcp_runtime MCPRuntimeManager | None

Existing runtime (reuse if provided, e.g., from Orchestrator)

None
requested_servers list[str] | None

Filter to these server names (used with mcp_config)

None
timeout_seconds int

Tool execution timeout

30

Note: Provide EITHER mcp_config OR mcp_runtime, not both.

Source code in llm_expose/core/tool_aware_completion.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def __init__(
    self,
    provider: BaseProvider,
    mcp_config: MCPConfig | None = None,
    mcp_runtime: MCPRuntimeManager | None = None,
    requested_servers: list[str] | None = None,
    timeout_seconds: int = 30,
):
    """Initialize tool-aware completion handler.

    Args:
        provider: LLM provider (LiteLLM, etc.)
        mcp_config: MCP configuration (creates new runtime if provided)
        mcp_runtime: Existing runtime (reuse if provided, e.g., from Orchestrator)
        requested_servers: Filter to these server names (used with mcp_config)
        timeout_seconds: Per-tool-call execution timeout in seconds

    Note: Provide EITHER mcp_config OR mcp_runtime, not both.
    """
    self._provider = provider
    self._timeout_seconds = timeout_seconds
    # We only manage the runtime lifecycle (init/shutdown) when we created it.
    self._owns_runtime = mcp_runtime is None

    if mcp_runtime is not None:
        # Reuse existing runtime (Orchestrator pattern).
        # NOTE: mcp_runtime silently takes precedence if mcp_config was also passed.
        self._mcp_runtime = mcp_runtime
    elif mcp_config is not None:
        # Create new runtime (message command pattern)
        if requested_servers:
            # Filter config to requested servers
            filtered_config = MCPConfig(
                servers=[
                    s for s in mcp_config.servers if s.name in requested_servers
                ],
                settings=mcp_config.settings,
            )
            self._mcp_runtime = MCPRuntimeManager(filtered_config)
        else:
            self._mcp_runtime = MCPRuntimeManager(mcp_config)
    else:
        raise ValueError("Must provide either mcp_config or mcp_runtime")

complete(messages, *, execution_context=None, max_rounds=8) async

Execute tool-aware completion loop (auto-execute all tools).

Parameters:

Name Type Description Default
messages list[Message]

Conversation history (OpenAI format)

required
execution_context ToolExecutionContext | None

Execution metadata forwarded to builtin MCP tools

None
max_rounds int

Maximum tool execution rounds

8

Returns:

Type Description
str

Final assistant response text

Source code in llm_expose/core/tool_aware_completion.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
async def complete(
    self,
    messages: list[Message],
    *,
    execution_context: ToolExecutionContext | None = None,
    max_rounds: int = 8,
) -> str:
    """Execute tool-aware completion loop (auto-execute all tools).

    Args:
        messages: Conversation history (OpenAI format)
        execution_context: Optional metadata forwarded to builtin MCP tools
        max_rounds: Maximum tool execution rounds

    Returns:
        Final assistant response text, or a fallback notice when
        max_rounds is exhausted
    """
    # Clone messages to avoid mutation
    history = messages[:]
    tools = self._mcp_runtime.tools

    for _round_num in range(max_rounds):
        # Call LLM with tools
        assistant_message = await self._provider_complete_message(
            history,
            tools=tools,
            tool_choice="auto",
        )
        history.append(assistant_message)

        # Check for tool calls
        tool_calls = assistant_message.get("tool_calls") or []
        if not tool_calls:
            # No more tools—return final response
            content = assistant_message.get("content") or ""
            return str(content)

        # Execute all tool calls
        await self._execute_tool_calls(
            history,
            tool_calls,
            execution_context=execution_context,
        )

    # Max rounds exceeded
    fallback = (
        "Tool execution exceeded maximum rounds; stopping to avoid infinite loop."
    )
    history.append({"role": "assistant", "content": fallback})
    return fallback