readableToString()
for-await-of
從可讀取串流讀取區塊'node:readlines'
從可讀取串流讀取行本章是 Node 原生串流的簡介。它們支援 非同步反覆運算,這讓它們更容易使用,而且這也是我們在本章中會大量使用的功能。
請注意,跨平台的網路串流會在 §10「在 Node.js 上使用網路串流」 中介紹。我們在本書中會大量使用這些串流。因此,如果您願意,可以跳過本章。
非同步反覆運算是一種非同步擷取資料容器內容的協定(表示在擷取項目之前,目前的「工作」可能會暫停)。
非同步產生器有助於非同步反覆運算。例如,這是一個非同步產生器函式
/**
* @returns an asynchronous iterable
*/
async function* asyncGenerator(asyncIterable) {
for await (const item of asyncIterable) { // input
if (···) {
yield '> ' + item; // output
}
} }
for-await-of
迴圈會反覆運算輸入的 asyncIterable
。這個迴圈在一般的非同步函式中也可以使用。yield
會將值傳送到這個產生器回傳的非同步可反覆運算中。在本章的其餘部分,請密切注意函式是非同步函式或非同步產生器函式
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
串流是一種模式,其核心概念是「分而治之」大量資料:如果我們將其分割成較小的部分,並一次處理一個部分,我們就可以處理它。
Node.js 支援多種串流,例如
可讀取串流是我們可以從中讀取資料的串流。換句話說,它們是資料的來源。一個範例是可讀取檔案串流,它讓我們可以讀取檔案的內容。
可寫入串流是我們可以寫入資料的串流。換句話說,它們是資料的接收器。一個範例是可寫入檔案串流,它讓我們可以將資料寫入檔案。
轉換串流既可讀取又可寫入。作為可寫入串流,它接收資料片段,轉換(變更或捨棄)它們,然後將它們輸出為可讀取串流。
若要以多個步驟處理串流資料,我們可以串聯(連接)串流
第 (2) 部分是選用的。
在建立文字串流時,最好總是指定編碼
Node.js 文件中有支援編碼及其預設拼法的清單,例如
'utf8'
'utf16le'
'base64'
也允許使用一些不同的拼法。您可以使用 Buffer.isEncoding()
來檢查哪些是
> buffer.Buffer.isEncoding('utf8')true
> buffer.Buffer.isEncoding('utf-8')true
> buffer.Buffer.isEncoding('UTF-8')true
> buffer.Buffer.isEncoding('UTF:8')false
編碼的預設值為 null
,等同於 'utf8'
。
readableToString()
我們偶爾會使用以下輔助函式。您不需要了解它的運作方式,只需要(大致上)了解它的功能。
import * as stream from 'stream';
/**
* Reads all the text in a readable stream and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
return new Promise((resolve, reject) => {
let data = '';
.on('data', function (chunk) {
readable+= chunk;
data ;
}).on('end', function () {
readableresolve(data);
;
}).on('error', function (err) {
readablereject(err);
;
});
}) }
此函式透過基於事件的 API 實作。稍後我們將看到一個更簡單的方法來執行此操作,透過非同步反覆運算。
await
。在這種情況下,我們想像我們在模組內或在非同步函式的本體內。'\n'
(LF)'\r\n'
(CR LF)os
中的 常數 EOL
存取。我們可以使用 fs.createReadStream()
建立可讀取串流
import * as fs from 'fs';
const readableStream = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
.equal(
assertawait readableToString(readableStream),
'This is a test!\n');
Readable.from()
:從可迭代物件建立可讀取串流靜態方法 Readable.from(iterable, options?)
建立一個可讀取串流,其中包含 iterable
中的資料。iterable
可以是同步可迭代物件或非同步可迭代物件。參數 options
是選用的,可用於指定文字編碼等內容。
import * as stream from 'stream';
function* gen() {
yield 'One line\n';
yield 'Another line\n';
}const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
.equal(
assertawait readableToString(readableStream),
'One line\nAnother line\n');
Readable.from()
接受任何可迭代物件,因此也可以用於將字串轉換為串流
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
.equal(
assertawait readableToString(readable),
'Some text!');
目前,Readable.from()
將字串視為任何其他可迭代物件,因此會反覆運算其碼點。在效能方面,這並不是理想的,但對於大多數使用案例來說應該是可以接受的。我預期 Readable.from()
會經常與字串一起使用,因此未來可能會進行最佳化。
for-await-of
從可讀取串流讀取區塊每個可讀取串流都是非同步可迭代物件,這表示我們可以使用 for-await-of
迴圈來讀取其內容
import * as fs from 'fs';
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk);
}
}
const readable = fs.createReadStream(
'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'
下列函式是我們在本章節一開始看到的函式的更簡單重新實作。
import {Readable} from 'stream';
async function readableToString2(readable) {
let result = '';
for await (const chunk of readable) {
+= chunk;
result
}return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
.equal(await readableToString2(readable), 'Good morning!'); assert
請注意,在這種情況下,我們必須使用非同步函式,因為我們想要傳回 Promise。
'node:readlines'
從可讀取串流讀取行內建模組 'node:readline'
讓我們可以從可讀取串流讀取行
import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';
const filePath = process.argv[2]; // first command line argument
const rl = readline.createInterface({
input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
;
})for await (const line of rl) {
console.log('>', line);
}.close(); rl
非同步反覆運算提供了一個優雅的替代方案,可以轉換串流,以多個步驟處理串流資料
Readable.from()
將其轉換為可讀串流(稍後可以將其導管到可寫串流中)。總之,這些是此類處理管線的組成部分
在以下範例中,我們將看到處理管線的範例,正如剛才所解釋的。
import {Readable} from 'stream';
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
let previous = '';
for await (const chunk of chunkIterable) {
let startSearch = previous.length;
+= chunk;
previous while (true) {
// Works for EOL === '\n' and EOL === '\r\n'
const eolIndex = previous.indexOf('\n', startSearch);
if (eolIndex < 0) break;
// Line includes the EOL
const line = previous.slice(0, eolIndex+1);
yield line;
= previous.slice(eolIndex+1);
previous = 0;
startSearch
}
}if (previous.length > 0) {
yield previous;
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1;
for await (const line of lineIterable) {
yield lineNumber + ' ' + line;
++;
lineNumber
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line);
}
}
const chunks = Readable.from(
'Text with\nmultiple\nlines.\n',
encoding: 'utf8'});
{await logLines(numberLines(chunksToLines(chunks))); // (A)
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
處理管線在 A 行中設定。步驟如下
chunksToLines()
:從包含區塊的非同步可迭代物件轉到包含行的非同步可迭代物件。numberLines()
:從包含行的非同步可迭代物件轉到包含編號行的非同步可迭代物件。logLines()
:記錄非同步可迭代物件中的項目。觀察
chunksToLines()
和 numberLines()
的輸入和輸出都是非同步可迭代物件。這就是它們是非同步產生器的原因(如 async
和 *
所示)。logLines()
的輸入是非同步可迭代物件。這就是它是非同步函數的原因(如 async
所示)。我們可以使用 fs.createWriteStream()
建立可寫串流
const writableStream = fs.createWriteStream(
'tmp/log.txt', {encoding: 'utf8'});
在本節中,我們將探討寫入可寫串流的方法
.write()
直接寫入可寫串流。stream
中的函數 pipeline()
將可讀串流導管到可寫串流。為了展示這些方法,我們使用它們來實作相同的函數 writeIterableToFile()
。
可讀串流的方法 .pipe()
也支援導管,但它有缺點,最好避免使用。
writable.write(chunk)
在將資料寫入串流時,有兩個基於回呼的機制可以幫助我們
'drain'
表示背壓已結束。finished()
在串流
在以下範例中,我們將這些機制承諾化,以便透過非同步函式使用它們
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
for await (const chunk of iterable) {
if (!writable.write(chunk)) { // (B)
// Handle backpressure
await once(writable, 'drain');
}
}.end(); // (C)
writable// Wait until done. Throws if there are errors.
await finished(writable);
}
await writeIterableToFile(
'One', ' line of text.\n'], 'tmp/log.txt');
[.equal(
assert.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
fs'One line of text.\n');
stream.finished()
的預設版本是基於回呼,但可透過 util.promisify()
(A 行) 轉換為基於承諾的版本。
我們使用以下兩種模式
在處理反壓時寫入可寫串流 (B 行)
if (!writable.write(chunk)) {
await once(writable, 'drain');
}
關閉可寫串流並等到寫入完成 (C 行)
.end();
writableawait finished(writable);
stream.pipeline()
將可讀串流導向可寫串流在 A 行中,我們使用 stream.pipeline()
的承諾化版本將可讀串流 readable
導向可寫串流 writable
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(
, {encoding: 'utf8'});
iterableconst writable = fs.createWriteStream(filePath);
await pipeline(readable, writable); // (A)
}await writeIterableToFile(
'One', ' line of text.\n'], 'tmp/log.txt');
[// ···
readable.pipe(destination)
方法 readable.pipe()
也支援導向,但有一個 注意事項:如果可讀串流發出錯誤,則可寫串流不會自動關閉。pipeline()
沒有這個注意事項。
模組 os
const EOL: string
(自 0.7.8 起)
包含目前平台使用的換行字元序列。
模組 buffer
Buffer.isEncoding(encoding: string): boolean
(自 0.9.1 起)
如果 encoding
正確命名 Node.js 支援的其中一種文字編碼,則傳回 true
。 支援的編碼包括
'utf8'
'utf16le'
'ascii'
'latin1
'base64'
'hex'
(每個位元組為兩個十六進位字元)模組 stream
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(自 10.0.0 起)
可讀串流是可非同步迭代的。例如,您可以在非同步函式或非同步產生器中使用 for-await-of
迴圈來迭代它們。
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void>
(自 10.0.0 起)
在讀取/寫入完成或發生錯誤時,會解決傳回的承諾。
承諾化版本如下建立
const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(自 10.0.0 起)
在串流之間導向。在串流完成或發生錯誤時,會解決傳回的承諾。
承諾化版本如下建立
const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(自 12.3.0 起)
將可迭代轉換為可讀取串流。
interface ReadableOptions {
?: number;
highWaterMark?: string;
encoding?: boolean;
objectMode?(this: Readable, size: number): void;
read?(this: Readable, error: Error | null,
destroy: (error: Error | null) => void): void;
callback?: boolean;
autoDestroy }
這些選項與 Readable
建構函式的選項相同,並在 此處 說明。
模組 fs
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(自 2.3.0 起)
建立可讀取串流。有更多選項可用。
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(自 2.3.0 起)
使用選項 .flags
可以指定是要寫入還是附加,以及在檔案存在或不存在時會發生什麼事。有更多選項可用。
本節中的靜態類型資訊基於 Definitely Typed。