Aprenda a usar streams legíveis, graváveis e de transformação com a API Streams.
Com a API Streams, é possível acessar programaticamente streams de dados recebidos pela rede
ou criados localmente por qualquer meio e
processá-los com JavaScript. O streaming envolve dividir um recurso que você quer receber, enviar ou transformar em pequenos blocos e, em seguida, processar esses blocos bit a bit. Embora o streaming seja algo
que os navegadores fazem ao receber recursos como HTML ou vídeos para exibição em páginas da Web, esse
recurso nunca estava disponível para JavaScript antes do fetch
com streams ser lançado em 2015.
Antes, se você quisesse processar um recurso de algum tipo (seja um vídeo, um arquivo de texto etc.), teria que fazer o download de todo o arquivo, esperar que ele fosse desserializado em um formato adequado e, então, processá-lo. Com os streams disponíveis para JavaScript, tudo isso muda. Agora é possível processar dados brutos com JavaScript progressivamente assim que eles são disponibilizados no cliente, sem precisar gerar um buffer, uma string ou um blob. Isso desbloqueia vários casos de uso, alguns dos quais listamos abaixo:
- Efeitos de vídeo: transmitir um stream de vídeo legível por um stream de transformação que aplica efeitos em tempo real.
- (des)compressão de dados: encadear um fluxo de arquivos por meio de um fluxo de transformação que o compacta seletivamente.
- Decodificação de imagens:canalização de um fluxo de resposta HTTP por meio de um fluxo de transformação que decodifica bytes
em dados de bitmap e, em seguida, por outro fluxo de transformação que traduz bitmaps em PNGs. Se
instalado dentro do gerenciador
fetch
de um service worker, você pode aplicar o polyfill de forma transparente a novos formatos de imagem, como o AVIF.
Suporte ao navegador
ReadableStream e WritableStream
TransformStream
Principais conceitos
Antes de entrar em detalhes sobre os vários tipos de stream, vou apresentar alguns conceitos principais.
Pedaços
Um bloco é um único dado que é gravado ou lido em um stream. Ele pode ser de qualquer
tipo. Os streams podem até mesmo conter blocos de tipos diferentes. Na maioria das vezes, um bloco não será a unidade de dados mais atômica para um determinado fluxo. Por exemplo, um stream de bytes pode conter blocos compostos de 16
unidades Uint8Array
de KiB, em vez de bytes únicos.
Streams legíveis
Um stream legível representa uma fonte de dados que você pode ler. Em outras palavras, os dados saem de um fluxo legível. Concretamente, um stream legível é uma instância da classe
ReadableStream
.
Streams graváveis
Um stream gravável representa um destino dos dados em que é possível gravar. Em outras palavras, os dados
vão para um fluxo gravável. Concretamente, um stream gravável é uma instância da
classe WritableStream
.
Transformar streams
Um stream de transformação consiste em um par de streams: um stream gravável, conhecido como lado gravável, e um stream legível, conhecido como lado legível.
Uma metáfora do mundo real para isso seria um intérprete simultâneo (link em inglês) que traduz de um idioma para outro instantaneamente.
De maneira específica para o fluxo de transformação, a gravação no lado gravável resulta na disponibilização de novos dados para leitura no lado legível. Concretamente, qualquer objeto com as propriedades writable
e readable
pode servir como um fluxo de transformação. No entanto, a classe TransformStream
padrão facilita a criação
de um par que esteja devidamente entrelaçado.
Correntes para canos
Os streams são usados principalmente por encadeamento entre eles. Um stream legível pode ser encaminhado diretamente
para um gravável, usando o método pipeTo()
do stream legível, ou por um
ou mais streams de transformação primeiro, usando o método pipeThrough()
do stream legível. Um conjunto de
fluxos reunidos dessa forma é chamado de corrente de tubos.
Contrapressão
Depois que uma cadeia de pipeline é construída, ela propaga sinais sobre a velocidade com que os blocos devem fluir por ela. Se alguma etapa na cadeia ainda não puder aceitar fragmentos, ela vai propagar um sinal para trás na cadeia de barras verticais, até que a fonte original seja instruída a parar de produzir fragmentos tão rápido. Esse processo de normalização do fluxo é chamado de contrapressão.
Tee
Um stream legível pode ser teed (nomeado de acordo com a forma de um "T" maiúsculo) usando o método tee()
.
Isso bloqueará o stream, ou seja, o tornará diretamente utilizável. No entanto, será criado dois novos streams, chamados ramificações, que podem ser consumidos de forma independente.
Isso também é importante porque as transmissões não podem ser retrocedidas ou reiniciadas. Saiba mais sobre isso mais adiante.
A mecânica de um stream legível
Um stream legível é uma fonte de dados representada em JavaScript por um objeto
ReadableStream
que
flui de uma fonte. O
construtor ReadableStream()
cria e retorna um objeto de stream legível dos gerenciadores fornecidos. Há dois
tipos de origem subjacente:
- As origens de push enviam constantemente dados quando são acessados, e cabe a você iniciar, pausar ou cancelar o acesso ao stream. Os exemplos incluem streams de vídeo ao vivo, eventos enviados pelo servidor ou WebSockets.
- As origens de pull exigem que você solicite explicitamente dados delas após a conexão. Os exemplos
incluem operações HTTP por meio de chamadas
fetch()
ouXMLHttpRequest
.
Os dados de stream são lidos sequencialmente em pequenas partes chamadas blocos. Os blocos colocados em um stream são considerados enfileirados. Isso significa que eles estão aguardando em uma fila pronta para leitura. Uma fila interna monitora os blocos que ainda não foram lidos.
Uma estratégia de enfileiramento é um objeto que determina como um stream deve sinalizar a pressão de retorno com base no estado da fila interna. A estratégia de enfileiramento atribui um tamanho a cada fragmento e compara o tamanho total de todos os blocos na fila com um número especificado, conhecido como marca d'água alta.
Os blocos dentro do stream são lidos por um leitor. Esse leitor recupera os dados um bloco por vez, permitindo que você faça qualquer tipo de operação que quiser fazer nele. O leitor e o outro código de processamento que o acompanha são chamados de consumidores.
A próxima construção nesse contexto é chamada de controlador. Cada stream legível tem um controlador associado que, como o nome sugere, permite controlá-lo.
Somente um leitor pode ler um stream por vez. Quando um leitor é criado e começa a ler um stream (ou seja, se torna um leitor ativo), ele é bloqueado a ele. Se você quiser que outro leitor assuma a leitura da sua transmissão, normalmente é necessário liberar o primeiro antes de fazer qualquer outra coisa, embora você possa tee de streams.
Como criar um stream legível
Para criar um stream legível, chame o construtor
ReadableStream()
dele.
O construtor tem um argumento opcional underlyingSource
, que representa um objeto
com métodos e propriedades que definem o comportamento da instância de stream construída.
O underlyingSource
Isso pode usar os seguintes métodos opcionais definidos pelo desenvolvedor:
start(controller)
: chamado imediatamente quando o objeto é construído. O método pode acessar a origem do stream e fazer qualquer outra coisa necessária para configurar a funcionalidade do stream. Se esse processo for feito de maneira assíncrona, o método poderá retornar uma promessa para indicar sucesso ou falha. O parâmetrocontroller
transmitido a esse método é umReadableStreamDefaultController
.pull(controller)
: pode ser usado para controlar o stream à medida que mais blocos são buscados. Ela é chamada repetidamente desde que a fila interna de blocos do stream não esteja cheia, até que a fila atinja a marca-d'água superior. Se o resultado da chamada depull()
for uma promessa,pull()
não será chamado novamente até que essa promessa seja atendida. Se a promessa for rejeitada, o stream vai apresentar um erro.cancel(reason)
: chamado quando o consumidor de stream cancela o stream.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
O ReadableStreamDefaultController
oferece suporte aos seguintes métodos:
ReadableStreamDefaultController.close()
fecha o stream associado.ReadableStreamDefaultController.enqueue()
enfileira uma determinada parte no stream associado.ReadableStreamDefaultController.error()
gera erros nas interações futuras com o stream associado.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa dois
parâmetros:
highWaterMark
: um número não negativo que indica a marca-d'água do fluxo que está usando a estratégia de enfileiramento.size(chunk)
: função que calcula e retorna o tamanho não negativo finito do valor do bloco especificado. O resultado é usado para determinar a pressão de retorno, que se manifesta pela propriedadeReadableStreamDefaultController.desiredSize
adequada. Ele também controla quando o métodopull()
da fonte subjacente é chamado.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Os métodos getReader()
e read()
Para ler um stream legível, você precisa de um leitor, que será um
ReadableStreamDefaultReader
.
O método getReader()
da interface ReadableStream
cria um leitor e bloqueia o stream. Enquanto a transmissão estiver bloqueada, nenhum outro leitor poderá ser adquirido até que este seja liberado.
O método read()
da interface ReadableStreamDefaultReader
retorna uma promessa que fornece acesso ao próximo
bloco na fila interna do stream. Ela é atendida ou rejeitada com um resultado, dependendo do estado
do stream. As diferentes possibilidades são as seguintes:
- Se um bloco estiver disponível, a promessa será atendida com um objeto do formulário
{ value: chunk, done: false }
. - Se o stream for fechado, a promessa será atendida com um objeto no formato
{ value: undefined, done: true }
. - Se ocorrer um erro na transmissão, a promessa vai ser rejeitada com o erro relevante.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
A propriedade locked
Para verificar se um stream legível está bloqueado, acesse a
propriedade
ReadableStream.locked
dele.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplos de código de stream legíveis
O exemplo de código abaixo mostra todas as etapas em ação. Primeiro, você cria um ReadableStream
que, no
argumento underlyingSource
(ou seja, a classe TimestampSource
), define um método start()
.
Esse método informa ao controller
do stream a
enqueue()
um carimbo de data/hora a cada segundo durante 10 segundos.
Por fim, ele instrui o controlador a close()
o stream. Você consome esse
stream criando um leitor por meio do método getReader()
e chamando read()
até que o stream seja
done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iteração assíncrona
Verificar cada iteração de repetição read()
se o stream é done
pode não ser a API mais conveniente.
Felizmente, em breve haverá uma maneira melhor de fazer isso: a iteração assíncrona.
for await (const chunk of stream) {
console.log(chunk);
}
Uma solução alternativa para usar a iteração assíncrona hoje é implementar o comportamento com um polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Como criar um stream legível
O método tee()
da
interface ReadableStream
mostra o fluxo legível atual, retornando uma matriz de dois elementos
contendo as duas ramificações resultantes como novas instâncias de ReadableStream
. Isso permite
que dois leitores leiam uma transmissão ao mesmo tempo. Isso pode ser feito, por exemplo, em um service worker se
quiser buscar uma resposta do servidor e transmiti-la para o navegador, mas também transmiti-la para o
cache do service worker. Como um corpo de resposta não pode ser consumido mais de uma vez, você precisa de duas cópias
para fazer isso. Para cancelar o fluxo, é necessário cancelar as duas ramificações resultantes. A prática de Tear de uma transmissão
geralmente bloqueia o stream, impedindo que outros leitores o bloqueiem.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Streams de bytes legíveis
Para streams que representam bytes, uma versão estendida do stream legível é fornecida para processar bytes de maneira eficiente, em especial a minimização de cópias. Os streams de bytes permitem a aquisição de leitores do tipo "traga seu próprio buffer" (BYOB, na sigla em inglês). A implementação padrão pode oferecer uma variedade de saídas diferentes, como strings ou buffers de matriz no caso de WebSockets, enquanto os streams de bytes garantem a saída de bytes. Além disso, leitores BYOB têm benefícios de estabilidade. Isso ocorre porque, se um buffer for removido, será possível garantir que ele não será gravado duas vezes no mesmo buffer duas vezes, evitando disputas. Os leitores BYOB podem reduzir o número de vezes que o navegador precisa executar a coleta de lixo, porque ele pode reutilizar buffers.
Como criar um stream de bytes legível
É possível criar um fluxo de bytes legível transmitindo um parâmetro type
extra para o
construtor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
O underlyingSource
A fonte subjacente de um stream de bytes legível recebe um ReadableByteStreamController
para
manipular. O método ReadableByteStreamController.enqueue()
usa um argumento chunk
cujo valor
é ArrayBufferView
. A propriedade ReadableByteStreamController.byobRequest
retorna a solicitação de envio BYOB atual ou nula se não houver nenhuma. Por fim, a propriedade ReadableByteStreamController.desiredSize
retorna o tamanho desejado para preencher a fila interna do stream controlado.
O queuingStrategy
O segundo argumento, também opcional, do construtor ReadableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa um
parâmetro:
highWaterMark
: um número não negativo de bytes que indica o sinal alto do stream que usa essa estratégia de enfileiramento. Isso é usado para determinar a pressão de retorno, manifestando-se por meio da propriedadeReadableByteStreamController.desiredSize
apropriada. Ele também controla quando o métodopull()
da fonte subjacente é chamado.
Os métodos getReader()
e read()
Em seguida, você pode ter acesso a um ReadableStreamBYOBReader
definindo o parâmetro mode
conforme necessário:
ReadableStream.getReader({ mode: "byob" })
. Isso permite um controle mais preciso sobre a alocação
do buffer para evitar cópias. Para ler o fluxo de bytes, você precisa chamar
ReadableStreamBYOBReader.read(view)
, em que view
é um
ArrayBufferView
.
Exemplo de código legível de stream de bytes
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
A função a seguir retorna fluxos de bytes legíveis que permitem uma leitura eficiente de cópia zero de uma matriz gerada aleatoriamente. Em vez de usar um tamanho de bloco predeterminado de 1.024, ele tenta preencher o buffer fornecido pelo desenvolvedor, permitindo controle total.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
A mecânica de um fluxo gravável
Um stream gravável é um destino em que é possível gravar dados, representado em JavaScript por um
objeto WritableStream
. Isso
serve como uma abstração sobre um coletor subjacente, que é um coletor de E/S de nível inferior em que
os dados brutos são gravados.
Os dados são gravados no stream por um gravador, um bloco por vez. Um bloco pode assumir várias formas, assim como os pedaços de um leitor. Você pode usar o código que quiser para produzir os blocos prontos para escrita. O gravador e o código associado são chamados de produtores.
Quando um gravador é criado e começa a gravar em um stream (um gravador ativo), ele é considerado bloqueado a ele. Somente um compositor por vez pode gravar em um stream gravável. Se você quiser que outro gravador comece a gravar em seu stream, normalmente você precisará liberá-lo antes de anexar outro gravador a ele.
Uma fila interna monitora os blocos que foram gravados no stream, mas ainda não foram processados pelo coletor subjacente.
Uma estratégia de enfileiramento é um objeto que determina como um stream deve sinalizar a pressão de retorno com base no estado da fila interna. A estratégia de enfileiramento atribui um tamanho a cada fragmento e compara o tamanho total de todos os blocos na fila com um número especificado, conhecido como marca d'água alta.
A construção final é chamada de controlador. Cada stream gravável tem um controlador associado que permite controlar o stream (por exemplo, anulá-lo).
Como criar um stream gravável
A interface WritableStream
da
API Streams fornece uma abstração padrão para gravar dados de streaming em um destino, conhecido
como coletor. Este objeto vem com backpressure e enfileiramento integrados. Para criar um stream gravável,
chame o construtor
WritableStream()
dele.
Ele tem um parâmetro underlyingSink
opcional, que representa um objeto
com métodos e propriedades que definem o comportamento da instância de stream construída.
O underlyingSink
O underlyingSink
pode incluir os seguintes métodos opcionais definidos pelo desenvolvedor. O parâmetro controller
transmitido a alguns dos métodos é um
WritableStreamDefaultController
.
start(controller)
: esse método é chamado imediatamente quando o objeto é construído. O conteúdo desse método precisa ter acesso ao coletor subjacente. Se esse processo for feito de maneira assíncrona, ele poderá retornar uma promessa para indicar sucesso ou falha.write(chunk, controller)
: este método será chamado quando um novo bloco de dados (especificado no parâmetrochunk
) estiver pronto para ser gravado no coletor subjacente. Ele pode retornar uma promessa para sinalizar o sucesso ou a falha da operação de gravação. Esse método é chamado somente após as gravações anteriores serem bem-sucedidas, e nunca depois que o stream for fechado ou cancelado.close(controller)
: esse método será chamado se o app sinalizar que terminou de gravar blocos no stream. O conteúdo precisa fazer o que for necessário para finalizar as gravações no coletor subjacente e liberar o acesso a ele. Se esse processo for assíncrono, ele poderá retornar uma promessa para sinalizar sucesso ou falha. Esse método só será chamado depois que todas as gravações na fila forem bem-sucedidas.abort(reason)
: este método será chamado se o app indicar que quer fechar abruptamente o stream e colocá-lo em um estado de erro. Ele pode limpar todos os recursos retidos, comoclose()
, masabort()
será chamado mesmo que as gravações estejam na fila. Esses pedaços serão jogados fora. Se esse processo for assíncrono, ele poderá retornar uma promessa para sinalizar sucesso ou falha. O parâmetroreason
contém umDOMString
que descreve por que o stream foi cancelado.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
A interface
WritableStreamDefaultController
da API Streams representa um controlador que permite o controle do estado de um WritableStream
durante a configuração, à medida que mais blocos são enviados para gravação ou no fim do processo. Ao criar um WritableStream
, o coletor subjacente recebe uma instância WritableStreamDefaultController
correspondente para manipular. O WritableStreamDefaultController
tem apenas um método:
WritableStreamDefaultController.error()
,
o que causa erros em interações futuras com o stream associado.
WritableStreamDefaultController
também oferece suporte a uma propriedade signal
que retorna uma instância de
AbortSignal
,
permitindo que uma operação WritableStream
seja interrompida, se necessário.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
O queuingStrategy
O segundo argumento, também opcional, do construtor WritableStream()
é queuingStrategy
.
É um objeto que define opcionalmente uma estratégia de enfileiramento para o stream, que usa dois
parâmetros:
highWaterMark
: um número não negativo que indica a marca-d'água do fluxo que está usando a estratégia de enfileiramento.size(chunk)
: função que calcula e retorna o tamanho não negativo finito do valor do bloco especificado. O resultado é usado para determinar a pressão de retorno, que se manifesta pela propriedadeWritableStreamDefaultWriter.desiredSize
adequada.
Os métodos getWriter()
e write()
Para gravar em um stream gravável, você precisa de um gravador, que será um
WritableStreamDefaultWriter
. O método getWriter()
da interface WritableStream
retorna uma
nova instância de WritableStreamDefaultWriter
e bloqueia o stream para essa instância. Enquanto o
stream estiver bloqueado, nenhum outro gravador poderá ser adquirido até que o atual seja liberado.
O método write()
da interface WritableStreamDefaultWriter
grava um bloco de dados transmitido em um WritableStream
e no coletor subjacente. Em seguida, retorna uma promessa que é resolvida para indicar o sucesso ou a falha da operação de gravação. Observe que o que
"sucesso" significa depende do coletor subjacente. Isso pode indicar que o bloco foi aceito
e não necessariamente que está salvo com segurança no destino final.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
A propriedade locked
Para conferir se um stream gravável está bloqueado, acesse a
propriedade
WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Exemplo de código de stream gravável
O exemplo de código abaixo mostra todas as etapas em ação.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Como encaminhar um stream legível para um stream gravável
Um stream legível pode ser encaminhado para um stream gravável pelo método
pipeTo()
dele.
ReadableStream.pipeTo()
encaminha o ReadableStream
atual para um determinado WritableStream
e retorna uma
promessa que será atendida quando o processo de pipeline for concluído ou será rejeitada se algum erro for
encontrado.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Como criar um stream de transformação
A interface TransformStream
da API Streams representa um conjunto de dados transformáveis. Você cria um stream de transformação chamando seu construtor TransformStream()
, que cria e retorna um objeto de fluxo de transformação a partir dos gerenciadores fornecidos. O construtor TransformStream()
aceita como
primeiro argumento um objeto JavaScript opcional que representa o transformer
. Esses objetos podem
conter qualquer um dos seguintes métodos:
O transformer
start(controller)
: esse método é chamado imediatamente quando o objeto é construído. Normalmente, isso é usado para enfileirar blocos de prefixo, usandocontroller.enqueue()
. Esses blocos serão lidos do lado legível, mas não dependem de nenhuma gravação no lado gravável. Se esse processo inicial for assíncrono, por exemplo, porque é necessário algum esforço para adquirir os blocos de prefixo, a função poderá retornar uma promessa para sinalizar sucesso ou falha. Uma promessa rejeitada vai gerar um erro no stream. Todas as exceções geradas serão geradas novamente pelo construtorTransformStream()
.transform(chunk, controller)
: este método é chamado quando um novo bloco originalmente gravado no lado gravável está pronto para ser transformado. A implementação de stream garante que essa função seja chamada somente depois que as transformações anteriores forem bem-sucedidas, e nunca antes da conclusão destart()
ou apósflush()
ter sido chamado. Essa função executa o trabalho de transformação real do fluxo de transformação. Ele pode enfileirar os resultados usandocontroller.enqueue()
. Isso permite que um único bloco gravado no lado gravável resulte em zero ou vários fragmentos no lado legível, dependendo de quantas vezescontroller.enqueue()
é chamado. Se o processo de transformação for assíncrono, essa função poderá retornar uma promessa para sinalizar o sucesso ou o fracasso da transformação. Uma promessa rejeitada vai gerar erros nos lados legível e gravável do fluxo de transformação. Se nenhum métodotransform()
for fornecido, a transformação de identidade será usada, o que filtra os blocos inalterados do lado gravável para o lado legível.flush(controller)
: esse método é chamado depois que todos os blocos gravados no lado gravável foram transformados pela passagem detransform()
e o lado gravável está prestes a ser fechado. Normalmente, isso é usado para enfileirar blocos de sufixo para o lado legível, antes que ele também se torne fechado. Se o processo de limpeza for assíncrono, a função poderá retornar uma promessa para sinalizar sucesso ou falha. O resultado será comunicado ao autor da chamada destream.writable.write()
. Além disso, uma promessa rejeitada vai gerar erros nos lados legível e gravável do stream. Gerar uma exceção é tratado da mesma forma que retornar uma promessa rejeitada.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Estratégias de enfileiramento de writableStrategy
e readableStrategy
O segundo e o terceiro parâmetros opcionais do construtor TransformStream()
são estratégias de enfileiramento de writableStrategy
e readableStrategy
opcionais. Eles são definidos conforme descrito nas
seções de stream legível e gravável,
respectivamente.
Exemplo de código de fluxo de transformação
O exemplo de código a seguir mostra um stream de transformação simples em ação.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Como encadear um stream legível por um stream de transformação
O método pipeThrough()
da interface ReadableStream
fornece uma maneira encadeada de canalizar o stream atual
através de um stream de transformação ou qualquer outro par gravável/legível. O pipe de um stream geralmente o bloqueia
durante o pipeline, impedindo que outros leitores o bloqueiem.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
O próximo exemplo de código (um pouco complicado) mostra como você pode implementar uma versão "gritando" de fetch()
que coloca todo o texto em maiúsculas, consumindo a promessa de resposta retornada
como um stream
e colocando em letras maiúsculas por parte. A vantagem dessa abordagem é que você não precisa esperar o download do documento inteiro, o que pode fazer uma enorme diferença ao lidar com arquivos grandes.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demonstração
A demonstração abaixo mostra streams legíveis, graváveis e de transformação em ação. Ele também inclui exemplos
de cadeias de barra vertical pipeThrough()
e pipeTo()
e também demonstra tee()
. Se preferir, execute a demonstração na própria janela ou veja o código-fonte.
Streams úteis disponíveis no navegador
Há diversos streams úteis integrados ao navegador. É fácil criar um
ReadableStream
a partir de um blob. O método stream() da interface Blob
retorna um ReadableStream
que, durante a leitura, retorna os dados contidos no blob. Lembre-se também de que um objeto
File
é um tipo específico de
Blob
e pode ser usado em qualquer contexto possível de um blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
As variantes de streaming de TextDecoder.decode()
e TextEncoder.encode()
são chamadas de
TextDecoderStream
e
TextEncoderStream
, respectivamente.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Compactar ou descompactar um arquivo é fácil com os streams de transformação CompressionStream
e DecompressionStream
, respectivamente. O exemplo de código abaixo mostra como fazer o download da especificação de streams, compactá-la (gzip) diretamente no navegador e gravar o arquivo compactado diretamente no disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
O FileSystemWritableFileStream
da API File System Access
e os fluxos de solicitação fetch()
experimentais são
exemplos de streams graváveis em geral.
A API Serial faz um uso intenso de fluxos legíveis e graváveis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Por fim, a API WebSocketStream
integra fluxos à API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Recursos úteis
- Especificação de streams
- Demonstrações complementares
- Polyfill de streams
- 2016: o ano dos fluxos da Web
- Geradores e iteradores assíncronos
- Visualizador de streaming
Agradecimentos
Este artigo foi revisado por Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. As postagens do blog de Jake Archibald me ajudaram muito a entender os fluxos. Alguns dos exemplos de código são inspirados nas análises detalhadas do usuário do GitHub @bellbind, e partes da prosa são elaboradas com base nos Documentos da Web do MDN em streams (links em inglês). Os autores do Streams Standard (em inglês) fizeram um ótimo trabalho ao escrever essa especificação. Imagem principal de Ryan Lara no Unsplash (links em inglês).