針對急躁的程式設計師的 JavaScript(ES2022 版本)
請支持這本書:購買捐款
(廣告,請不要封鎖。)

42 非同步迭代



  必備知識

對於本章,您應熟悉

42.1 基本非同步迭代

42.1.1 協定:非同步迭代

為了了解非同步迭代如何運作,讓我們先回顧一下同步迭代。它包含以下介面

interface Iterable<T> {
  [Symbol.iterator]() : Iterator<T>;
}
interface Iterator<T> {
  next() : IteratorResult<T>;
}
interface IteratorResult<T> {
  value: T;
  done: boolean;
}

對於非同步迭代的協定,我們只想要變更一件事:由 .next() 產生的值應非同步傳遞。有兩個可行的選項

換句話說,問題在於是否將值或整個迭代結果包裝在 Promises 中。

必須是後者,因為當 .next() 傳回結果時,它會啟動非同步運算。該運算是否產生值或發出迭代結束信號,只能在運算完成後才能確定。因此,.done.value 都需要包裝在 Promise 中。

非同步迭代的介面如下所示。

interface AsyncIterable<T> {
  [Symbol.asyncIterator]() : AsyncIterator<T>;
}
interface AsyncIterator<T> {
  next() : Promise<IteratorResult<T>>; // (A)
}
interface IteratorResult<T> {
  value: T;
  done: boolean;
}

與同步介面唯一的不同是 .next() 的傳回類型(A 行)。

42.1.2 直接使用非同步迭代

下列程式碼直接使用非同步迭代協定

const asyncIterable = syncToAsyncIterable(['a', 'b']); // (A)
const asyncIterator = asyncIterable[Symbol.asyncIterator]();

// Call .next() until .done is true:
asyncIterator.next() // (B)
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
    { value: 'a', done: false });
  return asyncIterator.next(); // (C)
})
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
    { value: 'b', done: false });
  return asyncIterator.next(); // (D)
})
.then(iteratorResult => {
  assert.deepEqual(
    iteratorResult,
     { value: undefined, done: true });
})
;

在 A 行,我們建立一個非同步可迭代物件,其值為 'a''b'。我們稍後會看到 syncToAsyncIterable() 的實作。

我們在 B 行、C 行和 D 行呼叫 .next()。每次,我們都使用 .then() 來解開 Promise,並使用 assert.deepEqual() 來檢查解開的值。

如果我們使用非同步函式,則可以簡化此程式碼。現在,我們透過 await 來解開 Promises,而程式碼看起來幾乎就像我們正在執行同步迭代

async function f() {
  const asyncIterable = syncToAsyncIterable(['a', 'b']);
  const asyncIterator = asyncIterable[Symbol.asyncIterator]();
  
  // Call .next() until .done is true:
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 'a', done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 'b', done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: undefined, done: true });
}

42.1.3 透過 for-await-of 使用非同步迭代

非同步迭代協定並非用於直接使用。支援它的語言建構之一是 for-await-of 迴圈,它是 for-of 迴圈的非同步版本。它可以在非同步函式和非同步產生器(本章稍後會介紹)中使用。這是 for-await-of 使用範例

for await (const x of syncToAsyncIterable(['a', 'b'])) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

for-await-of 相當靈活。除了非同步可迭代物件之外,它還支援同步可迭代物件

for await (const x of ['a', 'b']) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

它還支援包裝在 Promises 中的值的同步可迭代物件

const arr = [Promise.resolve('a'), Promise.resolve('b')];
for await (const x of arr) {
  console.log(x);
}
// Output:
// 'a'
// 'b'

  練習:將非同步可迭代物件轉換為陣列

警告:我們很快就會在本章看到此練習的解答。

42.2 非同步產生器

非同步產生器同時具有兩種特性

  非同步產生器與同步產生器非常類似

由於非同步產生器和同步產生器非常類似,因此我不會詳細說明 yieldyield* 的運作方式。如果您有疑問,請參閱 §38「同步產生器」

因此,非同步產生器具有

如下所示

async function* asyncGen() {
  // Input: Promises, async iterables
  const x = await somePromise;
  for await (const y of someAsyncIterable) {
    // ···
  }

  // Output
  yield someValue;
  yield* otherAsyncGen();
}

42.2.1 範例:透過非同步產生器建立非同步可迭代物件

我們來看一個範例。以下程式碼建立一個包含三個數字的非同步可迭代物件

async function* yield123() {
  for (let i=1; i<=3; i++) {
    yield i;
  }
}

yield123() 的結果是否符合非同步迭代協定?

async function check() {
  const asyncIterable = yield123();
  const asyncIterator = asyncIterable[Symbol.asyncIterator]();
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 1, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 2, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: 3, done: false });
  assert.deepEqual(
    await asyncIterator.next(),
    { value: undefined, done: true });
}
check();

42.2.2 範例:將同步可迭代物件轉換為非同步可迭代物件

以下非同步產生器將同步可迭代物件轉換為非同步可迭代物件。它實作我們先前使用過的函式 syncToAsyncIterable()

async function* syncToAsyncIterable(syncIterable) {
  for (const elem of syncIterable) {
    yield elem;
  }
}

注意:在此情況下,輸入是同步的(不需要 await)。

42.2.3 範例:將非同步可迭代物件轉換為陣列

以下函式是先前練習的解答。它將非同步可迭代物件轉換為陣列(想像成展開,但適用於非同步可迭代物件,而不是同步可迭代物件)。

async function asyncIterableToArray(asyncIterable) {
  const result = [];
  for await (const value of asyncIterable) {
    result.push(value);
  }
  return result;
}

請注意,我們在此情況下無法使用非同步產生器:我們透過 for-await-of 取得輸入,並傳回包覆在 Promise 中的陣列。後者要求排除了非同步產生器。

這是 asyncIterableToArray() 的測試

async function* createAsyncIterable() {
  yield 'a';
  yield 'b';
}
const asyncIterable = createAsyncIterable();
assert.deepEqual(
  await asyncIterableToArray(asyncIterable), // (A)
  ['a', 'b']
);

請注意第 A 行中的 await,這是為了解開 asyncIterableToArray() 傳回的 Promise。為了讓 await 運作,此程式碼片段必須在非同步函式中執行。

42.2.4 範例:轉換非同步可迭代物件

讓我們實作一個非同步產生器,透過轉換現有的非同步可迭代物件來產生新的非同步可迭代物件。

async function* timesTwo(asyncNumbers) {
  for await (const x of asyncNumbers) {
    yield x * 2;
  }
}

為了測試此函式,我們使用前一節中的 asyncIterableToArray()

async function* createAsyncIterable() {
  for (let i=1; i<=3; i++) {
    yield i;
  }
}
assert.deepEqual(
  await asyncIterableToArray(timesTwo(createAsyncIterable())),
  [2, 4, 6]
);

  練習:非同步產生器

警告:我們很快就會在本章看到此練習的解答。

42.2.5 範例:對非同步可迭代物件進行對應

提醒一下,這是對同步可迭代物件進行對應的方式

function* mapSync(iterable, func) {
  let index = 0;
  for (const x of iterable) {
    yield func(x, index);
    index++;
  }
}
const syncIterable = mapSync(['a', 'b', 'c'], s => s.repeat(3));
assert.deepEqual(
  Array.from(syncIterable),
  ['aaa', 'bbb', 'ccc']);

非同步版本如下所示

async function* mapAsync(asyncIterable, func) { // (A)
  let index = 0;
  for await (const x of asyncIterable) { // (B)
    yield func(x, index);
    index++;
  }
}

請注意同步實作和非同步實作有多麼相似。僅有的兩個差異是 A 行的 async 和 B 行的 await。這相當於從同步函式轉換為非同步函式 – 我們只需要新增關鍵字 async 和偶爾的 await

若要測試 mapAsync(),我們使用輔助函式 asyncIterableToArray() (本章稍早有說明)

async function* createAsyncIterable() {
  yield 'a';
  yield 'b';
}
const mapped = mapAsync(
  createAsyncIterable(), s => s.repeat(3));
assert.deepEqual(
  await asyncIterableToArray(mapped), // (A)
  ['aaa', 'bbb']);

我們再次使用 await 來展開 Promise (A 行),而且此程式碼片段必須在非同步函式中執行。

  練習:filterAsyncIter()

練習/非同步迭代/filter_async_iter_test.mjs

42.3 透過 Node.js 串流進行非同步迭代

42.3.1 Node.js 串流:透過回呼 (推播) 非同步

傳統上,透過回呼非同步地從 Node.js 串流讀取

function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });
  readStream.on('data', (chunk) => {
    console.log('>>> '+chunk);
  });
  readStream.on('end', () => {
    console.log('### DONE ###');
  });
}

也就是說,串流受控並將資料推播至讀取器。

42.3.2 Node.js 串流:透過非同步迭代 (拉取) 非同步

從 Node.js 10 開始,我們也可以使用非同步迭代從串流讀取

async function main(inputFilePath) {
  const readStream = fs.createReadStream(inputFilePath,
    { encoding: 'utf8', highWaterMark: 1024 });

  for await (const chunk of readStream) {
    console.log('>>> '+chunk);
  }
  console.log('### DONE ###');
}

這次,讀取器受控並從串流拉取資料。

42.3.3 範例:從區塊到行

Node.js 串流會對資料的區塊 (任意長度的片段) 進行迭代。下列非同步產生器會將區塊的非同步可迭代物件轉換為行的非同步可迭代物件

/**
 * Parameter: async iterable of chunks (strings)
 * Result: async iterable of lines (incl. newlines)
 */
async function* chunksToLines(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) { // input
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      // line includes the EOL (Windows '\r\n' or Unix '\n')
      const line = previous.slice(0, eolIndex+1);
      yield line; // output
      previous = previous.slice(eolIndex+1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

讓我們將 chunksToLines() 套用至區塊的非同步可迭代物件 (由 chunkIterable() 產生)

async function* chunkIterable() {
  yield 'First\nSec';
  yield 'ond\nThird\nF';
  yield 'ourth';
}
const linesIterable = chunksToLines(chunkIterable());
assert.deepEqual(
  await asyncIterableToArray(linesIterable),
  [
    'First\n',
    'Second\n',
    'Third\n',
    'Fourth',
  ]);

現在我們有了行的非同步可迭代物件,我們可以使用前一個練習的解答 numberLines() 來為這些行編號

async function* numberLines(linesAsync) {
  let lineNumber = 1;
  for await (const line of linesAsync) {
    yield lineNumber + ': ' + line;
    lineNumber++;
  }
}
const numberedLines = numberLines(chunksToLines(chunkIterable()));
assert.deepEqual(
  await asyncIterableToArray(numberedLines),
  [
    '1: First\n',
    '2: Second\n',
    '3: Third\n',
    '4: Fourth',
  ]);