使用 Node.js 進行 Shell 腳本編寫
您可以購買此書的離線版本(HTML、PDF、EPUB、MOBI),並支持免費的線上版本。
(廣告,請不要封鎖。)

9 原生 Node.js 串流



本章是 Node 原生串流的簡介。它們支援 非同步反覆運算,這讓它們更容易使用,而且這也是我們在本章中會大量使用的功能。

請注意,跨平台的網路串流會在 §10「在 Node.js 上使用網路串流」 中介紹。我們在本書中會大量使用這些串流。因此,如果您願意,可以跳過本章。

9.1 回顧:非同步反覆運算和非同步產生器

非同步反覆運算是一種非同步擷取資料容器內容的協定(表示在擷取項目之前,目前的「工作」可能會暫停)。

非同步產生器有助於非同步反覆運算。例如,這是一個非同步產生器函式

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}

在本章的其餘部分,請密切注意函式是非同步函式或非同步產生器函式

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }

9.2 串流

串流是一種模式,其核心概念是「分而治之」大量資料:如果我們將其分割成較小的部分,並一次處理一個部分,我們就可以處理它。

Node.js 支援多種串流,例如

9.2.1 串聯

若要以多個步驟處理串流資料,我們可以串聯(連接)串流

  1. 輸入透過可讀取串流接收。
  2. 每個處理步驟透過轉換串流執行。
  3. 對於最後一個處理步驟,我們有兩個選項
    • 我們可以將最新可讀取串流中的資料寫入可寫入串流。也就是說,可寫入串流是我們串聯的最後一個元素。
    • 我們可以以其他方式處理最新可讀取串流中的資料。

第 (2) 部分是選用的。

9.2.2 文字編碼

在建立文字串流時,最好總是指定編碼

編碼的預設值為 null,等同於 'utf8'

9.2.3 輔助函式:readableToString()

我們偶爾會使用以下輔助函式。您不需要了解它的運作方式,只需要(大致上)了解它的功能。

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

此函式透過基於事件的 API 實作。稍後我們將看到一個更簡單的方法來執行此操作,透過非同步反覆運算。

9.2.4 一些初步說明

9.3 可讀取串流

9.3.1 建立可讀取串流

9.3.1.1 從檔案建立可讀取串流

我們可以使用 fs.createReadStream() 建立可讀取串流

import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
9.3.1.2 Readable.from():從可迭代物件建立可讀取串流

靜態方法 Readable.from(iterable, options?) 建立一個可讀取串流,其中包含 iterable 中的資料。iterable 可以是同步可迭代物件或非同步可迭代物件。參數 options 是選用的,可用於指定文字編碼等內容。

import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
9.3.1.2.1 從字串建立可讀取串流

Readable.from() 接受任何可迭代物件,因此也可以用於將字串轉換為串流

import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

目前Readable.from() 將字串視為任何其他可迭代物件,因此會反覆運算其碼點。在效能方面,這並不是理想的,但對於大多數使用案例來說應該是可以接受的。我預期 Readable.from() 會經常與字串一起使用,因此未來可能會進行最佳化。

9.3.2 透過 for-await-of 從可讀取串流讀取區塊

每個可讀取串流都是非同步可迭代物件,這表示我們可以使用 for-await-of 迴圈來讀取其內容

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);

// Output:
// 'This is a test!\n'
9.3.2.1 將可讀取串流的內容收集到字串中

下列函式是我們在本章節一開始看到的函式的更簡單重新實作。

import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

請注意,在這種情況下,我們必須使用非同步函式,因為我們想要傳回 Promise。

9.3.3 透過模組 'node:readlines' 從可讀取串流讀取行

內建模組 'node:readline' 讓我們可以從可讀取串流讀取行

import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';

const filePath = process.argv[2]; // first command line argument

const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();

9.4 透過非同步產生器轉換可讀取串流

非同步反覆運算提供了一個優雅的替代方案,可以轉換串流,以多個步驟處理串流資料

總之,這些是此類處理管線的組成部分

可讀
→ 第一個非同步產生器 [→ … → 最後一個非同步產生器]
→ 可讀或非同步函數

9.4.1 從非同步可迭代物件中的區塊轉到編號行

在以下範例中,我們將看到處理管線的範例,正如剛才所解釋的。

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

處理管線在 A 行中設定。步驟如下

觀察

9.5 可寫串流

9.5.1 為檔案建立可寫串流

我們可以使用 fs.createWriteStream() 建立可寫串流

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

9.5.2 寫入可寫串流

在本節中,我們將探討寫入可寫串流的方法

  1. 透過其方法 .write() 直接寫入可寫串流。
  2. 使用模組 stream 中的函數 pipeline() 將可讀串流導管到可寫串流。

為了展示這些方法,我們使用它們來實作相同的函數 writeIterableToFile()

可讀串流的方法 .pipe() 也支援導管,但它有缺點,最好避免使用。

9.5.2.1 writable.write(chunk)

在將資料寫入串流時,有兩個基於回呼的機制可以幫助我們

在以下範例中,我們將這些機制承諾化,以便透過非同步函式使用它們

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

stream.finished() 的預設版本是基於回呼,但可透過 util.promisify() (A 行) 轉換為基於承諾的版本。

我們使用以下兩種模式

9.5.2.2 透過 stream.pipeline() 將可讀串流導向可寫串流

在 A 行中,我們使用 stream.pipeline() 的承諾化版本將可讀串流 readable 導向可寫串流 writable

import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

方法 readable.pipe() 也支援導向,但有一個 注意事項:如果可讀串流發出錯誤,則可寫串流不會自動關閉。pipeline() 沒有這個注意事項。

模組 os

模組 buffer

模組 stream

模組 fs

本節中的靜態類型資訊基於 Definitely Typed

9.7 延伸閱讀和本章來源