【Node.js】StreamとSSEをまとめてみた

こんにちは、フリーランスエンジニアの太田雅昭です。

StreamとSSE

Streamは逐次送信する方式で、SSEはStreamの上位ラッパーです。それぞれ比較してみます。

Steam

Steamの場合、単純にデータを逐次送信します。例えばtext/plainデータなら以下のようになります。テキストを自前でデコードする必要があります。

// plain/server.ts

import fs from 'node:fs';
import http from 'node:http';
import path from 'node:path';

const PORT = 3500;

const server = http.createServer(async (req, res) => {
  if (req.url === '/') {
    outputHtml(res);
  } else if (req.url === '/stream') {
    outputStream(res);
  }
});

server.listen(PORT, () => {
  console.log(`HTTP server listening on http://localhost:${PORT}`);
});


function outputHtml(res: http.ServerResponse) {
  res.writeHead(200, {
    'Content-Type': 'text/html; charset=utf-8',
  });
  const html = fs.readFileSync(path.join(__dirname, 'client.html'), 'utf8');
  res.write(html);
  res.end();
}

async function outputStream(res: http.ServerResponse) {
  res.writeHead(200, {
    // Plain text streaming
    'Content-Type': 'text/plain; charset=utf-8',
    'Content-Encoding': 'none', // gzipなどを排除
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // プロキシバッファを排除 (nginx etc.)
  });

  for (let i = 0; i < 5; i++) {
    res.write(`chunk ${i} ${Date.now()}\n`);
    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  res.write('end\n');
  res.end();
}
<!-- plain/client.html -->
<html>

<body>
  <h1>Plain Text Streaming</h1>
  <div id="messages"></div>
</body>

<script>
  const element = document.getElementById('messages');
  const log = (message) => {
    element.innerHTML += message + '<br>';
  };

  async function init() {
    const response = await fetch('/stream');
    const reader = response.body.getReader();

    while (true) {
      const res = await reader.read();
      if (res.done) break;
      // ReadableStreamはバイトストリームなので、デコードして文字列に変換する
      log(new TextDecoder().decode(res.value));
    }
  }

  init();

</script>

</html>

SSE

SSEはStreamの上位ラッパーで、イベントやデコード処理などを担ってくれます。送信側もフォーマットに従う必要があります。フォーマットはfield: valueの形式で、filedにはevent,dataなどがあります。eventが省略されると自動でevent: messageが割り振られます。またeventの後には必ずdata送信も必要です。

// sse/server.ts

import fs from 'node:fs';
import http from 'node:http';
import path from 'node:path';

const PORT = 3500;

const server = http.createServer(async (req, res) => {
  if (req.url === '/') {
    outputHtml(res);
  } else if (req.url === '/stream') {
    outputStream(res);
  }
});

server.listen(PORT, () => {
  console.log(`HTTP server listening on http://localhost:${PORT}`);
});


function outputHtml(res: http.ServerResponse) {
  res.writeHead(200, {
    'Content-Type': 'text/html; charset=utf-8',
  });
  const html = fs.readFileSync(path.join(__dirname, 'client.html'), 'utf8');
  res.write(html);
  res.end();
}

async function outputStream(res: http.ServerResponse) {
  res.writeHead(200, {
    // SSEはtext/event-streamを返す
    'Content-Type': 'text/event-stream; charset=utf-8',
    'Content-Encoding': 'none', // gzipなどを排除
    'Cache-Control': 'no-cache, no-transform',
    'Connection': 'keep-alive',
    'X-Accel-Buffering': 'no', // プロキシバッファを排除 (nginx etc.)
  });

  for (let i = 0; i < 5; i++) {

    // データを送信
    res.write(`data: chunk ${i} ${Date.now()}\n\n`);

    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  // endイベントを送信。dataも必要なので空dataも入れている。
  res.write('event: end\n');
  res.write('data: \n\n');
  res.end();
}
<!-- sse/client.html -->
<html>

<body>
  <h1>SSE Streaming</h1>
  <div id="messages"></div>
</body>

<script>
  const element = document.getElementById('messages');
  const log = (message) => {
    element.innerHTML += message + '<br>';
  };

  const sse = new EventSource('/stream');
  sse.onmessage = (event) => {
    log(event.data);
  };

  sse.addEventListener('end', (event) => {
    log('end');
    sse.close();
  });

</script>


</html>