数据流 - 权威指南

了解如何通过 Streams API 使用可读、可写和转换流。

Streams API 可让您以编程方式访问通过网络接收或以任何方式在本地创建的数据流,并使用 JavaScript 对其进行处理。流式传输涉及将您要接收、发送或转换的资源分解为多个小数据块,然后逐条处理这些区块。虽然流式传输是浏览器在接收要在网页上显示的 HTML 或视频等资源时仍然会执行的操作,但在 2015 年引入带有数据流的 fetch 之前,JavaScript 从未提供此功能。

以前,如果要处理某种资源(例如视频或文本文件等),则必须下载整个文件,等待系统将其反序列化为合适的格式,然后进行处理。随着流可用于 JavaScript,这一切都会改变。现在,当原始数据在客户端上可用时,您可以使用 JavaScript 逐步处理原始数据,而无需生成缓冲区、字符串或 blob。这样一来,就可以解锁许多用例,我在下面列出了其中一些用例:

  • 视频效果:通过实时应用效果的转换流来传送可读的视频流。
  • 数据(解压缩):通过对文件流执行选择性(解压缩)处理的转换流,使用管道进行文件流处理。
  • 图片解码:通过将字节解码为位图数据的转换流传输 HTTP 响应流,然后通过另一个转换流将位图转换为 PNG。如果安装在 Service Worker 的 fetch 处理程序内,它可让您以透明的方式对新的图片格式(如 AVIF)执行 polyfill 操作。

浏览器支持

ReadableStream 和 WritableStream

浏览器支持

  • 43
  • 14
  • 65
  • 10.1

来源

TransformStream

浏览器支持

  • 67
  • 79
  • 102
  • 14.1

来源

核心概念

在详细介绍各种类型的直播之前,我先介绍一些核心概念。

块状

数据块是写入数据流或从中读取的单段数据。它可以是任何类型;数据流甚至可以包含不同类型的数据块。在大多数情况下,区块并不是给定数据流的最原子数据单元。例如,一个字节流可能包含由 16 KiB Uint8Array 单元(而非单个字节)组成的区块。

可读流

可读流表示您可以从中读取数据的数据源。换句话说,数据来自可读流。具体而言,可读流是 ReadableStream 类的一个实例。

可写流

可写流表示您可以写入的数据的目的地。换句话说,数据会进入可写流。具体而言,可写流是 WritableStream 类的一个实例。

转换数据流

转换流由一对数据流组成:可写流(称为可写侧)和可读流(称为可读端)。在现实世界中,它相当于一个同声传译者,他可以实时从一种语言翻译成另一种语言。根据转换流特有的方式,将新数据写入可写端会使新数据可从可读端读取。具体而言,任何具有 writable 属性和 readable 属性的对象都可以用作转换流。不过,使用标准 TransformStream 类可让您更轻松地创建这种正确交错的键值对。

管道链

使用信息流的主要方式是:通过管道在彼此之间传输数据。可以使用可读流的 pipeTo() 方法将可读流直接传输至可写流,也可以使用可读取流的 pipeThrough() 方法,先通过一个或多个转换流传输。以这种方式连接在一起的一组数据流称为管道链。

背压

构建管道链后,它会传播有关区块应多快流经的信号。如果链中的任意步骤还不能接受区块,它会通过管道链向后传播信号,直到最终告知原始来源如此快地停止生成区块。这一流标准化过程称为背压。

T 恤

可以使用其 tee() 方法对可读数据流进行 te 调试(以大写“T”的形状命名)。这会锁定相应的数据流,也就是说,使其无法再直接使用;不过,会创建两个可独立使用的称为分支的新数据流。连接状态也很重要,因为直播无法倒回或重新开始,后面会详细介绍。

管道链示意图,其中的可读流源自对 fetch API 的调用,然后该调用通过转换流传送,该转换流的输出被触发,然后发送到浏览器以获取第一个生成的可读流,并发送到 Service Worker 缓存以用于第二个生成的可读流。
管道链。

可读流的机制

可读流是在 JavaScript 中表示的数据源,由从底层来源流出的 ReadableStream 对象表示。ReadableStream() 构造函数会从指定的处理程序创建并返回可读流对象。底层源代码有两种类型:

  • 当您访问推送源时,推送源会不断向您推送数据,您可以自行决定是否启动、暂停或取消对数据流的访问。示例包括直播视频流、服务器发送的事件或 WebSocket。
  • 拉取来源要求您在连接到拉取来源后明确请求数据。示例包括通过 fetch()XMLHttpRequest 调用执行的 HTTP 操作。

流式数据以小块的形式依序读取,称为“区块”。放置在流中的块称为“加入队列”。这意味着它们正在等待在队列中等待读取。内部队列会跟踪尚未读取的区块。

队列策略是一个对象,用于确定数据流应如何根据内部队列的状态发出背压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与指定数字进行比较,称为“高水印”。

数据流中的分块由读取器读取。此读取器一次检索一个数据块,允许您对数据执行任何想要执行的操作。读取器加上与之一起的其他处理代码称为使用者。

此上下文中的下一个结构称为控制器。每个可读流都有一个关联的控制器,顾名思义,您可以通过该控制器控制该流。

一次只能有一个读取器读取一个数据流;当某个读取器创建完毕并开始读取某个数据流时(即成为一个活跃读取器),该读取器就会被锁定到该数据流。如果您希望其他读取器接管您的流式传输,则通常需要在执行任何其他操作之前释放第一个读取器(但您可以启动流式传输)。

创建可读流

您可通过调用其构造函数 ReadableStream() 来创建可读流。该构造函数包含一个可选参数 underlyingSource,它表示一个对象,其中包含的方法和属性用于定义构造的流实例的行为方式。

underlyingSource

这可以使用以下由开发者定义的可选方法:

  • start(controller):在构建对象时立即调用。此方法可以访问流来源,并执行设置流功能所需的其他任何操作。如果此过程是异步完成的,则该方法可以返回一个 promise 以指示成功或失败。传递给此方法的 controller 参数是一个 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多分块时控制数据流。只要数据流的内部区块队列还未满,该队列就会反复调用,直至达到其高水位标记。如果调用 pull() 的结果是一个 promise,则在该 promise 执行之前,系统不会再次调用 pull()。如果 promise 拒绝,则数据流将出错。
  • cancel(reason):在数据流使用方取消数据流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个参数是 queuingStrategy,同样为可选参数。它是一个对象,可以选择为数据流定义队列策略,该策略接受两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的视频流的高水印。
  • size(chunk):此函数用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,通过相应的 ReadableStreamDefaultController.desiredSize 属性显现。它还控制何时调用底层来源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如需从可读流读取数据,您需要一个读取器,它将是一个 ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法会创建一个读取器,并将数据流锁定到该读取器。当信息流被锁定时,在释放读取器之前,无法获取其他读取器。

ReadableStreamDefaultReader 接口的 read() 方法将返回一个 promise,该 promise 提供对流内部队列中的下一个分块的访问权限。根据流的状态,返回执行或拒绝结果。这些可能的原因如下所示:

  • 如果某个区块可用,则 promise 将使用形式为
    { value: chunk, done: false } 的对象执行。
  • 如果数据流关闭,则 promise 将使用形式为
    { value: undefined, done: true } 的对象执行。
  • 如果数据流发生错误,promise 将被拒绝并显示相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问可读流的 ReadableStream.locked 属性来检查可读流是否处于锁定状态。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读的数据流代码示例

以下代码示例显示了所有操作步骤。您首先要创建一个 ReadableStream,其 underlyingSource 参数(即 TimestampSource 类)会定义 start() 方法。此方法会指示数据流的 controller 在 10 秒内每秒向 enqueue() 发出一个时间戳。最后,它会告知控制器对数据流执行 close() 操作。如需使用此数据流,您可以通过 getReader() 方法创建读取器,然后调用 read(),直到数据流处于 done 状态。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

异步迭代

在每次 read() 循环迭代中检查数据流是否为 done 可能不是最方便的 API。幸运的是,我们很快就会推出一种更好的方法来实现此目的:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前使用异步迭代的一种解决方法是使用 polyfill 来实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

启动可读流

ReadableStream 接口的 tee() 方法可启动当前可读流,并返回一个包含两个生成的分支的双元素数组作为新的 ReadableStream 实例。这样,两个读取器就可以同时读取数据流。例如,如果您要从服务器获取响应并将其流式传输到浏览器,但同时也想将其流式传输到 Service Worker 缓存,可以在 Service Worker 中执行此操作。由于响应正文无法使用一次,因此需要两个副本才能执行此操作。如需取消数据流,您随后需要取消两个生成的分支。通常情况下,将某个数据流设为 T 预订会在该时长内将其锁定,以防止其他读取方将其锁定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读字节流

对于表示字节的流,我们提供了可读流的扩展版本,以高效处理字节,特别是通过最大限度地减少副本来高效处理字节。字节流支持获取自带缓冲区 (BYOB) 读取器。默认实现可以提供一系列不同的输出,例如字符串或数组缓冲区(就 WebSocket 而言),而字节流保证字节输出。此外,自带酒水读取器还具有稳定性优势。这是因为,如果缓冲区分离,则可以保证缓冲区不会两次写入同一个缓冲区,从而避免出现竞态条件。BYOB 读取器可以减少浏览器需要运行垃圾回收的次数,因为它可以重复使用缓冲区。

创建可读字节流

您可以通过向 ReadableStream() 构造函数传递一个额外的 type 参数来创建可读字节流。

new ReadableStream({ type: 'bytes' });

underlyingSource

系统会为可读字节流的底层来源提供 ReadableByteStreamController 来操控它。其 ReadableByteStreamController.enqueue() 方法接受值为 ArrayBufferViewchunk 参数。ReadableByteStreamController.byobRequest 属性会返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性会返回所需的大小,以填充受控数据流的内部队列。

queuingStrategy

ReadableStream() 构造函数的第二个参数是 queuingStrategy,同样为可选参数。它是一个对象,可以选择为数据流定义队列策略,该策略采用一个参数:

  • highWaterMark:非负字节数,表示使用此队列策略的流的高水印。这用于确定背压,通过相应的 ReadableByteStreamController.desiredSize 属性显现。它还控制何时调用底层来源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应地设置 mode 参数来获取对 ReadableStreamBYOBReader 的访问权限:ReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区分配,以避免复制。如需从字节流读取数据,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读的字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数返回可读的字节流,允许对随机生成的数组进行高效的零复制读取。它不会使用预定的分块大小 (1,024),而是尝试填充开发者提供的缓冲区,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写流的机制

可写流是您可以写入数据的目的地,在 JavaScript 中由 WritableStream 对象表示。这作为底层接收器(底层接收器是写入原始数据的较低级别的 I/O 接收器)之上的抽象。

数据通过写入器(一次一个分块)写入数据流。一个区块可以有多种形式,就像读取器中的区块一样。您可以使用任何代码来生成可供写入的区块;写入者和关联的代码称为“生产者”

当某个写入器创建完毕并开始向某个数据流写入数据时(活跃写入器),系统就会说该写入器被锁定到该数据流。一个可写流一次只能有一个写入器执行。如果您希望其他写入者开始写入您的数据流,则通常需要先将其释放,然后再添加其他写入者。

内部队列会跟踪已写入数据流但尚未由底层接收器处理的区块。

队列策略是一个对象,用于确定数据流应如何根据内部队列的状态发出背压信号。队列策略会为每个分块分配大小,并将队列中所有分块的总大小与指定数字进行比较,称为“高水印”。

最终构造称为控制器。每个可写流都有一个关联的控制器,您可以通过该控制器控制该流(例如中止该流)。

创建可写流

Streams API 的 WritableStream 接口提供了将流式数据写入目标位置(称为接收器)的标准抽象。此对象具有内置背压和排队功能。通过调用其构造函数 WritableStream() 来创建可写流。它包含一个可选的 underlyingSink 参数,该参数表示一个对象,其中包含的方法和属性用于定义构造的流实例的行为方式。

underlyingSink

underlyingSink 可以包含以下由开发者定义的可选方法。传递给某些方法的 controller 参数是 WritableStreamDefaultController

  • start(controller):构建对象时,系统会立即调用此方法。此方法的内容应旨在访问底层接收器。如果此过程是异步完成的,则可以返回一个 promise 以指示成功或失败。
  • write(chunk, controller):当新的数据块(在 chunk 参数中指定)准备好写入底层接收器时,将调用此方法。它可以返回一个 promise,以表示写入操作成功或失败。只有在前面的写入成功后,系统才会调用此方法,且在数据流关闭或中止后方可调用。
  • close(controller):如果应用发出信号来表明它已完成向数据流写入数据块,则调用此方法。这些内容应执行必要的操作,以最终确定对底层接收器的写入,并释放对该接收器的访问权限。如果此过程是异步进行的,则可以返回一个 promise 以指示成功或失败。只有在所有排入队列的写入都成功后,才会调用此方法。
  • abort(reason):如果应用发出信号来指示它想要突然关闭数据流并使其处于错误状态,则调用此方法。它可以清理所有保留的资源(类似于 close()),但即使写入操作排入队列,也会调用 abort()。这些区块会被丢弃。如果此过程是异步进行的,则可以返回一个 promise 以指示成功或失败。reason 参数包含一个 DOMString,用于说明数据流中止的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 接口代表了一个控制器,允许在设置期间、提交更多分块以进行写入时,或在写入结束时控制 WritableStream 的状态。构建 WritableStream 时,系统会向底层接收器提供相应的 WritableStreamDefaultController 实例以供操作。WritableStreamDefaultController 只有一种方法:WritableStreamDefaultController.error(),这会导致未来与关联的数据流的任何互动出现错误。WritableStreamDefaultController 还支持 signal 属性,该属性会返回 AbortSignal 的实例,从而可以在需要时停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个参数是 queuingStrategy,同样为可选参数。它是一个对象,可以选择为数据流定义队列策略,该策略接受两个参数:

  • highWaterMark:一个非负数,表示使用此队列策略的视频流的高水印。
  • size(chunk):此函数用于计算并返回给定分块值的有限非负大小。 结果用于确定背压,通过相应的 WritableStreamDefaultWriter.desiredSize 属性显现。

getWriter()write() 方法

如需写入可写流,您需要一个写入器,该写入器将是一个 WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法会返回 WritableStreamDefaultWriter 的新实例,并将数据流锁定到该实例。当该数据流被锁定时,在当前数据流被释放之前,无法获取其他写入方。

WritableStreamDefaultWriter 接口的 write() 方法会将传递的数据块写入 WritableStream 及其底层接收器,然后返回一个解析为指示写入操作成功或失败的 promise。请注意,“成功”的含义由底层接收器决定;可能表示区块已被接受,但不一定表示区块已安全保存到其最终目标位置。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以通过访问可写流的 WritableStream.locked 属性来检查该流是否已被锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写流代码示例

以下代码示例显示了所有操作步骤。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读流传输到可写流

可以通过可读流的 pipeTo() 方法将可读流传输到可写流。ReadableStream.pipeTo() 将当前的 ReadableStream 通过管道传送到给定的 WritableStream,并返回一个 promise,该 promise 在管道过程成功完成时会执行,如果遇到任何错误,则会拒绝。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。可通过调用其构造函数 TransformStream() 来创建转换流,该函数会根据指定的处理程序创建并返回转换流对象。TransformStream() 构造函数会接受表示 transformer 的可选 JavaScript 对象作为其第一个参数。此类对象可以包含以下任一方法:

transformer

  • start(controller):构建对象时,系统会立即调用此方法。这通常用于通过 controller.enqueue() 将前缀区块加入队列。这些区块将从可读端读取,但不依赖于可写端的任何写入。如果此初始过程是异步的(例如,由于获取前缀区块需要一些工作),该函数可以返回一个 promise 来指示成功或失败;被拒绝的 promise 会对数据流错误。TransformStream() 构造函数会重新抛出抛出的任何异常。
  • transform(chunk, controller):当最初写入可写端的新分块准备好进行转换时,调用此方法。流实现可保证,此函数仅在先前的转换成功之后调用,且绝不会在 start() 完成或 flush() 被调用之后调用。此函数会执行转换流的实际转换工作。它可以使用 controller.enqueue() 将结果加入队列。这样可以允许写入可写端的单个分块在可读端生成零个或多个分块,具体取决于 controller.enqueue() 的调用次数。如果转换过程是异步的,则此函数可以返回一个 promise,以指示转换是成功还是失败。遭拒的 promise 会同时对转换流的可读端和可写端出错。如果未提供 transform() 方法,则会使用身份转换,它会将区块从可写端保持不变,排入可读端。
  • flush(controller):在写入可写端的所有分块通过成功传递 transform() 完成转换,且可写端即将关闭后,调用此方法。通常,这用于将后缀区块加入可读端队列,在该端也关闭之前。如果刷新过程是异步进行的,则该函数可以返回表示成功或失败的 promise;结果将传达给 stream.writable.write() 的调用方。此外,遭拒的 promise 会同时对数据流的可读端和可写端出错。抛出异常的处理方式与返回被拒的 promise 相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 队列策略

TransformStream() 构造函数的第二个和第三个可选参数是可选的 writableStrategyreadableStrategy 队列策略。它们的定义分别如可读流和可写流部分所述。

转换数据流代码示例

以下代码示例演示了一个简单的转换流的实际运用。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

通过转换流传送可读流

ReadableStream 接口的 pipeThrough() 方法提供了一种可链接的方式,通过转换流或任何其他可写/可读对对当前数据流进行管道处理。通常,如果对某个数据流执行管道操作,该数据流会在管道运行期间被锁定,以防止其他读取程序将其锁定。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一个代码示例(有点刻意设计)展示了如何实现 fetch() 的“Souting”版本,通过将返回的响应 promise作为数据流来逐块转换,从而将所有文本都大写。这种方法的优势在于,您无需等待整个文档下载完毕,这在处理大型文件时会产生巨大的影响。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

下面的演示显示了可读、可写和转换流的实际运用。此外,本页面还包含 pipeThrough()pipeTo() 管道链的示例,还演示了 tee()。您可以选择在自己的窗口中运行演示或查看源代码

浏览器中可用的实用信息流

浏览器中内置了许多有用的流。您可以从 blob 轻松创建 ReadableStreamBlob 接口的 stream() 方法会返回 ReadableStream,并在读取时返回 blob 中包含的数据。另外请注意,File 对象是 Blob 的特定种类,可以在 blob 支持的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式传输变体分别称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

分别使用 CompressionStreamDecompressionStream 转换流可以轻松地压缩或解压缩文件。以下代码示例展示了如何下载数据流规范,直接在浏览器中对其进行压缩 (gzip),以及将压缩文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性 fetch() 请求流就是实际使用的可写流示例。

Serial API 大量使用了可读流和可写流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将数据流与 WebSocket API 集成在一起。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 审核。 Jake Archibald 的博文在理解数据流方面有很大帮助。部分代码示例的灵感来自 GitHub 用户 @bellbind 的探索,以及 Streams 上的 MDN Web 文档中的大量文章部分。Streams Standard作者在编写此规范方面做了大量工作。主打图片由 Ryan LaraUnsplash 用户撰写。