
The Model Context Protocol (MCP) gives AI agents a standardized interface to call tools — file systems, databases, web browsers. Email is the obvious next integration, and also the most underbuilt. Most teams duct-tape SMTP calls into their agent's Python code and call it done. That works until you need inbound email, threading context, attachment parsing, or per-agent routing — and then it falls apart fast.
This post walks through building a proper MCP email server: one that exposes email as first-class tools an agent can call, handles both sending and receiving, and gives your agent enough context to reason about conversations, not just individual messages.
What MCP actually is (and isn't)
MCP is a JSON-RPC 2.0 protocol. An MCP server exposes three primitives: tools (functions the LLM can call), resources (data the LLM can read), and prompts (templated instructions). Your email server will primarily expose tools — send_email, get_thread, search_inbox, mark_read — and resources for mailbox state.
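To make the wire format concrete, here is a rough sketch of a tool call expressed as a JSON-RPC 2.0 request. The `tools/call` method and the `name`/`arguments` params follow the MCP specification; the helper name `build_tool_call` and the sample arguments are illustrative only, and in practice the SDK builds these messages for you.

```python
import json
from itertools import count

# JSON-RPC requests carry an id so the response can be matched to the request
_request_ids = count(1)


def build_tool_call(name: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 request asking an MCP server to run a tool."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


request = build_tool_call(
    "send_email",
    {"to": ["user@example.com"], "subject": "Hello", "body": "Hi there"},
)
print(json.dumps(request, indent=2))
```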
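MCP is transport-agnostic. The specification defines stdio (for local agents) and an HTTP-based transport (for remote agents), and custom transports such as WebSockets are also possible. For email infrastructure that agents call over the network, this post uses HTTP with Server-Sent Events (HTTP+SSE). Newer revisions of the specification replace HTTP+SSE with Streamable HTTP, so check which transport your SDK version and clients support.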
What MCP is not is a magic deliverability layer or an IMAP abstraction. You still need real email infrastructure underneath — SMTP for sending, an inbound parsing webhook or IMAP polling for receiving. MCP is the interface your agent uses to talk to that infrastructure.
Architecture overview
The full stack looks like this:
AI Agent (Claude, GPT-4, etc.)
        │
        │  MCP tool calls (JSON-RPC over HTTP+SSE)
        ▼
MCP Email Server  ←──── your business logic
        │
        ├── Outbound: SMTP / ESP API (send_email)
        ├── Inbound: Webhook receiver or IMAP poll
        ├── Storage: Thread index, message store
        └── Auth: Per-agent API keys / mailbox mapping
The MCP server is the layer you're building. It translates tool calls from the agent into real email operations, then returns structured results the agent can reason about.
Defining your tool schema
Start with the tools. Well-designed tool schemas matter more than implementation — they determine whether the LLM calls your tools correctly.
{
  "name": "send_email",
  "description": "Send an email from the agent's assigned mailbox. Use this when you need to compose and deliver a new message or reply to an existing thread.",
  "inputSchema": {
    "type": "object",
    "properties": {
      "to": {
        "type": "array",
        "items": { "type": "string", "format": "email" },
        "description": "Recipient email addresses"
      },
      "subject": { "type": "string" },
      "body": { "type": "string", "description": "Plain text body" },
      "html_body": { "type": "string", "description": "Optional HTML body" },
      "in_reply_to": {
        "type": "string",
        "description": "Message-ID of the email being replied to. Include this to maintain thread continuity."
      },
      "thread_id": {
        "type": "string",
        "description": "Internal thread identifier. Used to load thread context automatically."
      }
    },
    "required": ["to", "subject", "body"]
  }
}
Notice in_reply_to — this maps directly to the In-Reply-To and References headers in RFC 5322. When the agent passes a message ID here, your server sets those headers, and email clients will correctly thread the message. That's the difference between an agent that can participate in a conversation and one that just fires off isolated messages.
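As a minimal sketch of the header handling described here, the standard library's `email` package can build a reply with both headers set. The function name `build_reply` and its parameters are assumptions for illustration; `references` stands for whatever References value you stored for the parent message.

```python
from email.message import EmailMessage
from email.utils import make_msgid


def build_reply(sender, to, subject, body, in_reply_to=None, references=""):
    """Build a message; when in_reply_to is given, set the threading headers."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(to)
    msg["Subject"] = subject
    # Generate our own Message-ID so later replies can reference this message
    msg["Message-ID"] = make_msgid(domain=sender.split("@")[-1])
    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
        # References = the parent's References chain plus the parent's own ID
        msg["References"] = f"{references} {in_reply_to}".strip()
    msg.set_content(body)
    return msg


reply = build_reply(
    sender="agent@yourdomain.com",
    to=["user@example.com"],
    subject="Re: API integration question",
    body="Thanks for reaching out.",
    in_reply_to="<second@mail.example.com>",
    references="<first@mail.example.com>",
)
print(reply["References"])
```

Keeping the full References chain, not just the direct parent, lets clients reconstruct the thread even when an intermediate message is missing from their mailbox.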
Define similar schemas for:
- get_thread: returns all messages in a thread, ordered chronologically
- search_inbox: searches by sender, subject, date range, or full text
- get_message: fetches a single message with full headers and parsed body
- list_threads: paginated thread listing with unread counts
- mark_read / mark_unread: state management
- get_attachments: returns attachment metadata and optionally content
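For illustration, two of these schemas might look like the following when written as Python dicts that can be passed as `inputSchema`. The names `GET_THREAD_SCHEMA` and `SEARCH_SCHEMA` match the constants used in the server code in this post, but every field inside them (`max_messages`, `query`, `sender`, `after`, `before`, `limit`) is an assumption, not a fixed contract.

```python
# Hypothetical input schema for get_thread
GET_THREAD_SCHEMA = {
    "type": "object",
    "properties": {
        "thread_id": {
            "type": "string",
            "description": "Internal thread identifier returned by list_threads.",
        },
        "max_messages": {
            "type": "integer",
            "minimum": 1,
            "description": "Optional cap on how many recent messages to return.",
        },
    },
    "required": ["thread_id"],
}

# Hypothetical input schema for search_inbox; every filter is optional
SEARCH_SCHEMA = {
    "type": "object",
    "properties": {
        "query": {"type": "string", "description": "Full-text search terms."},
        "sender": {"type": "string", "format": "email"},
        "after": {"type": "string", "format": "date-time"},
        "before": {"type": "string", "format": "date-time"},
        "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
    },
    "required": [],
}
```

Short, specific descriptions on each property tend to matter as much as the types, since the description is what the model reads when deciding how to fill in an argument.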
Server implementation
Here's a minimal MCP email server in Python using the mcp SDK:
import asyncio
import json

import httpx
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent
from starlette.applications import Starlette
from starlette.routing import Route

app = Server("email-server")


@app.list_tools()
async def list_tools():
    return [
        Tool(
            name="send_email",
            description="Send an email from the agent's mailbox",
            inputSchema=SEND_EMAIL_SCHEMA  # defined separately
        ),
        Tool(
            name="get_thread",
            description="Retrieve all messages in a thread by thread_id",
            inputSchema=GET_THREAD_SCHEMA
        ),
        Tool(
            name="search_inbox",
            description="Search inbox by query, sender, or date range",
            inputSchema=SEARCH_SCHEMA
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: dict):
    # Dispatch on tool name. Once authentication is wired in, each handler
    # also needs the calling agent's context (see the isolation section).
    if name == "send_email":
        return await handle_send_email(arguments)
    elif name == "get_thread":
        return await handle_get_thread(arguments)
    elif name == "search_inbox":
        return await handle_search(arguments)
    else:
        raise ValueError(f"Unknown tool: {name}")


async def handle_send_email(args: dict):
    # Build the SMTP message
    headers = {
        "To": ", ".join(args["to"]),
        "Subject": args["subject"],
    }
    if "in_reply_to" in args:
        headers["In-Reply-To"] = args["in_reply_to"]
        # Fetch existing References header from stored thread
        # (get_thread_by_message_id is a storage helper you implement)
        thread = await get_thread_by_message_id(args["in_reply_to"])
        if thread:
            existing_refs = thread.get("references", "")
            headers["References"] = f"{existing_refs} {args['in_reply_to']}".strip()
    # Send via your ESP or SMTP relay (send_via_smtp is yours to implement)
    message_id = await send_via_smtp(args, headers)
    return [TextContent(
        type="text",
        text=json.dumps({
            "status": "sent",
            "message_id": message_id,
            "thread_id": args.get("thread_id")
        })
    )]
The SSE transport wires this up over HTTP:
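from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import Response
from starlette.routing import Mount, Route

transport = SseServerTransport("/messages/")


async def handle_sse(request: Request):
    # Authenticate the agent by API key before opening the stream.
    # agent_context is not used below; pass it to your tool handlers
    # however your server shares per-connection state.
    api_key = request.headers.get("x-api-key")
    agent_context = await authenticate_agent(api_key)
    async with transport.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await app.run(
            streams[0], streams[1],
            app.create_initialization_options()
        )
    # Starlette expects the endpoint to return a response object
    return Response()


starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        # handle_post_message is an ASGI app, so it is mounted rather than
        # registered as a request/response Route
        Mount("/messages/", app=transport.handle_post_message)
    ]
)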
Handling inbound email
This is where most implementations cut corners. Sending is easy — receiving is the hard part.
For inbound, you have two options:
Webhook-based (preferred): Your email infrastructure parses inbound messages and POSTs structured JSON to your MCP server. You store the message, update the thread index, and optionally trigger the agent immediately.
IMAP polling: Your server connects to an IMAP server on a schedule, fetches new messages, and processes them. Works with any mailbox but adds latency and complexity.
A webhook handler looks like this:
from starlette.requests import Request
from starlette.responses import JSONResponse


async def handle_inbound_webhook(request: Request):
    payload = await request.json()
    # Payload from your email provider includes:
    # - from, to, subject, text, html
    # - headers (Message-ID, In-Reply-To, References)
    # - attachments with content_type and data
    message_id = payload["headers"].get("message-id")
    in_reply_to = payload["headers"].get("in-reply-to")
    # Resolve or create thread
    thread_id = await resolve_thread(
        message_id=message_id,
        in_reply_to=in_reply_to,
        references=payload["headers"].get("references", "")
    )
    # Store message
    await store_message({
        "thread_id": thread_id,
        "message_id": message_id,
        "from": payload["from"],
        "to": payload["to"],
        "subject": payload["subject"],
        "text": payload["text"],
        "html": payload.get("html"),
        "received_at": payload["timestamp"],
        "headers": payload["headers"]
    })
    # Optionally: wake up the agent that owns this mailbox
    await notify_agent(thread_id, message_id)
    return JSONResponse({"status": "ok"})
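The resolve_thread function is critical. RFC 5322 threading works via the Message-ID, In-Reply-To, and References headers. Check whether the incoming message's In-Reply-To matches any stored Message-ID, falling back to the IDs listed in References (a client may omit In-Reply-To, or the direct parent may never have reached your server). If you find a match, add the message to that thread; if not, create a new one. Most mail clients group conversations using these same headers.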
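A minimal sketch of that lookup, assuming an in-memory dict in place of a real thread index. The signature mirrors the `resolve_thread` call in the webhook handler; the `_thread_by_message_id` store and the `thread_` ID format are hypothetical.

```python
import asyncio
import uuid
from typing import Dict, Optional

# In-memory stand-in for the thread index: Message-ID -> thread_id
_thread_by_message_id: Dict[str, str] = {}


async def resolve_thread(
    message_id: str, in_reply_to: Optional[str], references: str = ""
) -> str:
    """Return the thread an inbound message belongs to, creating one if needed."""
    # Try the direct parent first, then the References chain from newest to oldest
    candidates = []
    if in_reply_to:
        candidates.append(in_reply_to.strip())
    candidates.extend(reversed(references.split()))

    thread_id = None
    for parent_id in candidates:
        thread_id = _thread_by_message_id.get(parent_id)
        if thread_id:
            break

    if thread_id is None:
        # No known ancestor, so this message starts a new thread
        thread_id = f"thread_{uuid.uuid4().hex[:12]}"

    # Index this message so future replies can find the thread
    _thread_by_message_id[message_id] = thread_id
    return thread_id


async def demo():
    first = await resolve_thread("<a@mail.example.com>", None)
    reply = await resolve_thread(
        "<b@mail.example.com>", "<a@mail.example.com>", "<a@mail.example.com>"
    )
    return first, reply


first, reply = asyncio.run(demo())
print(first == reply)
```

A production version would also need to handle a message whose own Message-ID is missing or duplicated, which this sketch ignores.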
Mails.ai's inbound email parsing handles this webhook delivery with full header preservation, attachment extraction, and MIME parsing — which saves you from writing a MIME parser from scratch.
Thread context for the agent
When your agent calls get_thread, return enough context for it to reason about the conversation:
{
  "thread_id": "thread_abc123",
  "subject": "Re: API integration question",
  "participants": ["user@example.com", "agent@yourdomain.com"],
  "message_count": 4,
  "unread_count": 1,
  "messages": [
    {
      "message_id": "<unique-id@mail.example.com>",
      "from": "user@example.com",
      "to": ["agent@yourdomain.com"],
      "sent_at": "2025-01-15T10:23:00Z",
      "body": "Hi, I have a question about...",
      "is_from_agent": false
    },
    {
      "message_id": "<reply-id@yourdomain.com>",
      "from": "agent@yourdomain.com",
      "in_reply_to": "<unique-id@mail.example.com>",
      "sent_at": "2025-01-15T10:31:00Z",
      "body": "Thanks for reaching out...",
      "is_from_agent": true
    }
  ]
}
Include is_from_agent so the agent can distinguish its own prior messages from the user's. Without it, the agent has to infer authorship from addresses alone, and is more likely to misattribute earlier messages or repeat itself.
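One way to derive the flag when formatting a thread, sketched under the assumption that each agent owns exactly one mailbox address. The helper name `is_from_agent` and its parameters are illustrative; `parseaddr` is from the standard library and copes with display names in the From header.

```python
from email.utils import parseaddr


def is_from_agent(from_header: str, agent_mailbox: str) -> bool:
    """True when the From header's address matches the agent's mailbox."""
    # parseaddr splits "Display Name <addr@host>" into (name, address)
    _, address = parseaddr(from_header)
    # Compare case-insensitively; most providers treat addresses that way
    return address.lower() == agent_mailbox.lower()


print(is_from_agent("Support Agent <Agent@YourDomain.com>", "agent@yourdomain.com"))
print(is_from_agent("user@example.com", "agent@yourdomain.com"))
```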
Authentication and mailbox isolation
Running multiple agents — each with their own mailbox — requires strict isolation. An agent authenticated with key A must never be able to call get_thread on a thread belonging to agent B.
The cleanest approach: bind each API key to a single mailbox at creation time, resolve that binding when the agent connects, and enforce it at every tool call:
async def authenticate_agent(api_key: str) -> AgentContext:
    # Look up key in your store
    key_record = await db.get(f"apikey:{api_key}")
    if not key_record:
        raise AuthError("Invalid API key")
    return AgentContext(
        agent_id=key_record["agent_id"],
        mailbox=key_record["mailbox_address"],  # e.g. agent-42@yourdomain.com
        permissions=key_record["permissions"]
    )


async def handle_get_thread(args: dict, ctx: AgentContext):
    thread = await db.get_thread(args["thread_id"])
    # Enforce ownership. A missing thread gets the same error, so the response
    # does not reveal whether a thread exists in another mailbox.
    if thread is None or thread["mailbox"] != ctx.mailbox:
        raise PermissionError("Thread does not belong to this agent's mailbox")
    return format_thread_response(thread)
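One possible way to get the authenticated context to the tool handlers is a `contextvars.ContextVar` set once per connection. The sketch below demonstrates the isolation with plain asyncio tasks; the simplified `AgentContext`, `current_agent`, `require_owner`, and `serve_connection` are all hypothetical names, and whether your MCP SDK version runs tool handlers in a task that inherits the connection's context is an assumption you should verify before relying on it.

```python
import asyncio
from contextvars import ContextVar
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentContext:
    """Simplified stand-in for the AgentContext used in this post."""
    agent_id: str
    mailbox: str


# Each asyncio task gets its own copy of the context, so concurrent
# connections cannot see each other's agent.
current_agent = ContextVar("current_agent")


def require_owner(thread: dict) -> None:
    ctx = current_agent.get()  # raises LookupError if nobody authenticated
    if thread["mailbox"] != ctx.mailbox:
        raise PermissionError("Thread does not belong to this agent's mailbox")


async def serve_connection(ctx: AgentContext, thread: dict) -> str:
    current_agent.set(ctx)
    await asyncio.sleep(0)  # yield so the two connections interleave
    try:
        require_owner(thread)
        return "allowed"
    except PermissionError:
        return "denied"


async def demo():
    thread = {"thread_id": "thread_abc123", "mailbox": "agent-42@yourdomain.com"}
    return await asyncio.gather(
        serve_connection(AgentContext("42", "agent-42@yourdomain.com"), thread),
        serve_connection(AgentContext("7", "agent-7@yourdomain.com"), thread),
    )


print(asyncio.run(demo()))
```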
Deliverability for automated senders
Agents send programmatically — sometimes at high volume, often from fresh domains. That creates deliverability risk human senders don't face.
You need proper DNS authentication on every outbound message:
- SPF: TXT record on your sending domain listing authorized mail servers, for example v=spf1 include:your-esp.com ~all
- DKIM: 2048-bit RSA keypair. Your MCP server signs outbound messages, or your ESP signs on your behalf. The public key goes in DNS as a TXT record at selector._domainkey.yourdomain.com.
- DMARC: Enforces SPF and DKIM alignment. Start with p=none for monitoring, move to p=quarantine once you've verified alignment. TXT record at _dmarc.yourdomain.com.
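To show how the three records fit together, here is a small sketch that assembles the TXT record names and values for a domain. The function `build_auth_records` and its parameters are hypothetical, the ESP include host, selector, and key are placeholders, and your DNS provider or ESP may expect slightly different values.

```python
def build_auth_records(domain, esp_include, dkim_selector, dkim_public_key,
                       dmarc_policy="none", report_address=None):
    """Return a {record_name: TXT value} map for SPF, DKIM, and DMARC."""
    if dmarc_policy not in {"none", "quarantine", "reject"}:
        raise ValueError(f"Unsupported DMARC policy: {dmarc_policy}")

    dmarc = f"v=DMARC1; p={dmarc_policy}"
    if report_address:
        # rua is where receivers send aggregate reports while you monitor
        dmarc += f"; rua=mailto:{report_address}"

    return {
        domain: f"v=spf1 include:{esp_include} ~all",
        f"{dkim_selector}._domainkey.{domain}": f"v=DKIM1; k=rsa; p={dkim_public_key}",
        f"_dmarc.{domain}": dmarc,
    }


records = build_auth_records(
    domain="yourdomain.com",
    esp_include="your-esp.com",
    dkim_selector="selector",
    dkim_public_key="BASE64_PUBLIC_KEY_PLACEHOLDER",
    report_address="dmarc-reports@yourdomain.com",
)
for name, value in records.items():
    print(f"{name} TXT {value}")
```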
For agents sending at any meaningful volume, a dedicated IP address keeps your agent's sending reputation separate from other senders. Shared IPs mean your deliverability is partly determined by what everyone else on that IP does.
Also configure your tool descriptions to discourage abuse patterns — agents sending the same message to 500 recipients should be running a bulk email flow, not calling send_email 500 times.
Email classification as a tool
Once inbound is working, you can expose email classification as a tool too. Instead of making the agent read every message and decide what it is, pre-classify on inbound and expose the result:
{
  "name": "classify_message",
  "description": "Returns the classification of an inbound message — support request, sales inquiry, spam, auto-reply, etc.",
  "inputSchema": {
    "type": "object",
    "properties": {
      "message_id": { "type": "string" }
    },
    "required": ["message_id"]
  }
}
This costs one LLM call per inbound message at the infrastructure layer but saves multiple tool call round-trips during agent execution. For high-volume mailboxes, it's worth it.
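A sketch of the pre-classification flow, assuming cheap header checks run before any LLM call. `Auto-Submitted` (RFC 3834) and `Precedence` are real headers; the label names, the in-memory `_classifications` store, and both function names are assumptions. Messages labelled `needs-review` are the ones you would hand to the LLM classifier.

```python
# In-memory stand-in for wherever classifications are stored
_classifications = {}


def classify_inbound(message: dict) -> str:
    """Label a message at ingest time and remember the result."""
    headers = {k.lower(): v for k, v in message.get("headers", {}).items()}
    # Auto-Submitted may carry parameters after a semicolon; keep the keyword
    auto_submitted = headers.get("auto-submitted", "no").split(";")[0].strip().lower()
    precedence = headers.get("precedence", "").strip().lower()

    if auto_submitted != "no":
        label = "auto-reply"
    elif precedence in {"bulk", "list", "junk"}:
        label = "bulk"
    else:
        label = "needs-review"  # candidate for the LLM classification call

    _classifications[message["message_id"]] = label
    return label


def handle_classify_message(args: dict) -> dict:
    """Tool handler: return the stored label without re-reading the message."""
    message_id = args["message_id"]
    return {
        "message_id": message_id,
        "classification": _classifications.get(message_id, "unclassified"),
    }


classify_inbound({
    "message_id": "<ooo@mail.example.com>",
    "headers": {"Auto-Submitted": "auto-replied"},
})
print(handle_classify_message({"message_id": "<ooo@mail.example.com>"}))
```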
Testing your MCP email server
Test the MCP layer independently from the email infrastructure. The mcp SDK includes a client you can use in tests:
import asyncio
import json

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def test_send_email():
    # Launch the server as a subprocess and talk to it over stdio
    async with stdio_client(StdioServerParameters(
        command="python",
        args=["your_server.py"]
    )) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "send_email",
                arguments={
                    "to": ["test@example.com"],
                    "subject": "Test message",
                    "body": "Hello from the test suite"
                }
            )
            assert result.content[0].text  # non-empty response
            response = json.loads(result.content[0].text)
            assert response["status"] == "sent"
            assert "message_id" in response


asyncio.run(test_send_email())
Use a test ESP (Mailtrap or similar) for SMTP calls during testing so you don't send real email on every test run.
Frequently Asked Questions
What's the difference between building an MCP email server and just calling an email API from agent code?
A direct API call is fine for one-off sends. MCP becomes valuable when you want: (1) the LLM to decide when to send based on reasoning, (2) multiple agents sharing standardized email tooling, (3) tool results the LLM can inspect and act on iteratively (e.g., check if a reply came in, then decide next step), and (4) consistent schema so you can swap underlying email providers without changing agent prompts.
How do I handle MIME parsing for inbound emails with attachments?
Don't parse raw MIME yourself unless you enjoy debugging quoted-printable encoding edge cases. Use an inbound email provider that delivers pre-parsed JSON — attachments as base64-encoded fields with content type and filename metadata. Your webhook handler then stores attachment metadata separately and exposes a get_attachments tool that returns them on demand, rather than including binary data in every get_thread response.
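As a rough sketch of that split, the function below separates attachment metadata from content in a pre-parsed webhook payload. It assumes each attachment arrives as a dict with `filename`, `content_type`, and base64 `data` fields; the function name, the content-hash ID scheme, and the returned shapes are illustrative.

```python
import base64
import hashlib


def split_attachments(payload: dict):
    """Return (metadata_list, blobs) so thread responses can omit binary data."""
    metadata, blobs = [], {}
    for index, att in enumerate(payload.get("attachments", [])):
        content = base64.b64decode(att["data"])
        # Content-derived ID: identical files share one stored blob
        attachment_id = hashlib.sha256(content).hexdigest()[:16]
        blobs[attachment_id] = content
        metadata.append({
            "attachment_id": attachment_id,
            "filename": att.get("filename", f"attachment-{index}"),
            "content_type": att.get("content_type", "application/octet-stream"),
            "size_bytes": len(content),
        })
    return metadata, blobs


sample = {
    "attachments": [{
        "filename": "notes.txt",
        "content_type": "text/plain",
        "data": base64.b64encode(b"hello").decode("ascii"),
    }]
}
metadata, blobs = split_attachments(sample)
print(metadata)
```

The metadata list is what a get_thread response would carry; the blobs go to object storage and come back only when the agent calls get_attachments.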
Should each AI agent get its own email address or can they share one?
Give each agent its own address. Shared mailboxes create race conditions (two agents trying to process the same inbound message), make thread ownership ambiguous, and complicate deliverability — especially if agents have different sending patterns or audiences. Subdomain addressing like agent-name@agents.yourdomain.com scales well and keeps SPF/DKIM configuration simple.
How do I prevent my agent from accidentally sending duplicate emails?
Implement idempotency keys. Before calling your SMTP relay, check a Redis set or database for a hash of (thread_id, subject, body_hash, recipient_list). If the hash exists and the send was within the last 60 seconds, return the existing message ID without sending again. LLMs can retry tool calls on timeout, and you don't want that to mean duplicate messages to real users.
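A minimal in-memory sketch of that check, with a dict standing in for Redis or a database. The function names, the injected `send` callable, and the injectable clock are assumptions made so the example can run on its own; entries are never evicted here, which a real store would handle with a TTL.

```python
import hashlib
import json
import time

DEDUP_WINDOW_SECONDS = 60
_recent_sends = {}  # idempotency key -> (message_id, sent_at)


def idempotency_key(args: dict) -> str:
    """Hash of (thread_id, subject, body_hash, recipient_list)."""
    body_hash = hashlib.sha256(args["body"].encode("utf-8")).hexdigest()
    material = json.dumps(
        [args.get("thread_id"), args["subject"], body_hash, sorted(args["to"])]
    )
    return hashlib.sha256(material.encode("utf-8")).hexdigest()


def send_once(args: dict, send, now=time.monotonic):
    """Send unless an identical message went out within the window.

    Returns (message_id, was_sent).
    """
    key = idempotency_key(args)
    current = now()
    previous = _recent_sends.get(key)
    if previous and current - previous[1] < DEDUP_WINDOW_SECONDS:
        return previous[0], False  # duplicate: reuse the earlier message ID
    message_id = send(args)
    _recent_sends[key] = (message_id, current)
    return message_id, True


sent = []


def fake_send(args):
    # Stand-in for the real SMTP relay call
    sent.append(args)
    return f"<msg-{len(sent)}@yourdomain.com>"


email = {"to": ["user@example.com"], "subject": "Hi", "body": "Hello", "thread_id": "t1"}
print(send_once(email, fake_send))
print(send_once(email, fake_send))  # retried call: no second send
```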
What's the right way to expose email threading context to the LLM without blowing up the context window?
For short threads (under 10 messages), return the full get_thread response. For longer threads, return a summary plus the last 3-5 messages in full. Include total message count so the agent knows it's working with a truncated view. Expose a separate get_message tool for fetching specific older messages when needed. Don't include raw HTML in thread responses — strip to plain text at the storage layer.
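The truncation rule can be sketched as below. It assumes the thread dict has the shape of the get_thread response shown in this post and that any `summary` field is produced elsewhere (for example by a background job); `truncate_thread`, `truncated`, and `omitted_message_ids` are hypothetical names.

```python
def truncate_thread(thread: dict, full_limit: int = 10, tail_size: int = 5) -> dict:
    """Return short threads whole; for long ones keep only the newest messages."""
    messages = thread["messages"]
    total = len(messages)
    if total < full_limit:
        return {**thread, "message_count": total, "truncated": False}

    older, recent = messages[:-tail_size], messages[-tail_size:]
    return {
        **thread,
        "messages": recent,
        "message_count": total,  # lets the agent see the view is partial
        "truncated": True,
        # IDs the agent can pass to get_message if it needs older context
        "omitted_message_ids": [m["message_id"] for m in older],
    }


long_thread = {
    "thread_id": "thread_abc123",
    "summary": "Customer asked about API limits; agent sent docs.",
    "messages": [{"message_id": f"<m{i}@mail.example.com>", "body": f"msg {i}"}
                 for i in range(12)],
}
view = truncate_thread(long_thread)
print(view["message_count"], len(view["messages"]), view["truncated"])
```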
Can I use this architecture for an agent that monitors a shared support inbox?
Yes, with an added routing layer. Inbound messages arrive at a shared address, your webhook handler classifies them (by topic, urgency, customer tier), and assigns each thread to a specific agent instance or queue. The email classification and routing step happens before the thread is written to any agent's view. Each agent then only sees threads assigned to it, using the same list_threads and get_thread tools described above.
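One simple shape for that routing layer, sketched with round-robin assignment inside per-classification agent pools. The `ThreadRouter` class, its pools, and the mailbox addresses are all hypothetical, and the assignments live in memory where a real system would persist them.

```python
from itertools import cycle


class ThreadRouter:
    """Assign each thread to one agent mailbox, round-robin within a pool."""

    def __init__(self, pools: dict, default_pool: list):
        self._cycles = {label: cycle(agents) for label, agents in pools.items()}
        self._default = cycle(default_pool)
        self._assignments = {}  # thread_id -> agent mailbox

    def assign(self, thread_id: str, classification: str) -> str:
        # A thread keeps its first assignment so replies reach the same agent
        if thread_id not in self._assignments:
            pool = self._cycles.get(classification, self._default)
            self._assignments[thread_id] = next(pool)
        return self._assignments[thread_id]

    def threads_for(self, agent_mailbox: str) -> list:
        """What list_threads would expose to a single agent."""
        return [t for t, a in self._assignments.items() if a == agent_mailbox]


router = ThreadRouter(
    pools={"support": ["support-1@agents.yourdomain.com",
                       "support-2@agents.yourdomain.com"]},
    default_pool=["triage@agents.yourdomain.com"],
)
print(router.assign("thread_1", "support"))
print(router.assign("thread_2", "support"))
print(router.assign("thread_3", "sales"))
```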