@cloudsignal/mcp-over-mqtt
Reference implementation of MCP over MQTT. The package lets a tool implementation be exposed as a network-callable MCP server on any conforming MQTT 5 broker, with the same JSON-Schema arguments and structured results as MCP-over-stdio.
Status
v0.1.x, tracking spec v0.1. Published on npm. MIT licensed.
Install
npm install @cloudsignal/mcp-over-mqtt

The underlying MQTT 5 client is a transitive dependency. Install @cloudsignal/mqtt-client directly only if you need low-level access.
Quickstart
Stand up a tool server using the MCPServer class. Each tool is defined separately with defineTool (which validates the schema and binds the handler), then registered on the server.
import { MCPServer, defineTool } from '@cloudsignal/mcp-over-mqtt';

const echoTool = defineTool({
  name: 'echo',
  description: 'Echoes input back to the caller.',
  input_schema: {
    type: 'object',
    properties: { text: { type: 'string' } },
    required: ['text'],
  },
  // Echo the text argument back to the caller.
  handler: async ({ text }) => ({ echo: text }),
});

const server = new MCPServer({
  name: 'example-server',
  tools: [echoTool],
  broker: 'mqtts://broker.cloudsignal.app:8883',
  credentials: {
    orgId: process.env.ORG_ID!,
    tokenServiceUrl: 'https://tokens.cloudsignal.app',
    externalToken: process.env.SERVER_JWT,
  },
});

await server.start();

A caller using MCPClient on the same broker then invokes it as client.call('echo', { text: 'hello' }). For the full multi-file deployment, see Run MQTT.Agent on a local Mosquitto broker. Note that the guide is currently aspirational pending vanilla-MQTT auth support; the working-today equivalent uses CloudSignal-style credentials as shown above.
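The quickstart relies on input_schema to describe a tool's arguments. As a rough illustration of what checking arguments against such a schema involves, here is a minimal sketch. It is not the package's validator: checkArgs and the ObjectSchema type are hypothetical names, and the sketch only handles flat object schemas with primitive property types.

```typescript
// Hypothetical helper for illustration; not part of @cloudsignal/mcp-over-mqtt.
type PrimitiveType = 'string' | 'number' | 'boolean';

interface ObjectSchema {
  type: 'object';
  properties: Record<string, { type: PrimitiveType }>;
  required?: string[];
}

// Returns a list of human-readable problems; an empty list means the args pass.
function checkArgs(schema: ObjectSchema, args: Record<string, unknown>): string[] {
  const problems: string[] = [];

  // Every name listed under `required` must be present.
  for (const key of schema.required ?? []) {
    if (!(key in args)) problems.push(`missing required property "${key}"`);
  }

  // Every supplied value that the schema describes must have the declared type.
  for (const [key, value] of Object.entries(args)) {
    const spec = schema.properties[key];
    if (spec && typeof value !== spec.type) {
      problems.push(`property "${key}" should be ${spec.type}, got ${typeof value}`);
    }
  }
  return problems;
}

// Same schema as the echo tool in the quickstart.
const echoSchema: ObjectSchema = {
  type: 'object',
  properties: { text: { type: 'string' } },
  required: ['text'],
};

console.log(checkArgs(echoSchema, { text: 'hello' }));
console.log(checkArgs(echoSchema, {}));
```

A real JSON-Schema validator covers far more (nested objects, arrays, formats); the point here is only that a call with missing or mistyped arguments can be rejected before the handler runs.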
Common operations
The most-asked patterns are shown here. The full operation list ships alongside each SDK release in the package README and the typedoc.
Stream partial results
Set supports_streaming: true on a tool definition to activate the ctx.stream() method in the handler. Each call publishes a partial-result message to the dedicated stream topic {ns}/mcp/tools/{tool_id}/stream/{call_id} before the final response is sent. The caller subscribes to that topic and receives interim updates as they arrive.
import { MCPServer, defineTool } from '@cloudsignal/mcp-over-mqtt';

const streamingTool = defineTool({
  name: 'long-task',
  description: 'Runs a multi-step task and streams progress updates.',
  input_schema: {
    type: 'object',
    properties: { steps: { type: 'number' } },
    required: ['steps'],
  },
  supports_streaming: true, // activates ctx.stream in the handler
  handler: async ({ steps }, ctx) => {
    for (let i = 1; i <= steps; i++) {
      // ctx.stream publishes a partial to {ns}/mcp/tools/long-task/stream/{call_id}
      await ctx.stream!({ progress: i, of: steps });
      // ... do real work ...
    }
    return { done: true, steps }; // final result published to the response topic
  },
});

const server = new MCPServer({
  name: 'example-server',
  tools: [streamingTool],
  broker: 'mqtts://broker.cloudsignal.app:8883',
  credentials: {
    orgId: process.env.ORG_ID!,
    tokenServiceUrl: 'https://tokens.cloudsignal.app',
    externalToken: process.env.SERVER_JWT,
  },
});

await server.start();

See MCP Section 3.6 Streaming partials for the wire-level details.
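To make the ordering concrete (partials first, final result last), the sketch below runs a streaming-style handler against an in-memory recorder instead of a broker. It assumes nothing about the package internals: streamTopic, StreamContext, and runWithRecorder are hypothetical names, and the namespace 'acme' and call id 'call-1' are placeholder values. Only the topic template is taken from the text above.

```typescript
// Builds the per-call stream topic from the template {ns}/mcp/tools/{tool_id}/stream/{call_id}.
function streamTopic(ns: string, toolId: string, callId: string): string {
  return `${ns}/mcp/tools/${toolId}/stream/${callId}`;
}

interface Published {
  topic: string;
  payload: unknown;
}

// Hypothetical stand-in for the handler context; the real ctx may carry more fields.
interface StreamContext {
  stream: (partial: unknown) => Promise<void>;
}

// Runs a handler and records every partial it streams, in publish order.
async function runWithRecorder<A, R>(
  handler: (args: A, ctx: StreamContext) => Promise<R>,
  args: A,
  topic: string,
): Promise<{ partials: Published[]; result: R }> {
  const partials: Published[] = [];
  const ctx: StreamContext = {
    stream: async (partial) => {
      partials.push({ topic, payload: partial });
    },
  };
  const result = await handler(args, ctx);
  return { partials, result };
}

// Same shape as the long-task handler above, minus the real work.
const longTask = async ({ steps }: { steps: number }, ctx: StreamContext) => {
  for (let i = 1; i <= steps; i++) {
    await ctx.stream({ progress: i, of: steps });
  }
  return { done: true, steps };
};

runWithRecorder(longTask, { steps: 3 }, streamTopic('acme', 'long-task', 'call-1'))
  .then(({ partials, result }) => console.log(partials, result));
```

A recorder like this can also serve as a unit-test harness for streaming handlers, since it needs no broker connection.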
Run replicas for load balancing
MCPServer subscribes to each tool's call topic via an MQTT shared subscription ($share/mcp-tool-{tool_name}/{ns}/mcp/tools/{tool_name}/call) by default (useSharedSubscription: true). The broker distributes each inbound call to exactly one subscriber, so running multiple identical replicas provides horizontal load balancing without duplicate handling. To disable shared subscriptions and receive every call on all instances, set useSharedSubscription: false. See MCP Section 3.3 Replicas and load balancing for the wire-level details.
import { MCPServer, defineTool } from '@cloudsignal/mcp-over-mqtt';

const echoTool = defineTool({
  name: 'echo',
  description: 'Echoes input back to the caller.',
  input_schema: {
    type: 'object',
    properties: { text: { type: 'string' } },
    required: ['text'],
  },
  handler: async ({ text }) => ({ echo: text }),
});

// Start as many replicas as you need - each subscribes to:
//   $share/mcp-tool-echo/{ns}/mcp/tools/echo/call
// The broker delivers each call to exactly one replica.
const server = new MCPServer({
  name: 'echo-server',
  tools: [echoTool],
  broker: 'mqtts://broker.cloudsignal.app:8883',
  credentials: {
    orgId: process.env.ORG_ID!,
    tokenServiceUrl: 'https://tokens.cloudsignal.app',
    externalToken: process.env.SERVER_JWT,
  },
  useSharedSubscription: true, // default - set false to fan-out to all instances
});

await server.start();
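The difference between the two modes can be sketched without a broker. The helpers below are hypothetical (callTopic, subscriptionFilter, and deliver are not package exports): the first two build the topic strings from the templates quoted above, and deliver is a toy model of delivery counts. Round-robin is assumed purely for illustration; MQTT 5 leaves the choice of subscriber within a share group to the broker.

```typescript
// Call topic for a tool, following the {ns}/mcp/tools/{tool_name}/call template.
function callTopic(ns: string, toolName: string): string {
  return `${ns}/mcp/tools/${toolName}/call`;
}

// Subscription filter a replica would use, with or without a shared subscription.
function subscriptionFilter(ns: string, toolName: string, useSharedSubscription = true): string {
  const topic = callTopic(ns, toolName);
  return useSharedSubscription ? `$share/mcp-tool-${toolName}/${topic}` : topic;
}

// Toy delivery model: returns how many calls each replica handles.
function deliver(callCount: number, replicaCount: number, shared: boolean): number[] {
  const handled: number[] = new Array<number>(replicaCount).fill(0);
  for (let call = 0; call < callCount; call++) {
    if (shared) {
      // Shared subscription: exactly one replica receives the call.
      const target = call % replicaCount; // round-robin is an assumption
      handled[target] = (handled[target] ?? 0) + 1;
    } else {
      // Plain subscription: every replica receives every call.
      for (let r = 0; r < replicaCount; r++) {
        handled[r] = (handled[r] ?? 0) + 1;
      }
    }
  }
  return handled;
}

console.log(subscriptionFilter('acme', 'echo'));
console.log(deliver(6, 3, true), deliver(6, 3, false));
```

With six calls and three replicas, the shared mode handles six calls in total, while the fan-out mode handles eighteen, which is why fan-out only suits tools whose handlers are idempotent or intentionally replicated.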
API surface
@cloudsignal/mcp-over-mqtt covers MCP semantics, tool server registration, replicas, discovery, and partial-result streaming. The full API is documented in the package README and in the typedoc generated alongside each release.
Pairs with
- @cloudsignal/agent for the calling side of the wire.
- @cloudsignal/mqtt-client, the underlying MQTT 5 client.
Where to go next
- MCP over MQTT spec - the wire protocol this package implements.
- Substrate - common-ground definitions (claims, topic conventions, QoS, sessions, cards) that MCP depends on.
- Run MQTT.Agent on a local Mosquitto broker - end-to-end testbed including a working tool server (note: guide is currently aspirational pending vanilla-MQTT auth).