官方给的 MCP 示例都是带上 AI 魔法的,或者要么就是控制台或进程调用等,不利于我了解 MCP 的机制。本文记录采用本地进程内的 MCP 服务端和客户端相互通讯的方式,方便大家了解 MCP 的基础机制
本文核心重点在 ITransport 接口上。在 https://github.com/modelcontextprotocol/csharp-sdk 给出的官方示例里面,服务端的通讯现在(2025-05-09)还是使用控制台方式作为输入,客户端的部分采用的是启动 Everything 进程。这个过程不利于我在一个进程内,让 MCP 客户端调用同进程内的 MCP 服务端,让我了解其工作机制
创建 MCP 服务端用到的 McpServerFactory.Create
方法需要传入两个参数,其方法签名定义如下
public static class McpServerFactory
{
public static IMcpServer Create
(
ITransport transport,
McpServerOptions serverOptions,
ILoggerFactory? loggerFactory = null,
IServiceProvider? serviceProvider = null
)
...
}
第一个参数就是上文提到的 ITransport 类型参数,用于决定数据的传入和输出。其接口定义如下
public interface ITransport : IAsyncDisposable
{
ChannelReader<JsonRpcMessage> MessageReader { get; }
Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default);
}
这个接口的定义看起来非常舒服,其意图就是从 MessageReader 让 MCP 服务端收到输入。当 MCP 服务端能够完成响应时,通过 SendMessageAsync 将数据进行输出
再来看看 MCP 客户端的创建 McpClientFactory.CreateAsync
方法,这个方法里面包含了将 MCP 客户端创建出来且和 MCP 服务端完成连接的过程
public static partial class McpClientFactory
{
public static async Task<IMcpClient> CreateAsync
(
IClientTransport clientTransport,
McpClientOptions? clientOptions = null,
ILoggerFactory? loggerFactory = null,
CancellationToken cancellationToken = default
)
...
}
再进一步看看 McpClientFactory.CreateAsync
方法的第一个参数 IClientTransport
接口类型的定义
public interface IClientTransport
{
string Name { get; }
Task<ITransport> ConnectAsync(CancellationToken cancellationToken = default);
}
可以看到 IClientTransport 和 ITransport 的不同在于 IClientTransport 是等待连接之后再返回 ITransport 对象
了解到这里,大概也就能明白了如何在同一个进程内的示例代码编写思路了
在同进程内,客户端连接服务端的 IClientTransport.ConnectAsync
肯定是瞬间完成的,即可以将其约掉,等同于客户端 McpClientFactory.CreateAsync
方法也要有一个 ITransport 接口对象传入
如此可以看到 MCP 客户端和服务端的 ITransport 正好可以传入一对对象,让一方的输入成为另一方的输出,让另一方的输出成为这一方的输入。如此就可以完成通讯逻辑
为了简单起见,我编写了名为 InterprocessTransportFactory 的辅助类,其作用就是创建两个 Channel<JsonRpcMessage>
对象,分别给到客户端和服务端,代码如下
class InterprocessTransportFactory
{
public InterprocessTransportFactory()
{
_clientTransport = new ClientTransport(this);
_serverTransport = new ServerTransport(this);
}
private readonly ClientTransport _clientTransport;
private readonly ServerTransport _serverTransport;
public IClientTransport GetClientTransport()
=> _clientTransport;
public ITransport GetServerTransport()
=> _serverTransport;
class ClientTransport(InterprocessTransportFactory factory) : TransportBase, IClientTransport, ITransport
{
private readonly InterprocessTransportFactory _factory = factory;
public Task<ITransport> ConnectAsync(CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult<ITransport>(this);
}
public string Name => "ClientTransport";
public override async Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken())
{
await _factory._serverTransport.Channel.Writer.WriteAsync(message, cancellationToken);
}
}
class ServerTransport(InterprocessTransportFactory factory) : TransportBase
{
private readonly InterprocessTransportFactory _factory = factory;
public override async Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken())
{
await _factory._clientTransport.Channel.Writer.WriteAsync(message, cancellationToken);
}
}
abstract class TransportBase : ITransport
{
public Channel<JsonRpcMessage> Channel { get; } =
System.Threading.Channels.Channel.CreateUnbounded<JsonRpcMessage>();
public ValueTask DisposeAsync()
{
Channel.Writer.Complete();
return ValueTask.CompletedTask;
}
public abstract Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken());
public ChannelReader<JsonRpcMessage> MessageReader => Channel.Reader;
}
}
核心逻辑就是在 ClientTransport 的 SendMessageAsync 方法,通过 await _factory._serverTransport.Channel.Writer.WriteAsync(message, cancellationToken);
写入到服务端,作为服务端的输入。反之,在服务端的 ServerTransport.SendMessageAsync 方法,也将输出写入到客户端
如以上代码所示,由于是在相同的进程内,客户端连接是瞬时发生的,对应到代码里面就是 ClientTransport.ConnectAsync
方法直接返回自身
完成基础逻辑之后,以下思路就是先创建服务端,再创建客户端,让客户端连接上服务端,且尝试调用服务端的工具
创建服务端需要有两个必要参数,分别是 ITransport 和 McpServerOptions 类型的参数。其中 ITransport 已经在上文准备好了,接下来继续看一下 McpServerOptions 类型的参数
按照设计,将在 McpServerOptions 参数里面放入工具的枚举,即列举出当前服务端有哪些工具的处理,以及收到客户端调用工具时的处理逻辑。这两个处理逻辑底层只是两个委托而已,具体如何实现都需要看咱自己处理,其代码如下
McpServerOptions options = new()
{
ServerInfo = new Implementation() { Name = "MyServer", Version = "1.0.0" },
Capabilities = new ServerCapabilities()
{
Tools = new ToolsCapability()
{
ListToolsHandler = (request, cancellationToken) =>
...,// 返回工具列表
CallToolHandler = (request, cancellationToken) =>
...,// 处理调用工具
}
},
};
框架上层的封装,如官方示例的以下代码,也仅仅只是帮忙封装 ListToolsHandler 和 CallToolHandler 两个属性而已
[McpServerToolType]
public static class EchoTool
{
[McpServerTool, Description("Echoes the message back to the client.")]
public static string Echo(string message) => $"hello {message}";
}
按照官方给的无魔法的实现方式,其示例代码如下
McpServerOptions options = new()
{
ServerInfo = new Implementation() { Name = "MyServer", Version = "1.0.0" },
Capabilities = new ServerCapabilities()
{
Tools = new ToolsCapability()
{
ListToolsHandler = (request, cancellationToken) =>
ValueTask.FromResult(new ListToolsResult()
{
Tools =
[
new Tool()
{
Name = "echo",
Description = "Echoes the input back to the client.",
InputSchema = JsonSerializer.Deserialize<JsonElement>("""
{
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The input to echo back"
}
},
"required": ["message"]
}
"""),
}
]
}),
CallToolHandler = (request, cancellationToken) =>
{
if (request.Params?.Name == "echo")
{
if (request.Params.Arguments?.TryGetValue("message", out var message) is not true)
{
throw new McpException("Missing required argument 'message'");
}
return ValueTask.FromResult(new CallToolResponse()
{
Content = [new Content() { Text = $"Echo: {message}", Type = "text" }]
});
}
throw new McpException($"Unknown tool: '{request.Params?.Name}'");
},
}
},
}
可以看到在没有魔法的帮助下,只是简单写一个 echo 方法就需要很好代码量
准备好了 McpServerOptions 和 ITransport 类型的对象之后,就可以创建和启动 MCP 服务端了,代码如下
var transportFactory = new InterprocessTransportFactory();
await using IMcpServer server = McpServerFactory.Create(transportFactory.GetServerTransport(), options);
_ = server.RunAsync();
相对比来说,客户端的代码就简单许多了,只需简单从 InterprocessTransportFactory 拿到 IClientTransport 即可调用 McpClientFactory.CreateAsync
方法创建 MCP 客户端,如以下代码所示
var client = await McpClientFactory.CreateAsync(transportFactory.GetClientTransport());
以上代码即可完成创建 MCP 客户端,且让 MCP 客户端连接上 MCP 服务端
连接完成之后,可以使用 IMcpClient.ListToolsAsync
列举出所连接的 MCP 服务端提供的工具列表,代码如下
// Print the list of tools available from the server.
foreach (var tool in await client.ListToolsAsync())
{
Console.WriteLine($"{tool.Name} ({tool.Description})");
}
在本示例里面,会在控制台输出以下内容
echo (Echoes the input back to the client.)
既然枚举到了工具,那接下来就尝试调用一下工具,代码如下
var dictionary = new Dictionary<string, object?>()
{
{ "message", "Hello, World!" }
};
CallToolResponse callToolResponse = await client.CallToolAsync("echo", dictionary);
foreach (var content in callToolResponse.Content)
{
Console.WriteLine($"CallToolResponse: Type={content.Type} Text='{content.Text}'");
}
继续运行以上代码,将在控制台输出以下内容
CallToolResponse: Type=text Text='Echo: Hello, World!'
在执行 await client.CallToolAsync("echo", dictionary)
代码之前,在 ToolsCapability 的 CallToolHandler 委托打上断点,即可通过调用堆栈看到整个调用过程,如以下调用堆栈所示
ModelContextProtocol.dll!ModelContextProtocol.Server.McpServer.InvokeHandlerAsync.__InvokeScopedAsync|0(System.Func<ModelContextProtocol.Server.RequestContext<ModelContextProtocol.Protocol.Types.CallToolRequestParams>, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<ModelContextProtocol.Protocol.Types.CallToolResponse>> handler = {Method = {System.Reflection.RuntimeMethodInfo}}, ModelContextProtocol.Protocol.Types.CallToolRequestParams args = {ModelContextProtocol.Protocol.Types.CallToolRequestParams}, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Server.McpServer.InvokeHandlerAsync<ModelContextProtocol.Protocol.Types.CallToolRequestParams, ModelContextProtocol.Protocol.Types.CallToolResponse>(System.Func<ModelContextProtocol.Server.RequestContext<ModelContextProtocol.Protocol.Types.CallToolRequestParams>, System.Threading.CancellationToken, System.Threading.Tasks.ValueTask<ModelContextProtocol.Protocol.Types.CallToolResponse>> handler = {Method = {System.Reflection.RuntimeMethodInfo}}, ModelContextProtocol.Protocol.Types.CallToolRequestParams args = {ModelContextProtocol.Protocol.Types.CallToolRequestParams}, ModelContextProtocol.Protocol.Transport.ITransport destinationTransport = null, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Server.McpServer.SetHandler.AnonymousMethod__0(ModelContextProtocol.Protocol.Types.CallToolRequestParams request = {ModelContextProtocol.Protocol.Types.CallToolRequestParams}, ModelContextProtocol.Protocol.Transport.ITransport destinationTransport = null, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Shared.RequestHandlers.Set.AnonymousMethod__0(ModelContextProtocol.Protocol.Messages.JsonRpcRequest request = {ModelContextProtocol.Protocol.Messages.JsonRpcRequest}, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Shared.McpSession.HandleRequest(ModelContextProtocol.Protocol.Messages.JsonRpcRequest request = {ModelContextProtocol.Protocol.Messages.JsonRpcRequest}, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Shared.McpSession.HandleMessageAsync(ModelContextProtocol.Protocol.Messages.JsonRpcMessage message = {ModelContextProtocol.Protocol.Messages.JsonRpcRequest}, System.Threading.CancellationToken cancellationToken = IsCancellationRequested = false)
ModelContextProtocol.dll!ModelContextProtocol.Shared.McpSession.ProcessMessagesAsync.__ProcessMessageAsync|0()
其消息入口代码是在 McpSession.ProcessMessagesAsync
方法里面,其逻辑如下
internal sealed partial class McpSession
{
public async Task ProcessMessagesAsync(CancellationToken cancellationToken)
{
await foreach (var message in _transport.MessageReader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
... // 调度收到的消息的逻辑
}
}
}
如以上代码所示,可见就是从传入的 ITransport 的 MessageReader 属性里面读取到客户端发送过来的输入数据,将其进行调度的
以上就是在相同的一个进程内进行 MCP 客户端和服务端通讯的简单示例代码
整个 Program.cs 代码如下
// See https://aka.ms/new-console-template for more information
using System.Text.Json;
using System.Threading.Channels;
using ModelContextProtocol;
using ModelContextProtocol.Client;
using ModelContextProtocol.Protocol.Messages;
using ModelContextProtocol.Protocol.Transport;
using ModelContextProtocol.Protocol.Types;
using ModelContextProtocol.Server;
var transportFactory = new InterprocessTransportFactory();
McpServerOptions options = new()
{
ServerInfo = new Implementation() { Name = "MyServer", Version = "1.0.0" },
Capabilities = new ServerCapabilities()
{
Tools = new ToolsCapability()
{
ListToolsHandler = (request, cancellationToken) =>
ValueTask.FromResult(new ListToolsResult()
{
Tools =
[
new Tool()
{
Name = "echo",
Description = "Echoes the input back to the client.",
InputSchema = JsonSerializer.Deserialize<JsonElement>("""
{
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The input to echo back"
}
},
"required": ["message"]
}
"""),
}
]
}),
CallToolHandler = (request, cancellationToken) =>
{
if (request.Params?.Name == "echo")
{
if (request.Params.Arguments?.TryGetValue("message", out var message) is not true)
{
throw new McpException("Missing required argument 'message'");
}
return ValueTask.FromResult(new CallToolResponse()
{
Content = [new Content() { Text = $"Echo: {message}", Type = "text" }]
});
}
throw new McpException($"Unknown tool: '{request.Params?.Name}'");
},
}
},
};
await using IMcpServer server = McpServerFactory.Create(transportFactory.GetServerTransport(), options);
_ = server.RunAsync();
// 以下是客户端代码
var client = await McpClientFactory.CreateAsync(transportFactory.GetClientTransport());
// Print the list of tools available from the server.
foreach (var tool in await client.ListToolsAsync())
{
Console.WriteLine($"{tool.Name} ({tool.Description})");
}
var dictionary = new Dictionary<string, object?>()
{
{ "message", "Hello, World!" }
};
CallToolResponse callToolResponse = await client.CallToolAsync("echo", dictionary);
foreach (var content in callToolResponse.Content)
{
Console.WriteLine($"CallToolResponse: Type={content.Type} Text='{content.Text}'");
}
class InterprocessTransportFactory
{
public InterprocessTransportFactory()
{
_clientTransport = new ClientTransport(this);
_serverTransport = new ServerTransport(this);
}
private readonly ClientTransport _clientTransport;
private readonly ServerTransport _serverTransport;
public IClientTransport GetClientTransport()
=> _clientTransport;
public ITransport GetServerTransport()
=> _serverTransport;
class ClientTransport(InterprocessTransportFactory factory) : TransportBase, IClientTransport, ITransport
{
private readonly InterprocessTransportFactory _factory = factory;
public Task<ITransport> ConnectAsync(CancellationToken cancellationToken = new CancellationToken())
{
return Task.FromResult<ITransport>(this);
}
public string Name => "ClientTransport";
public override async Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken())
{
await _factory._serverTransport.Channel.Writer.WriteAsync(message, cancellationToken);
}
}
class ServerTransport(InterprocessTransportFactory factory) : TransportBase
{
private readonly InterprocessTransportFactory _factory = factory;
public override async Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken())
{
await _factory._clientTransport.Channel.Writer.WriteAsync(message, cancellationToken);
}
}
abstract class TransportBase : ITransport
{
public Channel<JsonRpcMessage> Channel { get; } =
System.Threading.Channels.Channel.CreateUnbounded<JsonRpcMessage>();
public ValueTask DisposeAsync()
{
Channel.Writer.Complete();
return ValueTask.CompletedTask;
}
public abstract Task SendMessageAsync(JsonRpcMessage message,
CancellationToken cancellationToken = new CancellationToken());
public ChannelReader<JsonRpcMessage> MessageReader => Channel.Reader;
}
}
本文代码放在 github 和 gitee 上,可以使用如下命令行拉取代码。我整个代码仓库比较庞大,使用以下命令行可以进行部分拉取,拉取速度比较快
先创建一个空文件夹,接着使用命令行 cd 命令进入此空文件夹,在命令行里面输入以下代码,即可获取到本文的代码
git init
git remote add origin https://gitee.com/lindexi/lindexi_gd.git
git pull origin f4257182f3ab9e3e6deb81d41c5716d3a219e9a6
以上使用的是国内的 gitee 的源,如果 gitee 不能访问,请替换为 github 的源。请在命令行继续输入以下代码,将 gitee 源换成 github 源进行拉取代码。如果依然拉取不到代码,可以发邮件向我要代码
git remote remove origin
git remote add origin https://github.com/lindexi/lindexi_gd.git
git pull origin f4257182f3ab9e3e6deb81d41c5716d3a219e9a6
获取代码之后,进入 Workbench/FawheliraLemjawlelcalcelkener 文件夹,即可获取到源代码
更多技术博客,请参阅 博客导航
本文会经常更新,请阅读原文: https://blog.lindexi.com/post/dotnet-MCP-%E6%97%A0%E9%AD%94%E6%B3%95-%E6%9C%AC%E5%9C%B0%E8%BF%9B%E7%A8%8B%E5%86%85%E6%9C%8D%E5%8A%A1%E7%AB%AF%E5%AE%A2%E6%88%B7%E7%AB%AF%E8%B0%83%E7%94%A8%E5%92%8C%E9%80%9A%E8%AE%AF%E7%A4%BA%E4%BE%8B.html ,以避免陈旧错误知识的误导,同时有更好的阅读体验。
如果你想持续阅读我的最新博客,请点击 RSS 订阅,推荐使用RSS Stalker订阅博客,或者收藏我的博客导航
本作品采用
知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议
进行许可。欢迎转载、使用、重新发布,但务必保留文章署名林德熙(包含链接:
https://blog.lindexi.com
),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请
与我联系
。
无盈利,不卖课,做纯粹的技术博客
以下是广告时间
推荐关注 Edi.Wang 的公众号
欢迎进入 Eleven 老师组建的 .NET 社区
以上广告全是友情推广,无盈利