מקורות נתונים – המדריך המלא

ב-Streams API מוסבר איך משתמשים בשידורים קריאים, ניתנים לכתיבה ומבצעים טרנספורמציה.

ה-Streams API מאפשר לגשת באופן פרוגרמטי למקורות נתונים שהתקבלו ברשת או שנוצרו בכל אמצעי מקומי, ולעבד אותם באמצעות JavaScript. סטרימינג כולל פירוק של משאב שרוצים לקבל, לשלוח או להפוך למקטעים קטנים, ולאחר מכן לעבד את המקטעים האלה ביט אחר ביט. למרות שדפדפנים עושים זאת בכל מקרה כשמקבלים נכסים כמו HTML או סרטונים להצגה בדפי אינטרנט, היכולת הזו מעולם לא הייתה זמינה ל-JavaScript לפני ההשקה של fetch באמצעות שידורים חיים.

בעבר, אם רציתם לעבד משאב כלשהו (סרטון או קובץ טקסט וכו'), הייתם צריכים להוריד את הקובץ כולו, להמתין עד שהוא יעבור תהליך deseration לפורמט מתאים, ואז לעבד אותו. כשהשידורים זמינים ל-JavaScript, כל זה משתנה. עכשיו אפשר לעבד נתונים גולמיים באמצעות JavaScript בהדרגה, ברגע שהוא זמין אצל הלקוח, בלי ליצור מאגר נתונים זמני, מחרוזת או blob. נפתח מספר תרחישים לדוגמה, שאת חלקם אפרט:

  • אפקטים של וידאו: הכנסת אפקטים בזמן אמת לשידור וידאו קריא.
  • דחיסת נתונים (de)compress:העברת מקור נתונים של קובץ דרך זרם טרנספורמציה שדוחס אותו באופן סלקטיבי (ללא דחיסה).
  • פענוח תמונה: העברת זרם תגובת HTTP דרך זרם טרנספורמציה שמקודד בייטים לנתוני מפת סיביות (bitmap), ולאחר מכן דרך זרם טרנספורמציה אחר שמתרגם את מפות הביטים לקובצי PNG. אם היא מותקנת בתוך ה-handler של fetch של Service Worker, זה מאפשר למלא באופן שקוף פורמטים חדשים של תמונות, כמו AVIF.

תמיכת דפדפן

ReadableStream ו-WritableStream

תמיכה בדפדפן

  • 43
  • 14
  • 65
  • 10.1

מקור

TransformStream

תמיכה בדפדפן

  • 67
  • 79
  • 102
  • 14.1

מקור

מושגי ליבה

לפני שנתחיל בפירוט על הסוגים השונים של שידורים, אציג כמה מושגים מרכזיים.

גושים

מקטע הוא קטע נתונים יחיד שנכתב במקור נתונים או נקרא ממנו. היא יכולה להיות מכל סוג. זרמים יכולים גם להכיל מקטעים מסוגים שונים. ברוב המקרים, מקטע לא יהיה יחידת הנתונים האטומית ביותר של זרם נתון. לדוגמה, זרם של בייטים עשוי להכיל מקטעים שמכילים 16 יחידות Uint8ArrayKiB במקום בייטים בודדים.

זרמים שניתן לקרוא

מקור נתונים קריא מייצג מקור נתונים שממנו ניתן לקרוא. במילים אחרות, הנתונים יוצאים מזרם קריא. באופן כללי, שידור קריא הוא מופע של המחלקה ReadableStream.

שידורים ניתנים לכתיבה

מקור נתונים שניתן לכתוב מייצג יעד לנתונים שאפשר לכתוב בו. במילים אחרות, הנתונים נכנסים לזרם שניתן לכתוב. באופן כזה, זרם ניתן לכתיבה הוא מופע של המחלקה WritableStream.

טרנספורמציה של שידורים חיים

זרם טרנספורמציה מורכב מזוג זרמים: זרם שניתן לכתוב, שמוכר בתור הצד הניתן לכתיבה, וזרם קריא, שנקרא גם הצד הקריא. מטאפורה מהעולם האמיתי היא מתורגמן סימולטני שמתרגם משפה אחת לאחרת תוך כדי תנועה. באופן ספציפי לזרם הטרנספורמציה, כתיבה לצד שניתן לכתיבה גורמת לכך שנתונים חדשים יהיו זמינים לקריאה מהצד הקריא. באופן בטוח, כל אובייקט עם מאפיין writable ומאפיין readable יכול לשמש כזרם טרנספורמציה. עם זאת, המחלקה TransformStream הרגילה מאפשרת ליצור בקלות זוג כזה שמסובך כמו שצריך.

שרשראות צינור

עדכוני תוכן משמשים בעיקר על ידי חיבור ביניהם. את השידור הקריא אפשר לשדר ישירות לשידור שניתן לכתיבה, באמצעות השיטה pipeTo() של השידור הקריא, או לשדר אותו קודם דרך זרם טרנספורמציה אחד או יותר, באמצעות השיטה pipeThrough() של השידור הקריא. קבוצה של צינורות בסטרימינג יחד בצורה הזו נקראת שרשרת צינורות.

לחץ אחורי

אחרי בניית שרשרת צינורות, היא תפיץ אותות לגבי המהירות שבה המקטעים צריכים לעבור דרכה. אם שלב כלשהו בשרשרת עדיין לא יכול לקבל מקטעים, הוא מפיץ אות לאחור דרך שרשרת הצינורות, עד שבסופו של דבר מקבלים הודעה למקור המקורי להפסיק לייצר מקטעים במהירות רבה. התהליך הזה של נירמול נקרא לחץ לאחור.

טישרט

ניתן להתאים שידור קריא (בשם שלו לצורה של 'T' באותיות רישיות) באמצעות שיטת tee() שלו. הפעולה הזו תנעל את השידור, כלומר לא תוכלו יותר להשתמש בו באופן ישיר. עם זאת, ייווצר שני שידורים חדשים, בשם ענפים, שאפשר לצרוך בנפרד. קישור בין מכשירים חשוב גם הוא כי לא ניתן להריץ מחדש או להפעיל מחדש סטרימינג. מידע נוסף על כך בהמשך.

תרשים של שרשרת צינורות שמורכבת מזרם קריא שמגיע מקריאה ל-API לאחזור. לאחר מכן, הפלט מוזהב דרך זרם טרנספורמציה שפלט פלט שנשלח ולאחר מכן נשלח לדפדפן כדי לקבל את הזרם הקריא הראשון שמתקבל ואל המטמון של ה-Service Worker עבור השידור הקריא השני.
שרשרת צינורות.

המכניקה של זרם קריא

מקור נתונים קריא הוא מקור נתונים שמיוצג ב-JavaScript על ידי אובייקט ReadableStream שעובר ממקור בסיסי. ה-builder של ReadableStream() יוצר ומחזיר אובייקט זרם קריא מרכיבי ה-handler הנתונים. יש שני סוגים של מקור מידע:

  • מקורות בדחיפה דוחפים נתונים כל הזמן כשאתם פותחים אותם, והאחריות להתחיל, להשהות או לבטל את הגישה לשידור היא שלכם. לדוגמה, סטרימינג של וידאו בשידור חי, אירועים שנשלחו על ידי השרת או WebSockets.
  • כשאתם מתחברים למקורות של שליפת נתונים, אתם נדרשים לבקש מהם נתונים באופן מפורש. לדוגמה, פעולות HTTP באמצעות קריאות fetch() או XMLHttpRequest.

נתוני הסטרימינג נקראים ברצף בקטעים קטנים שנקראים מקטעים. המקטעים שמוצבים בשידור החי אמורים להופיע בתור. המשמעות היא שהם ממתינים בתור ומוכנים לקריאה. תור פנימי עוקב אחר המקטעים שעדיין לא נקראו.

אסטרטגיית סידור בתור היא אובייקט שקובע איך שידור צריך לאותת על לחץ לאחור על סמך המצב של התור הפנימי שלו. אסטרטגיית הסידור 'הבאים בתור' מקצה גודל לכל מקטע, ומשווה את הגודל הכולל של כל המקטעים בתור למספר מסוים, שנקרא סימן המים הגבוה.

הקטעים שבתוך השידור נקראים על ידי קורא. הקורא הזה מאחזר את הנתונים במקטעים נפרדים, ומאפשר לכם לבצע כל פעולה שתרצו לעשות בהם. הקורא וקוד העיבוד הנוסף שמתלווה אליו נקראים צרכן.

המבנה הבא בהקשר הזה נקרא בקר. לכל שידור קריא משויך בקר, כפי שהשם מרמז, הוא מאפשר לשלוט בשידור.

רק קורא אחד יכול לקרוא זרם בכל פעם. כאשר קורא נוצר ומתחיל לקרוא זרם (כלומר, הופך לקורא פעיל), הוא נעול אליו. אם אתם רוצים שקורא אחר יהשתלט על קריאת השידור, בדרך כלל תצטרכו לשחרר את הקורא הראשון לפני שתבצעו פעולה אחרת (אבל תוכלו להתחיל את השידור).

יצירת זרם קריא

כדי ליצור שידור קריא, מפעילים את ה-builder שלו ReadableStream(). לבנאי יש ארגומנט אופציונלי underlyingSource, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את ההתנהגות של מופע הזרם המובנה.

underlyingSource

ניתן לעשות זאת באמצעות השיטות האופציונליות הבאות שהוגדרו על ידי המפתח:

  • start(controller): קריאה מיידית לאחר בניית האובייקט. השיטה יכולה לגשת למקור של השידור, ולבצע כל פעולה אחרת שנדרשת כדי להגדיר את הפונקציונליות של השידור. אם התהליך יתבצע באופן אסינכרוני, השיטה תוכל להבטיח הצלחה או כישלון. הפרמטר controller שהועבר לשיטה הזו הוא ReadableStreamDefaultController.
  • pull(controller): אפשר להשתמש בה כדי לשלוט בשידור מיד עם אחזור של מקטעים נוספים. היא נקראת שוב ושוב כל עוד תור המקטעים הפנימי בזרם לא מלא, עד שהתור מגיע לסימן המים הגבוה. אם התוצאה של הקריאה ל-pull() היא הבטחה, לא תתבצע קריאה נוספת אל pull() עד שההבטחה תמומש. אם ההבטחה תידחה, הסטרימינג יהפוך לשגיאות.
  • cancel(reason): מתבצעת קריאה כשצרכן השידור מבטל את השידור.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

השדה ReadableStreamDefaultController תומך בשיטות הבאות:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

הארגומנט השני, שהוא אופציונלי, גם של הבנאי ReadableStream() הוא queuingStrategy. זהו אובייקט שמגדירים באופן אופציונלי אסטרטגיית תור לשידור החי, שמקבלת שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמציין את סימן המים הגבוה של הזרם לפי אסטרטגיית תור זו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל הלא-שלילי הסופי של ערך המקטע הנתון. התוצאה משמשת לקביעת לחץ לאחור, המתבטאת באמצעות המאפיין ReadableStreamDefaultController.desiredSize המתאים. המדיניות גם קובעת מתי מתבצעת קריאה ל-method pull() של המקור הבסיסי.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

השיטות getReader() ו-read()

כדי לקרוא משידור קריא, נדרש קורא, שנקרא ReadableStreamDefaultReader. השיטה getReader() בממשק ReadableStream יוצרת קורא ונועלת את השידור אליו. בזמן שהשידור נעול, לא ניתן לרכוש קורא אחר עד שהקורא הזה ישוחרר.

השיטה read() בממשק ReadableStreamDefaultReader מחזירה הבטחה שמספקת גישה לקטע הבא בתור הפנימי של השידור. מתבצע אישור או דחייה עם תוצאה, בהתאם למצב השידור. אלה האפשרויות השונות:

  • אם מקטע זמין, ההבטחה תמומש באמצעות אובייקט בפורמט הזה
    { value: chunk, done: false }.
  • אם השידור ייסגר, ההבטחה תמומש באמצעות אובייקט בפורמט הזה
    { value: undefined, done: true }.
  • אם השידור מקבל שגיאה, ההבטחה תידחה עם השגיאה הרלוונטית.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

הנכס locked

תוכלו לבדוק אם שידור קריא נעול על ידי גישה למאפיין ReadableStream.locked שלו.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמאות קריאות של קוד שידור

דוגמת הקוד הבאה מציגה את כל השלבים בפעולה. קודם צריך ליצור ReadableStream שבארגומנט underlyingSource שלו (כלומר, המחלקה TimestampSource) מגדיר שיטה start(). השיטה הזו מורה ל-controller של השידור ל-enqueue() חותמת זמן בכל שנייה במהלך עשר שניות. בסיום, הוא מורה לבקר close() את השידור. כדי להשתמש בסטרימינג הזה, יוצרים קורא באמצעות השיטה getReader() וקוראים ל-read() עד שהשידור יהיה done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

איטרציה אסינכרונית

בדיקה של כל איטרציה של לולאה של read() אם מקור הנתונים הוא done, הוא לא ה-API הנוח ביותר. למרבה המזל, בקרוב תהיה דרך טובה יותר לעשות זאת: איטרציה אסינכרונית.

for await (const chunk of stream) {
  console.log(chunk);
}

פתרון זמני לשימוש באיטרציה אסינכרונית הוא יישום ההתנהגות עם polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

קישור של זרם קריא

השיטה tee() של הממשק ReadableStream קובעת את השידור הקריא הנוכחי, ומחזירה מערך של שני רכיבים שמכיל את שני ההסתעפויות שנוצרו כמכונות חדשות של ReadableStream. כך שני קוראים יכולים לקרוא זרם בו-זמנית. אפשר לעשות זאת, לדוגמה, ב-Service Worker אם רוצים לאחזר תגובה מהשרת ולהעביר אותה לדפדפן, אבל גם להעביר אותה למטמון של ה-service worker. מכיוון שלא ניתן להשתמש בגוף התשובה יותר מפעם אחת, יש צורך בשני עותקים כדי לעשות זאת. כדי לבטל את השידור, תצטרכו לבטל את שני ההסתעפויות שנוצרו. בדרך כלל, קישור של שידור סטרימינג ינעל אותו למשך פרק הזמן הזה, וכך קוראים אחרים לא יוכלו לנעול אותו.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

זרמים של בייטים שניתן לקרוא

בשידורים שמייצגים בייטים, גרסה מורחבת של השידור הקריא מסופקת כדי לטפל בבייטים ביעילות, במיוחד על ידי הקטנת מספר העותקים. סטרימינג של בייטים מאפשר לכם לרכוש קוראים שאוחסנו במאגר הנתונים הזמני (BYOB). הטמעת ברירת המחדל יכולה לספק מגוון של פלטים שונים, כמו מחרוזות או מאגרי נתונים זמניים במקרה של WebSockets, אבל סטרימינג של בייטים מבטיח פלט של בייטים. בנוסף, קוראי BYOB יכולים ליהנות מיתרונות יציבות. הסיבה לכך היא שאם מאגר מתנתק, הוא יכול להבטיח שלא כותבים פעמיים באותו מאגר נתונים זמני, וכך נמנעים מתנאי מרוצים. קוראי BYOB יכולים להפחית את מספר הפעמים שהדפדפן צריך להריץ איסוף אשפה כי הוא יכול לעשות שימוש חוזר במאגרי אחסון.

יצירת זרם בייטים קריא

אפשר ליצור זרם בייטים קריא על ידי העברת פרמטר type נוסף למבנה ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

המקור הבסיסי של זרם בייט קריא מקבל ReadableByteStreamController למניפולציה. השיטה ReadableByteStreamController.enqueue() מקבלת ארגומנט chunk שהערך שלו הוא ArrayBufferView. המאפיין ReadableByteStreamController.byobRequest מחזיר את בקשת המשיכה הנוכחית באמצעות BYOB, או null אם אין כזו. לבסוף, המאפיין ReadableByteStreamController.desiredSize מחזיר את הגודל הרצוי על מנת למלא את התור הפנימי של השידור המבוקר.

queuingStrategy

הארגומנט השני, שהוא אופציונלי, גם של הבנאי ReadableStream() הוא queuingStrategy. זהו אובייקט שיכול להגדיר באופן אופציונלי אסטרטגיית תור לשידור החי, שמקבלת פרמטר אחד:

  • highWaterMark: מספר בייטים לא שלילי שמציין את סימן המים הגבוה של הזרם באמצעות אסטרטגיית התור הזו. הערך הזה משמש לקביעת לחץ לאחור, המתבטא באמצעות המאפיין ReadableByteStreamController.desiredSize המתאים. המדיניות גם קובעת מתי מתבצעת קריאה ל-method pull() של המקור הבסיסי.

השיטות getReader() ו-read()

לאחר מכן אפשר לקבל גישה ל-ReadableStreamBYOBReader על ידי הגדרת הפרמטר mode בהתאם: ReadableStream.getReader({ mode: "byob" }). כך מתאפשרת שליטה מדויקת יותר על הקצאת מאגר הנתונים הזמני, על מנת להימנע מעותקים. כדי לקרוא מזרם הבייטים, צריך לקרוא ל-ReadableStreamBYOBReader.read(view), כאשר view הוא ArrayBufferView.

דוגמת קוד שידור בבייט ניתן לקריאה

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

הפונקציה הבאה מחזירה שידורים קריאים של בייטים, שמאפשרים קריאה יעילה באפס דפים של מערך שנוצר באופן אקראי. במקום להשתמש בגודל מקטע מוגדר מראש של 1,024, המערכת מנסה למלא את מאגר הנתונים הזמני שסופק על ידי המפתח, וכך להשיג שליטה מלאה.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

המכניקה של זרם שניתן לכתיבה

מקור נתונים ניתן לכתיבה הוא יעד שאליו אפשר לכתוב נתונים, שמיוצגים ב-JavaScript על ידי אובייקט WritableStream. כך ניתן לפשט את החלק העליון של ה-sink הבסיסי – sink ברמה נמוכה יותר של קלט/פלט (I/O) שאליו נכתבים נתונים גולמיים.

הנתונים נכתבים בשידור החי באמצעות מחבר, מקטע אחד בכל פעם. למקטע יכולות להיות כמה צורות, בדיוק כמו המקטעים בקורא. תוכלו להשתמש בכל קוד שתרצו כדי ליצור את המקטעים שמוכנים לכתיבה. המחבר, יחד עם הקוד המשויך, נקרא מפיק.

כאשר משתמש נוצר ומתחיל לכתוב בזרם (כותב פעיל), נאמר נעול עליו. רק מחבר אחד יכול לכתוב בזרם שניתן לכתיבה בו-זמנית. אם אתם רוצים שכותב אחר יתחיל לכתוב בזרם שלכם, בדרך כלל עליכם לשחרר אותו לפני שאתם מצרפים כותב אחר.

תור פנימי עוקב אחרי המקטעים שנכתבו בשידור, אבל עדיין לא עובדו על ידי ה-sink שבבסיס שלהם.

אסטרטגיית סידור בתור היא אובייקט שקובע איך שידור צריך לאותת על לחץ לאחור על סמך המצב של התור הפנימי שלו. אסטרטגיית הסידור 'הבאים בתור' מקצה גודל לכל מקטע, ומשווה את הגודל הכולל של כל המקטעים בתור למספר מסוים, שנקרא סימן המים הגבוה.

המבנה הסופי נקרא בקר. לכל שידור ניתן לכתיבה משויך בקר, שמאפשר לשלוט בשידור (לדוגמה, לבטל אותו).

יצירת זרם שניתן לכתוב

הממשק WritableStream של Streams API מספק הפשטה סטנדרטית לכתיבה של נתונים בסטרימינג ליעד, שנקרא sink. האובייקט הזה כולל לחיצה מובנית והוספה לתור. כדי ליצור זרם ניתן לכתיבה, מפעילים את ה-constructor שלו WritableStream(). יש לו פרמטר underlyingSink אופציונלי, שמייצג אובייקט עם שיטות ומאפיינים שמגדירים את ההתנהגות של מופע הזרם המובנה.

underlyingSink

השדה underlyingSink יכול לכלול את השיטות האופציונליות הבאות שהוגדרו על ידי המפתח. הפרמטר controller שמועבר לחלק מהשיטות הוא WritableStreamDefaultController.

  • start(controller): השיטה הזו מופעלת מיד לאחר יצירת האובייקט. המטרה של התוכן בשיטה הזו היא לקבל גישה ל-sink שבבסיס שלו. אם מבצעים את התהליך הזה באופן אסינכרוני, הוא עשוי להחזיר הבטחה להצלחה או לכשל.
  • write(chunk, controller): השיטה הזו תופעל כשמקטע נתונים חדש (שצוין בפרמטר chunk) יהיה מוכן לכתיבה ב-sink שבבסיס שלו. היא יכולה להחזיר הבטחה להצלחה או לכשל בפעולת הכתיבה. הקריאה לשיטה הזו תתבצע רק אחרי שפעולות קודמות יצליחו, ולעולם לא אחרי סיום השידור או הביטול שלו.
  • close(controller): השיטה הזו תיקרא אם האפליקציה תזהה שהיא סיימה לכתוב מקטעים לשידור. התוכן צריך לעשות את מה שנדרש כדי לסיים את הכתיבה ל-sink המקורי ולשחרר את הגישה אליו. במקרה שהתהליך הזה אסינכרוני, הוא יכול להחזיר הבטחה להצלחה או לכשל. תתבצע קריאה לשיטה הזו רק אחרי שכל פעולות הכתיבה שמופיעות בתור הצליחו.
  • abort(reason): השיטה הזו תיקרא אם האפליקציה תזהה שהיא רוצה לסגור בפתאומיות את השידור ולהעביר אותו למצב שגיאה. הוא יכול לנקות משאבים שמורים, כמו close(), אבל תתבצע קריאה ל-abort() גם אם הכתיבה נמצאת בתור. המקטעים האלה יושלכו הצידה. אם התהליך הוא אסינכרוני, הוא עשוי להחזיר הבטחה להצלחה או לכשל. הפרמטר reason מכיל DOMString שמתאר את הסיבה לביטול השידור.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

ממשק WritableStreamDefaultController של Streams API מייצג בקר שמאפשר שליטה על המצב של WritableStream במהלך ההגדרה, ככל ששולחים יותר מקטעים לצורך כתיבה או בסוף הכתיבה. כשיוצרים WritableStream, ה-sink שבבסיס ה-sink מקבל מכונת WritableStreamDefaultController תואמת, שצריך לבצע מניפולציה. ל-WritableStreamDefaultController יש רק שיטה אחת: WritableStreamDefaultController.error(), שגורמת לשגיאות עתידיות עם מקור הנתונים המשויך. WritableStreamDefaultController תומך גם במאפיין signal שמחזיר מופע של AbortSignal, וכך מאפשר להפסיק פעולה WritableStream במקרה הצורך.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

הארגומנט השני, שהוא אופציונלי, גם של הבנאי WritableStream() הוא queuingStrategy. זהו אובייקט שמגדירים באופן אופציונלי אסטרטגיית תור לשידור החי, שמקבלת שני פרמטרים:

  • highWaterMark: מספר לא שלילי שמציין את סימן המים הגבוה של הזרם לפי אסטרטגיית תור זו.
  • size(chunk): פונקציה שמחשבת ומחזירה את הגודל הלא-שלילי הסופי של ערך המקטע הנתון. התוצאה משמשת לקביעת לחץ לאחור, המתבטאת באמצעות המאפיין WritableStreamDefaultWriter.desiredSize המתאים.

השיטות getWriter() ו-write()

כדי לכתוב בזרם שניתן לכתיבה, נדרש כותב, שיהיה WritableStreamDefaultWriter. השיטה getWriter() בממשק WritableStream מחזירה מופע חדש של WritableStreamDefaultWriter ונועלת את השידור למכונה הזו. בזמן שהשידור נעול, לא ניתן לרכוש כותב אחר עד שהצופה הנוכחי ישוחרר.

השיטה write() בממשק WritableStreamDefaultWriter כותבת מקטע נתונים שהועבר ל-WritableStream ול-sink שעליה הוא מבוסס, ואז מחזירה הבטחה שמשקפת את ההצלחה או הכישלון של פעולת הכתיבה. חשוב לשים לב שהמשמעות של "הצלחה" תלויה בשורש הבעיה. היא עשויה להצביע על כך שהמקטע התקבל, ולא בהכרח שהוא נשמר בבטחה ביעד האולטימטיבי שלו.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

הנכס locked

כדי לבדוק אם שידור שניתן לכתיבה נעול, צריך לגשת לנכס WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

דוגמה של קוד שידור

דוגמת הקוד הבאה מציגה את כל השלבים בפעולה.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

העברת זרם קריא לזרם שניתן לכתיבה

ניתן לנתב שידור קריא לשידור שניתן לכתיבה באמצעות השיטה pipeTo() של השידור הקריא. ReadableStream.pipeTo() משדר את ReadableStream הנוכחי ל-WritableStream נתון ומחזיר הבטחה שמתממשת כשתהליך ה-piping יסתיים בהצלחה, או דוחה אם אירעו שגיאות.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

יצירת זרם טרנספורמציה

הממשק TransformStream של Streams API מייצג קבוצה של נתונים שניתנים לשינוי. כדי ליצור זרם טרנספורמציה, קוראים לבנאי שלו TransformStream(), שיוצר ומחזיר אובייקט של זרם טרנספורמציה מרכיבי ה-handler הנתונים. הבנאי TransformStream() מקבל כארגומנט הראשון שלו אובייקט JavaScript אופציונלי שמייצג את transformer. אובייקטים כאלה יכולים לכלול כל אחת מהשיטות הבאות:

transformer

  • start(controller): השיטה הזו מופעלת מיד לאחר יצירת האובייקט. בדרך כלל משתמשים בהגדרה הזו כדי להוסיף מקטעי קידומת לתור, באמצעות controller.enqueue(). הקטעים האלה ייקראו מהצד הקריא, אבל לא תלויים בכתיבה לצד הניתן לכתיבה. אם התהליך הראשוני הזה הוא אסינכרוני, למשל כי נדרש מאמץ מסוים כדי להשיג את מקטעי התחילית, הפונקציה יכולה להחזיר הבטחה להצלחה או לכישלון. הבטחה שנדחה תפגע בשידור. כל חריגת זריקה יושלכו מחדש על ידי ה-constructor של TransformStream().
  • transform(chunk, controller): השיטה הזו נקראת כאשר מקטע חדש שנכתב במקור בצד שניתן לכתיבה, מוכן לשינוי. הטמעת השידור מבטיחה שהפונקציה הזו תיקרא רק אחרי שהטרנספורמציות הקודמות הצליחו, ולא לפני השלמת start() או אחרי הקריאה ל-flush(). הפונקציה הזו מבצעת בפועל את עבודת הטרנספורמציה של זרם הטרנספורמציה. הוא יכול להוסיף את התוצאות לתור באמצעות controller.enqueue(). כך מתאפשרת למקטע אחד שנכתב בצד שניתן לכתיבה, והתוצאה תהיה אפס או מספר מקטעים בצד הקריא, בהתאם למספר הפעמים שנקרא controller.enqueue(). אם תהליך הטרנספורמציה הוא אסינכרוני, הפונקציה הזו יכולה להחזיר הבטחה להצלחה או לכשל של הטרנספורמציה. כאשר מבטיחים שהפעולה תידחה, יופיעו שגיאות גם בצד הקריא וגם בצד הניתן לכתיבה של השידור של הטרנספורמציה. אם לא סופקה שיטה transform(), נעשה שימוש בטרנספורמציה של זהות, שמסדרת מקטעים ללא שינוי מהצד הניתן לכתיבה לצד הקריא.
  • flush(controller): השיטה הזו נקראת אחרי שכל המקטעים שנכתבו בצד שניתן לכתיבה עברו שינוי על ידי מעבר בהצלחה דרך transform(), והצד הניתן לכתיבה עומד להיסגר. בדרך כלל משתמשים בהגדרה הזו כדי להוסיף מקטעי סיומות לצד הקריא, לפני שגם זה נסגר. אם תהליך השטיפה הוא אסינכרוני, הפונקציה יכולה להחזיר הבטחה להצלחה או לכשל. התוצאה תועבר למבצע של stream.writable.write(). בנוסף, הבטחה שנדחתה תפגע גם בצד הקריא וגם בצד הניתן לכתיבה של השידור. הצגת חריגות נחשבת כהחזרת הבטחה שנדחתה.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

השיטות של writableStrategy ו-readableStrategy בתור

הפרמטר השני והשלישי האופציונלי של ה-constructor של TransformStream() הם אסטרטגיות אופציונליות לתור writableStrategy ו-readableStrategy. הם מוגדרים כפי שמפורט בקטעים הקריאים ובקטע הניתן לכתיבה, בהתאמה.

דוגמה לטרנספורמציה של קוד השידור

דוגמת הקוד הבאה מציגה זרם טרנספורמציה פשוט בפעולה.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

הצמדת זרם קריא דרך זרם טרנספורמציה

השיטה pipeThrough() בממשק ReadableStream מספקת דרך שרשרת להצגת צינור עיבוד נתונים של השידור הנוכחי דרך זרם 'טרנספורמציה' או כל צמד אחר לכתיבה/לקריאה. בדרך כלל, צינור עיבוד נתונים ינעל אותו למשך הזמן של הצינור, כך שקוראים אחרים לא יוכלו לנעול אותו.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

דוגמת הקוד הבאה (קצת מתוחכמת) מראה איך אפשר להטמיע גרסה של fetch() לצעוק שכותבת את כל הטקסט באותיות גדולות, על ידי שימוש בהבטחת התשובה שהוחזרה כזרם והוספה של מקרא למקטעים. היתרון של הגישה הזו הוא שלא צריך להמתין עד להורדת המסמך כולו, דבר שיכול לעשות הבדל משמעותי בטיפול בקבצים גדולים.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

הדגמה (דמו)

ההדגמה הבאה מציגה עדכוני תוכן קריאים, ניתנים לכתיבה ושמתבצעת טרנספורמציה. הוא כולל גם דוגמאות של שרשראות צינורות pipeThrough() ו-pipeTo(), וגם מדגים tee(). אפשר גם להריץ את ההדגמה בחלון נפרד, או להציג את קוד המקור.

זרמים שימושיים זמינים בדפדפן

יש מספר זרמים שימושיים המובנים ישירות בדפדפן. אפשר ליצור בקלות ReadableStream מ-blob. השיטה stream() של הממשק Blob מחזירה ReadableStream, שלאחר הקריאה מחזירים את הנתונים שנכללים ב-blob. חשוב גם לזכור שאובייקט File הוא סוג ספציפי של Blob, ואפשר להשתמש בו בכל הקשר ש-blob יכול.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

וריאנטים של הסטרימינג של TextDecoder.decode() ו-TextEncoder.encode() מכונים TextDecoderStream ו-TextEncoderStream בהתאמה.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

קל לדחוס קובץ או לבטל את הדחיסה שלו באמצעות הטרנספורמציות של CompressionStream ושל DecompressionStream, בהתאמה. דוגמת הקוד הבאה ממחישה כיצד ניתן להוריד את המפרט של Streams, לדחוס אותו (gzip) ישירות מהדפדפן ולכתוב את הקובץ הדחוס ישירות בדיסק.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

FileSystemWritableFileStream של File System Access API ומקורות הבקשות של fetch() הניסיוניים הם דוגמאות לשידורים שניתן לכתוב בטבע.

ב-Serial API יש שימוש נרחב בשידורים קריאים וגם בשידורים שניתנים לכתיבה.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

לבסוף, ה-API של WebSocketStream משלב סטרימינג עם WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

משאבים שימושיים

אישורים

המאמר הזה נבדק על ידי ג'ייק ארצ'יבלד, פרנסואה בופורט, סם דוטון, מטיאס בולנס, סורמה, ג'ו מדלי ואדם רייס. הפוסטים בבלוג של ג'ייק ארצ'יבלד עזרו לי מאוד להבין סטרימינג. חלק מדוגמאות הקוד הן בהשראת החיפושים של משתמש @bellbind ב-GitHub, וחלקים מהפרוזה מבוססים במידה רבה על MDN Web Docs on Streams. הכותבים של Streams Standard עשו עבודה אדירה בכתיבת המפרט הזה, תמונה ראשית מאת ריאן לארה ב-UnFlood.