The bundle exposes a streaming endpoint at /rpc/stream that mirrors the
JSON-RPC envelope on input and emits rows over time. This is not JSON-RPC
2.0 (the spec is request/response only) — it’s a deliberate extension for
LLM token streaming, server-sent updates, progressive lists, etc.
The streaming route is off by default — flip it on when you actually have
a #[Rpc\Stream] handler:
json_rpc_server:
routes:
stream: { enabled: true }
If you forget, the compiler pass throws a clear LogicException at container
build listing every method that carries the attribute — no silent 404s.
use Knetesin\JsonRpcServerBundle\Attribute as Rpc;
use Knetesin\JsonRpcServerBundle\Attribute\StreamFormat;
#[Rpc\Method('chat.stream')]
#[Rpc\Stream(format: StreamFormat::Sse)]
final class ChatStream
{
/** @return \Generator<int, array<string, mixed>, void, void> */
public function __invoke(ChatRequest $req): \Generator
{
foreach ($this->generate($req) as $chunk) {
yield ['delta' => $chunk];
}
}
}
Return type must be iterable<mixed> (Generator works best — produces one row
at a time without holding everything in memory).
| Format | Content-Type | Wire format |
|---|---|---|
StreamFormat::Ndjson (default) |
application/x-ndjson |
One JSON object per line. |
StreamFormat::Sse |
text/event-stream |
data: <json>\n\n framing. Browser EventSource works. |
StreamFormat::JsonArray |
application/json |
[<json>,<json>,...] — single valid JSON document, progressively written. |
#[Rpc\Stream(format: StreamFormat::Ndjson)]
#[Rpc\Stream(format: StreamFormat::Sse)]
#[Rpc\Stream(format: StreamFormat::JsonArray)]
curl -X POST http://localhost/rpc/stream \
-H 'Content-Type: application/json' \
-d '{"jsonrpc":"2.0","method":"chat.stream","params":{"prompt":"…"},"id":1}'
Response headers always include:
Cache-Control: no-cacheX-Accel-Buffering: no (tells nginx/Cloudflare not to buffer)The bundle also calls ob_flush() + flush() per row, which means streams
actually stream under PHP-FPM with default output_buffering = 4096.
The endpoint distinguishes pre-stream errors from mid-stream errors:
Detected before the iterator starts (parse, method-not-found, batch > 1, method-not-streaming). Result: plain JSON-RPC envelope, HTTP 4xx/5xx.
| Failure | Status |
|---|---|
| Parse / Invalid Request | 400 |
| Method not found | 404 |
| Internal error | 500 |
{"jsonrpc":"2.0","error":{"code":-32600,"message":"Streaming endpoint accepts only a single request"},"id":1}
Once the iterator has emitted at least one row, headers are already flushed — HTTP status can’t change. The bundle appends an inline error frame in the active format and closes the stream cleanly:
| Format | Error frame |
|---|---|
| NDJSON | Final line {"error":{"code":...,"message":"..."}} |
| SSE | event: error\ndata: {"error":{...}}\n\n |
| JsonArray | Final element {"_error":{...}} (underscored to avoid colliding with valid data shapes) |
Clients should always check the last item for this shape.
/rpc/stream accepts a single envelope. A batch yields a 400 with
Streaming endpoint accepts only a single request. Mixing batch with
streaming has no clean wire semantics — clients should call multiple times.
Streams aren’t notifications — they correlate request and response via id.
A request without id still streams, but the response can’t be matched.
We don’t reject this explicitly; just send id to be safe.
| Combination | Result |
|---|---|
#[Rpc\Stream] + #[Rpc\Cache] |
Compile-time error. A stream is per-call; can’t be replayed from a static blob. |
#[Rpc\Stream] + #[Rpc\RateLimit] |
Allowed. Rate-limit fires before the iterator starts. |
#[Rpc\Stream] + #[Rpc\Mcp] |
Allowed in metadata, but MCP transport doesn’t stream — the LLM client receives the final aggregated content. |
#[Rpc\Stream] + #[Rpc\Method(roles: [...])] |
Allowed. Auth fires before the iterator starts. |
#[Rpc\Method('logs.tail')]
#[Rpc\Stream(format: StreamFormat::Ndjson)]
final class LogsTail
{
public function __construct(private LogReader $reader) {}
public function __invoke(LogsTailRequest $req): \Generator
{
foreach ($this->reader->tail($req->source, $req->follow) as $line) {
yield ['ts' => $line->timestamp, 'level' => $line->level, 'msg' => $line->message];
}
}
}
Reader emits one row per log line. Client uses fetch with a reader that
splits on \n.
#[Rpc\Method('export.progress')]
#[Rpc\Stream(format: StreamFormat::Sse)]
final class ExportProgress
{
public function __invoke(ExportRequest $req): \Generator
{
foreach ($this->exporter->run($req) as $progress) {
yield ['percent' => $progress];
}
yield ['done' => true, 'url' => $this->exporter->resultUrl()];
}
}
Browser:
const es = new EventSource('/rpc/stream', { withCredentials: true });
es.onmessage = e => console.log(JSON.parse(e.data));
es.addEventListener('error', e => console.error('Stream error', e.data));
(SSE over POST needs custom transport — most apps use NDJSON for that reason.)