【Node.js】StreamとSSEをまとめてみた
こんにちは、フリーランスエンジニアの太田雅昭です。
StreamとSSE
Streamは逐次送信する方式で、SSEはStreamの上位ラッパーです。それぞれ比較してみます。
Steam
Steamの場合、単純にデータを逐次送信します。例えばtext/plainデータなら以下のようになります。テキストを自前でデコードする必要があります。
// plain/server.ts
import fs from 'node:fs';
import http from 'node:http';
import path from 'node:path';
const PORT = 3500;
const server = http.createServer(async (req, res) => {
if (req.url === '/') {
outputHtml(res);
} else if (req.url === '/stream') {
outputStream(res);
}
});
server.listen(PORT, () => {
console.log(`HTTP server listening on http://localhost:${PORT}`);
});
function outputHtml(res: http.ServerResponse) {
res.writeHead(200, {
'Content-Type': 'text/html; charset=utf-8',
});
const html = fs.readFileSync(path.join(__dirname, 'client.html'), 'utf8');
res.write(html);
res.end();
}
async function outputStream(res: http.ServerResponse) {
res.writeHead(200, {
// Plain text streaming
'Content-Type': 'text/plain; charset=utf-8',
'Content-Encoding': 'none', // gzipなどを排除
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // プロキシバッファを排除 (nginx etc.)
});
for (let i = 0; i < 5; i++) {
res.write(`chunk ${i} ${Date.now()}\n`);
await new Promise(resolve => setTimeout(resolve, 1000));
}
res.write('end\n');
res.end();
}
<!-- plain/client.html -->
<html>
<body>
<h1>Plain Text Streaming</h1>
<div id="messages"></div>
</body>
<script>
const element = document.getElementById('messages');
const log = (message) => {
element.innerHTML += message + '<br>';
};
async function init() {
const response = await fetch('/stream');
const reader = response.body.getReader();
while (true) {
const res = await reader.read();
if (res.done) break;
// ReadableStreamはバイトストリームなので、デコードして文字列に変換する
log(new TextDecoder().decode(res.value));
}
}
init();
</script>
</html>
SSE
SSEはStreamの上位ラッパーで、イベントやデコード処理などを担ってくれます。送信側もフォーマットに従う必要があります。フォーマットはfield: valueの形式で、filedにはevent,dataなどがあります。eventが省略されると自動でevent: messageが割り振られます。またeventの後には必ずdata送信も必要です。
// sse/server.ts
import fs from 'node:fs';
import http from 'node:http';
import path from 'node:path';
const PORT = 3500;
const server = http.createServer(async (req, res) => {
if (req.url === '/') {
outputHtml(res);
} else if (req.url === '/stream') {
outputStream(res);
}
});
server.listen(PORT, () => {
console.log(`HTTP server listening on http://localhost:${PORT}`);
});
function outputHtml(res: http.ServerResponse) {
res.writeHead(200, {
'Content-Type': 'text/html; charset=utf-8',
});
const html = fs.readFileSync(path.join(__dirname, 'client.html'), 'utf8');
res.write(html);
res.end();
}
async function outputStream(res: http.ServerResponse) {
res.writeHead(200, {
// SSEはtext/event-streamを返す
'Content-Type': 'text/event-stream; charset=utf-8',
'Content-Encoding': 'none', // gzipなどを排除
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // プロキシバッファを排除 (nginx etc.)
});
for (let i = 0; i < 5; i++) {
// データを送信
res.write(`data: chunk ${i} ${Date.now()}\n\n`);
await new Promise(resolve => setTimeout(resolve, 1000));
}
// endイベントを送信。dataも必要なので空dataも入れている。
res.write('event: end\n');
res.write('data: \n\n');
res.end();
}
<!-- sse/client.html -->
<html>
<body>
<h1>SSE Streaming</h1>
<div id="messages"></div>
</body>
<script>
const element = document.getElementById('messages');
const log = (message) => {
element.innerHTML += message + '<br>';
};
const sse = new EventSource('/stream');
sse.onmessage = (event) => {
log(event.data);
};
sse.addEventListener('end', (event) => {
log('end');
sse.close();
});
</script>
</html>