新規投稿のお知らせを受信されたい方は、サブスクリプションをご登録ください:

JavaScript用のより優れたStream APIに値する

2026-02-27

24分で読了
この投稿はEnglishおよび한국어でも表示されます。

このコンテンツは自動機械翻訳サービスによる翻訳版であり、皆さまの便宜のために提供しています。原本の英語版と異なる誤り、省略、解釈の微妙な違いが含まれる場合があります。ご不明な点がある場合は、英語版原本をご確認ください。

ストリームでデータを処理することは、アプリケーションを構築する方法の基本です。あらゆる場所でストリーミングを機能させるために、WHATWG Streams Standard(非公式には「Webストリーム」として知られる)は、ブラウザとサーバー間で動作する共通のAPIを確立するように設計されました。ブラウザに出荷され、Cloudflare Workers、Node.js、Deno、Bunそして、fetch()のようなAPIの基盤となりました。これは大仕事で、設計した人たちは、当時できていた制約とツールで難題を解決していました。

しかし、長年Webストリームを構築し、Node.jsとCloudflare Workersの両方で実装し、お客様とランタイムの本番環境の問題をデバッグし、開発者があまりにも多くのよくある落とし穴を通過できるようにしてきた結果、標準的なAPIには多くのリスクがあると確信するようになりました。段階的な改善だけでは簡単に修正できない基本的なユーザビリティとパフォーマンスの問題を解決することができます。問題はバグではなく、それらは、10年前なら理にかなっていたかもしれない設計上の判断の結果ですが、現在のJavaScript開発者のコードの書き方とは一致しません。

この記事では、Webストリームの基本的な問題をいくつか探り、JavaScriptの言語プリミティブに関連して構築された代替アプローチを紹介して、もっと良いものになることを実証します。

ベンチマークでは、この代替手段は、Webストリームよりも2倍から120倍速く、私がテストしたすべてのランタイム(Cloudflare Workers、Node.jsなど)で実行できます。Deno、Bun、主要ブラウザなど)に対応しています。この改善は、賢い最適化によるものではなく、最新のJavaScript言語機能をより効果的に活用する、根本的に異なる設計の選択によるものです。私は、前の仕事を否定するためにここにいます。それでは、次に展開するものについてお話しましょう。

どのような場所から来ているのか

Streams Standardは、2014年から2016年にかけて「低レベルのI/Oプリミティブに効率的にマッピングするデータストリームを作成、構成、消費するためのAPI」を提供するという野心的な目標を掲げて開発されました。Webストリーム以前は、Webプラットフォームはストリーミングデータを処理する標準的な方法を持ち合わせていませんでした。

Node.jsは、ブラウザでも動作するように移植された当時、すでに独自のストリーミングAPIを持っていましたが、WHATWGはWebブラウザのニーズのみを考慮するという憲章があるため、出発点としてそれを使用しないことを選択しました。サーバー側のランタイムがWebストリームを採用したのはその後、Cloudflare WorkersとDenoがそれぞれ第一級のWebストリームサポートで登場し、クロスランタイム互換性が優先されるようになった後です。

Webストリームの設計は、JavaScriptの非同期反復に進化するものです。for await...of構文は、Streams Standardが最初に最終化されてから2年後のES2018まで出現しませんでした。このタイミングは、APIが最初に、JavaScriptで非同期シーケンスを消費するための逸脱的な方法となるものを利用できないことを意味します。代わりに、Specは独自のリーダー/ライターの獲得モデルを導入し、その決定はAPIのあらゆる側面に影響を与えました。

通常業務のための過剰なセレモニー

ストリームでの最も一般的なタスクは、完了までを読み取ることです。Webストリームでは次のようになります:

// First, we acquire a reader that gives an exclusive lock
// on the stream...
const reader = stream.getReader();
const chunks = [];
try {
  // Second, we repeatedly call read and await on the returned
  // promise to either yield a chunk of data or indicate we're
  // done.
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
} finally {
  // Finally, we release the lock on the stream
  reader.releaseLock();
}

このパターンは、ストリーミング固有のものだと思うかもしれません。しかし、そうではありません。リーダーの取得、ロック管理、{ value, done }プロトコルはすべて設計上の選択肢に過ぎず、要件ではありません。これらは、Webストリーム仕様がいつ、どのように書かれたかを示す人工知能です。非同期反復は、時間の経過とともに到着するシーケンスを処理するために正確に存在しますが、Streamの仕様が書かれた時点では、非同期反復はまだ存在していませんでした。ここでの複雑さは、純粋なAPIのオーバーヘッドであり、根本的な必要性ではありません。

Webストリームがfor await...ofをサポートするようになったので、代替アプローチを検討してください。

const chunks = [];
for await (const chunk of stream) {
  chunks.push(chunk);
}

定型文がはるかに少ないという点では優れていますが、すべてを解決するわけではありません。非同期イテレーションは、想定外のAPIに後付けされ、その結果を示しています。BYOB(独自のバッファの使用)読み取りのような機能は、イテレーションを通じてアクセスできません。リーダー、ロック、コントローラーなど、根本にある複雑さは依然あり、隠れているだけです。何か問題が発生した場合、またはAPIの追加機能が必要になる場合、開発者は元のAPIの雑草に戻り、なぜストリームが「ロック」されているのか、なぜreleaseLock()が期待していたことをしなかったのかを理解しようとしたり、制御できないコードのボトルネックを探し出したりすることになります。

ロックの問題

Webストリームは、ロックモデルを使用して、複数のコンシューマーが読み取りをインターリーブするのを防ぎます。getReader()を呼び出すと、ストリームはロックされます。ロックされている間、ストリームから直接読み込むことや、パイプを通すこと、またはキャンセルすることさえできません。唯一できるのは、実際にリーダーを保持しているコードだけです。

これは、簡単に間違っているのを知るまでは、理にかなっています。

async function peekFirstChunk(stream) {
  const reader = stream.getReader();
  const { value } = await reader.read();
  // Oops — forgot to call reader.releaseLock()
  // And the reader is no longer available when we return
  return value;
}

const first = await peekFirstChunk(stream);
// TypeError: Cannot obtain lock — stream is permanently locked
for await (const chunk of stream) { /* never runs */ }

releaseLock()を忘れるとストリームは恒久的に中断されます。lockedプロパティは、ストリームがロックされていることを示しますが、なぜ、誰が、ロックがまだ使用可能かどうかは示しません。パイプ処理は内部的にロックを取得し、パイプの操作中、明らかではない方法でストリームを使用できないようにします。

また、読み取り保留中のロックを解除することに関する意味も長年不明瞭でした。read()を呼び出したのが待っていず、releaseLock()を呼び出した場合、どうなるのでしょうか?この仕様は、最近、ロックリリース時に保留中読み取りをキャンセルするように明確化されましたが、実装にはさまざまなものがあり、以前の特定されていない動作に依存したコードは壊れる可能性があります。

とはいえ、ロックイン自体は悪ではないという認識は重要です。実際、アプリケーションが適切かつ順番にデータを消費または生成するようにするための重要な目的を担っています。主な課題は、getReader()releaseLock()などのAPIを使用して手動で実装されたものです。自動ロックと非同期反復可能読者の登場により、ユーザーの観点からロック対応が遥かに容易になりました。

実装者にとっては、ロックモデルはかなりの量の自明でない内部帳簿管理を追加します。すべての操作はロック状態を確認し、リーダーを追跡する必要があり、ロック、キャンセル、エラー状態間の相互作用によって、すべて正しく処理しなければならないエッジケースのマトリックスが作成されます。

BYOB:メリットのない複雑さ

BYOB(Bring Your Own Buffer)の読み取りは、開発者がストリームから読み取るときにメモリバッファを再利用できるように設計されており、高スループットシナリオ向けの重要な最適化です。アイデアは健全です。各チャンクに新しいバッファを割り当てるのではなく、独自のバッファを提供し、ストリームがそれを入力するのです。

実際には、(そしてもちろん、常に例外があります)、BYOBは測定可能な利益を得るために使用されることはほとんどありません。APIは、デフォルトの読み取りよりもはるかに複雑で、別のリーダータイプ(ReadableStreamBYOBReader)と他の特殊なクラス(例:ReadableStreamBYOBRequest)、バッファライフサイクルの慎重な管理、およびArrayBuffer分離の意味の理解が必要です。BYOB読み取りにバッファを渡すと、バッファはデプロイされ、ストリームに転送され、潜在的に異なるメモリ上で別のビューを取り戻すことができます。この移転ベースのモデルは、エラーが発生しやすく、混乱を招くこともあります。

const reader = stream.getReader({ mode: 'byob' });
const buffer = new ArrayBuffer(1024);
let view = new Uint8Array(buffer);

const result = await reader.read(view);
// 'view' should now be detached and unusable
// (it isn't always in every impl)
// result.value is a NEW view, possibly over different memory
view = result.value; // Must reassign

また、BYOBは非同期イテレーションやTransformStreamsでは使用できないため、ゼロコピーでの読み取りを望む開発者は手動のリーダーループに戻されます。

実装者にとって、BYOBはかなり複雑さを増します。ストリームは、保留中のBYOBリクエストを追跡し、部分的なフィルを処理し、バッファ分離を正しく管理し、BYOBリーダーと基盤となるソースの間で調整する必要があります。読み取り可能なバイトストリームのWebプラットフォームテストには、BYOBエッジケース専用のテストファイルが含まれます。デタッチされたバッファ、不正なビュー、エンキュー後の応答順序などです。

BYOBはユーザーと実装者の両方にとって複雑になるものの、実際にはほとんど採用されていません。ほとんどの開発者は、デフォルトの読み取りにこだわって、割り当てのオーバーヘッドを受け入れます。

カスタムReadableStreamインスタンスのほとんどのユーザーランド実装では、デフォルトとBYOBの両方の読み取りサポートを単一ストリームで正しく実装するために必要なすべてのセレモニーを気にすることはありません。それには十分な理由があるのです。正確な情報を得るのは難しく、時間のかかるコードのほとんどはデフォルトの読み取りパスに頼らざるを得ません。以下の例は、「正しい」実装によって必要とされるものを示しています。これは、規模が大きく、複雑で、エラーが発生しやすく、一般的な開発者が本当に対処しなければならないレベルの複雑さではありません。

new ReadableStream({
    type: 'bytes',
    
    async pull(controller: ReadableByteStreamController) {      
      if (offset >= totalBytes) {
        controller.close();
        return;
      }
      
      // Check for BYOB request FIRST
      const byobRequest = controller.byobRequest;
      
      if (byobRequest) {
        // === BYOB PATH ===
        // Consumer provided a buffer - we MUST fill it (or part of it)
        const view = byobRequest.view!;
        const bytesAvailable = totalBytes - offset;
        const bytesToWrite = Math.min(view.byteLength, bytesAvailable);
        
        // Create a view into the consumer's buffer and fill it
        // not critical but safer when bytesToWrite != view.byteLength
        const dest = new Uint8Array(
          view.buffer,
          view.byteOffset,
          bytesToWrite
        );
        
        // Fill with sequential bytes (our "data source")
        // Can be any thing here that writes into the view
        for (let i = 0; i < bytesToWrite; i++) {
          dest[i] = (offset + i) & 0xFF;
        }
        
        offset += bytesToWrite;
        
        // Signal how many bytes we wrote
        byobRequest.respond(bytesToWrite);
        
      } else {
        // === DEFAULT READER PATH ===
        // No BYOB request - allocate and enqueue a chunk
        const bytesAvailable = totalBytes - offset;
        const chunkSize = Math.min(1024, bytesAvailable);
        
        const chunk = new Uint8Array(chunkSize);
        for (let i = 0; i < chunkSize; i++) {
          chunk[i] = (offset + i) & 0xFF;
        }
        
        offset += chunkSize;
        controller.enqueue(chunk);
      }
    },
    
    cancel(reason) {
      console.log('Stream canceled:', reason);
    }
  });

ホストランタイムが、例えばフェッチResponsebodyとして、ランタイム自体からバイト指向のReadableStreamを提供する場合、ランタイム自体がBYOB読み取りの最適化された実装を提供する方がはるかに簡単であるものの、それらは依然としてデフォルトとBYOBの両方の読み取りパターンを処理できる必要があり、その要件はかなりの複雑さをもたらします。

バックプレッシャー:理論的には良いが、実際には壊れる

Backpressure(バックプレッシャー)は、動作の遅いコンシューマーが速いプロデューサーの減速を知らせる機能で、Webストリームの第一級のコンセプトです。理論的には、です。実際のところ、このモデルには重大な欠陥がいくつかあります。

プライマリシグナルは、コントローラーのdesiredSizeです。ポジティブ(データが必要)、Zero(容量大容量)、ネガティブ(容量超過)、Null(クローズ済み)です。プロデューサーは、この値をチェックし、プラスでない場合はエンキューを停止することになっています。ただし、これを強制するものはありません:controller.enqueue()望まれるリターゲットサイズの場合でも常に成功します。

new ReadableStream({
  start(controller) {
    // Nothing stops you from doing this
    while (true) {
      controller.enqueue(generateData()); // desiredSize: -999999
    }
  }
});

Streamの実装は、バックプレッシャーを無視できます。また、一部の仕様定義された機能は、バックプレッシャーを明示的に破壊します。例えば、tee()は、1つのストリームから2つの分岐を作成します。一方のブランチの読み取り速度が他のブランチより速い場合、データは内部バッファに制限なく蓄積されます。速いコンシューマーは、遅いコンシューマーが追いつく間に無限のメモリ増加を引き起こす可能性があります。これを設定する方法も、遅いブランチをキャンセルする以外にオプトアウトする方法もありません。

Webストリームは、highWaterMarkオプションとカスタマイズ可能なサイズ計算の形で、バックプレッシャーの動作を調整するための明確なメカニズムを提供しますが、これらはdesiredSizeと同様に無視されやすく、多くのアプリケーションはこれらに注意を払うことができません。

WritableStream側にも同じ問題が存在します。WritableStreamには高い透かし目標のサイズがあります。データの生産者が注意を払うべきですが、できないことが多いというwriter.readyという約束があります。

const writable = getWritableStreamSomehow();
const writer = writable.getWriter();

// Producers are supposed to wait for the writer.ready
// It is a promise that, when resolves, indicates that
// the writables internal backpressure is cleared and
// it is ok to write more data
await writer.ready;
await writer.write(...);

実装者にとっては、バックプレッシャーは保証はせずに複雑さを増大させます。Queue Consumerのサイズを追跡し、望ましいサイズを計算し、適切なタイミングでpull()を呼び出す機械はすべて正しく実装されなければなりません。しかし、これらのシグナルは助言であるため、すべての機能が、バックプレッシャーが解決できるはずの問題を実際に防ぐことはできません。

約束の隠れたコスト

Webストリーム仕様では、多数のポイントでプロミスを作成する必要があり、多くの場合ホットパス上にあり、ユーザーに見えないことも少なくありません。各 read()の呼び出しはPromiseを返すだけではありません。内部的には、実装はキュー管理、pull()調整、およびバックプレッシャーシグナリングのために追加のPromiseを作成します。

このオーバーヘッドは、バッファ管理、完了、バックプレッシャー信号の約束に依存するので、義務付けられています。実装に特化したものもありますが、仕様に書かれている通りであれば避けられません。高頻度のストリーミング(動画フレーム、ネットワークパケット、リアルタイムデータ)では、このオーバーヘッドが相当なものになります。

問題はパイプラインでさらに悪化します。各TransformStreamは、ソースとシンクの間にプロミスの仕組みをさらに一層追加します。仕様は同期の高速パスを定義しないため、データがすぐに利用可能であっても、約束された機械が実行されます。

実装者にとっては、この約束重視の設計が最適化の機会を制約します。この仕様では、特定の約束解決の順序付けが義務付けられているため、微妙なコンプライアンス違反のリスクを冒すことなく、バッチ操作や不要な非同期境界のスキップすることは困難です。実装者が行う隠れた内部最適化はたくさんありますが、それらを正しく行うことは複雑で困難な場合があります。

このブログ記事を書いている間に、VercelのMalte Ublが、VercelがNode.jsのパフォーマンス改善に関して行っているいくつかの調査について説明する独自のブログ記事を公開しました。Webストリームの実装。その投稿では、Webストリームの実装すべてが直面する、以下の基本的なパフォーマンス最適化の問題について考察しています。

「または PipeTo()を検討してください。各チャンクは、読み取り、書き込み、バックプレッシャー、繰り返しという、完全なPromiseチェーンを通過していきます。読み取りごとに{value, done}の結果オブジェクトが割り当てられます。エラーの伝播によって追加のPromiseブランチが作成されます。

これらは、どれも間違いではありません。これらの保証は、ストリームがセキュリティ境界を越えるブラウザでは、キャンセルセマンティクスは相手に乗じるすきを与えず、パイプの両端を制御できないブラウザでは重要です。しかし、サーバー上で、React Server Componentsを1KBのチャンクで3回変換すると、コストが蓄積されます。

ネイティブのWebStream PipeThoughをベンチマークしたのは、1KBのチャンクに対して630MB/sです。同じパススルー変換で最大7,900 MB/sのNode.js Pipeline()。これは12倍のギャップであり、その差はほぼすべてPromiseとオブジェクト割り当てのオーバーヘッドです。」 - Malte Ubl, https://vercel.com/blog/we-ralph-wiggumed-webstreams-to-make-them-10x-faster

研究の一環として、Node.jsの一連の改善提案をまとめましたWebストリームの実装により、特定のコードパスの約束を排除することで、最大10倍もの高速化を実現できます。これは、有用ではありますが、大きなオーバーヘッドを追加することを証明するものです。Node.jsのコアメンテナーの1人として、MalteとVercelの人々が改善提案を実現するお手伝いができることを楽しみにしています。

Cloudflare Workersに最近行った更新では、内部のデータパイプラインにも同様の変更を加えて、特定のアプリケーションシナリオで作成されるJavaScript約束の数を最大200倍減らしました。その結果、これらのアプリケーションのパフォーマンスは、数桁向上します。

現実世界の障害

未消費の本体によるリソースの枯渇

fetch()がレスポンスを返す場合、本文はReadableStreamです。ステータスだけを確認し、本文を消費またはキャンセルしない場合はどうなりますか?その答えは実装によって異なりますが、一般的な結果はリソースの漏洩です。

async function checkEndpoint(url) {
  const response = await fetch(url);
  return response.ok; // Body is never consumed or cancelled
}

// In a loop, this can exhaust connection pools
for (const url of urls) {
  await checkEndpoint(url);
}

このパターンは、undici(Node.jsに組み込まれたfetch()実装)を使用するNode.jsアプリケーションで接続プールを枯渇させる原因となっていました。同様の問題が他のランタイムでも発生しています。ストリームは基盤となる接続への参照を保持するため、明示的な消費またはキャンセルを行わなければ、接続はガベージコレクションまで続く可能性があります。

問題は、暗黙的にStreamブランチを作成するAPIによってさらに悪化しています。Request.clone()Response.clone()ボディストリーム上で暗黙のtee()を実行します。これは見落としがちな詳細です。ロギングまたはリトライのロジックをクローンするコードは、無意識のうちに独立した消費を必要とする支社的なストリームを作成し、リソース管理の負担を増大させる可能性があります。

確かに、こうしたタイプの問題実装のバグです。接続リークは間違いなく、undici氏が自身の実装で修正する必要があるものでしたが、仕様の複雑さにより、この種の問題に対処することは容易ではありません。

「Node.jsのfetch()実装でストリームをクローニングするのは、見た目よりも難しいのです。リクエストまたはレスポンスの本文をクローンする際は、tee()を呼びます。これは、1つのストリームを2つのブランチに分割し、両方を消費する必要があります。あるコンシューマーが他のコンシューマーよりも読み取りが速い場合、データバッファは低速なブランチを待ちます。両方のブランチを適切に消費しないと、基盤となる接続がリークされます。1つのソースを共有する2人の読者の間で調整が必要なため、誤って元のリクエストを壊したり、接続プールを使い果たすことがよくあります。これは単純なAPI呼び出しですが、複雑な基盤となる仕組みを持つため正しく理解するのは難しいです。」- Mateo Collina、博士- Platformmatic共同創設者兼CTO、Node.js技術運営委員会委員長

メモリの最前線を越える

tee()は、ストリームを2つの分岐に分割します。一見簡単ですが、この実装にはバッファリングが必要です。あるブランチが他のブランチより速く読み込まれた場合、遅いブランチが追いつくまでデータはどこかに保持される必要があります。

const [forHash, forStorage] = response.body.tee();

// Hash computation is fast
const hash = await computeHash(forHash);

// Storage write is slow — meanwhile, the entire stream
// may be buffered in memory waiting for this branch
await writeToStorage(forStorage);

仕様では、tee() のバッファ制限を義務付けていません。公平に言えば、この仕様は、仕様の観測可能な規範的要件が満たされる限り、実装がtee()やその他のAPIの実際の内部メカニズムを、適切と考える方法で実装することを許可しています。しかし、実装がStreamの仕様で記述された特定の方法でtee()を実装することを選択した場合、tee()には回避が困難な組み込み型のメモリ管理の問題が生じます。

実装部門は、これに対処するための独自の戦略を策定する必要がありました。Firefoxは当初リンクリスト方式を使っており、消費率の差に比例してO(n)メモリ増加が発生しました。Cloudflare Workersでは、バックプレッシャーが最速ではなく、最も遅いコンシューマーによって示される共有バッファモデルを実装することを選択しました。

バックプレッシャーのギャップを変革

TransformStream は、処理ロジックを介した 読み取り可能/書き込み可能 なペアを作成します。transform() 関数は、読み取り時ではなく、書き込み時に実行されます。変換の処理は、利用者の準備ができているかどうかに関係なく、データが到着すると迅速に行われます。これにより、コンシューマーが遅いときに不要な作業が発生し、両側間のバックプレッシャーシグナリングにはギャップがあり、負荷の無制限なバッファリングが発生する可能性があります。仕様では、変換されるデータのプロデューサーが、変換の書き込み可能な側にある writer.ready シグナルに注意を払っていることが想定されますが、多くの場合、生産者はそれを無視してしまいます。

変換の transform() 操作が同期的で、常に出力をすぐにキューに入れる場合、ダウンストリームのコンシューマーが遅くても、書き込み可能側にバックプレッシャーを返すことはありません。これは、多くの開発者が完全に見落とす仕様設計の結果です。ユーザーが1人につき、通常現時点でアクティブなストリームパイプラインの数も少数であるブラウザでは、このタイプのフットガンは何の影響もありませんが、ランタイム時のサーバー側またはエッジのパフォーマンスに大きな影響を与えます同時に数千のリクエストに対応する高速な遅延が発生します。

const fastTransform = new TransformStream({
  transform(chunk, controller) {
    // Synchronously enqueue — this never applies backpressure
    // Even if the readable side's buffer is full, this succeeds
    controller.enqueue(processChunk(chunk));
  }
});

// Pipe a fast source through the transform to a slow sink
fastSource
  .pipeThrough(fastTransform)
  .pipeTo(slowSink);  // Buffer grows without bound

TransformStreamsは、コントローラーのバックプレッシャーをチェックし、それをライターに伝えるために、約束を使用します。

const fastTransform = new TransformStream({
  async transform(chunk, controller) {
    if (controller.desiredSize <= 0) {
      // Wait on the backpressure to clear somehow
    }

    controller.enqueue(processChunk(chunk));
  }
});

しかし、ここでの難しさとして、TransformStreamDefaultControllerには Writers のような ready promise メカニズムがありません。そのため、TransformStreamの実装では、controller.desiredSizeが再び正の値になる時期を定期的にチェックするためのポーリングメカニズムを実装する必要があります。

パイプラインでは、問題がさらに悪化します。複数の変換(解析、変換、シリアル化など)を連鎖させる場合、各TransformStreamには独自の内部読み取り可能および書き込み可能なバッファがあります。実装者が仕様に厳密に従う場合、データはプッシュ指向の方法でこれらのバッファを通過します。ソースはAの変換をプッシュし、Bの変換をプッシュし、Cの変換をプッシュし、それぞれが最終的なコンシューマーが転送する前に、中間バッファにデータを蓄積します削除し始めました3つの変換を使用して、6つの内部バッファを同時に満たすことができます。

Stream APIを使用する開発者は、ソース、変換、および書き込み可能な宛先を作成する際に、highWaterMarkのようなオプションを使用することを覚えておくことが期待されていますが、多くの場合、忘れてしまったり、単に無視することを選択したりします。

source
  .pipeThrough(parse)      // buffers filling...
  .pipeThrough(transform)  // more buffers filling...
  .pipeThrough(serialize)  // even more buffers...
  .pipeTo(destination);    // consumer hasn't started yet

実装では、ID変換を崩壊させたり、観測できないパスを短縮したり、バッファ割り当てを遅らせたり、JavaScriptをまったく実行しないネイティブコードに戻るなどして、変換パイプラインを最適化する方法が見つかりました。Deno、Bun、Cloudflare Workersはすべて、オーバーヘッドの多くを排除するのに役立つ「ネイティブパス」最適化の実装に成功しています。Vercelの最近のfast-webstreams研究は、Node.jsの同様の最適化に取り組んでいます。しかし、最適化自体もかなりの複雑さを増し、TransformStreamが用いる本質的なプッシュ指向モデルから完全に脱却することはできません。

サーバーサイドレンダリングにおけるGCフラッシング

Streamのサーバーサイドレンダリング(SSR)は、特に厄介なケースです。典型的なSSRストリームは何千もの小さなHTMLフラグメントをレンダリングし、それぞれがStreamの機械を通過する可能性があります。

// Each component enqueues a small chunk
function renderComponent(controller) {
  controller.enqueue(encoder.encode(`<div>${content}</div>`));
}

// Hundreds of components = hundreds of enqueue calls
// Each one triggers promise machinery internally
for (const component of components) {
  renderComponent(controller);  // Promises created, objects allocated
}

すべてのフラグメントは、read()呼び出し用に作成されたプロミス、バックプレッシャー調整のプロミス、中間バッファ割り当て、および{ value, done } 結果オブジェクトを意味します。これらのほとんどは、ほぼ即座にガベージになります。

負荷がかかると、スループットを破壊する可能性のあるGC圧力が生じます。JavaScriptエンジンは、有益な作業をする代わりに、短期間のオブジェクトを収集するのに多大な時間を費やしてしまいます。GCが中断リクエスト処理を一時停止すると、遅延は予測不可能になります。私がSSRのワークロードでは、ガベージコレクションがリクエストあたりの総CPU時間のかなりの部分(最大50%を超える)を占めていることが分かりました。その時間を、実際にコンテンツのレンダリングに費やすことができます。

皮肉なことに、ストリーミングSSRはコンテンツを段階的に送信することでパフォーマンスを向上させるはずです。しかし、Streamを作成する機械のオーバーヘッドが、特に小さなコンポーネントを持つページでは、その改善効果を損なう可能性があります。開発者は、レスポンス全体をバッファリングする方が、Webストリームを介してストリーミングするよりも実際には高速であり、目的を完全に損なうことに気づくことがあります。

最適化の自転車操業

使用可能なパフォーマンスを実現するために、主要なランタイムはすべて、Webストリームの非標準の内部最適化に依存してきました。Node.js、Deno、Bun、Cloudflare Workersはすべて、独自の回避策を開発しています。これは、機械の多くが観測されず、ショートカットされる可能性がある、システムレベルのI/Oに接続されたストリームに特に当てはまります。

こうした最適化の機会を見つけるだけでも、大仕事になる可能性があります。どの動作が観察可能で、どの動作を安全に回避できるのかを特定するには、エンドツーエンドで仕様を理解する必要があります。それでも、ある最適化が実際に仕様に準拠しているかどうかは不明であることが多いです。実装者は、互換性を損なうことなく、どの意味論を緩和できるかを判断する必要があります。これにより、ランタイムチームには、許容可能なパフォーマンスを達成するために、仕様の専門家になるという大きなプレッシャーがかかります。

こうした最適化は実装が難しく、エラーが発生しやすく、ランタイム間の動作が不整合になります。Bunの「Direct Streams」最適化は、意図的かつ観察可能な非標準的なアプローチを採用しており、仕様の多くのメカニズムを完全にバイパスしています。Cloudflare WorkersのIdentityTransformStreamは、パススルー変換の高速処理を提供しますが、Workers固有であり、TransformStreamの標準ではない動作を実装しています。それぞれのランタイムには独自のトリックがあり、非標準ソリューションに偏りがちです。たいていの場合、それが高速化する唯一の方法だからです。

この断片化はポータビリティを損ないます。あるランタイムではうまく動作するコードが、「標準的な」APIsを使用している場合でも、別のランタイムでは動作が異なる(または低質になる)場合があります。ランタイム実装者にとって複雑な負担は相当なものであり、微妙な動作の違いは、クロスランタイムコードを書こうとする開発者、特に多くのランタイム環境で効率的に実行できなければならないフレームワークを維持する開発者にとって摩擦が生じます。

また、多くの最適化は、ユーザーコードに観測されない仕様の一部でのみ可能であることを強調する必要があります。代替手段、Bun「Direct Streams」のような代替手段は、仕様で定義された観測可能な動作から意図的に逸脱させることです。つまり、最適化が「不完全」に感じられることが多いということです。あるシナリオによっては動作するが、他のシナリオでは動作しない、あるランタイムでは動作するが、他のシナリオでは動作しないなどです。そのような場合があるたびに、Webストリームのアプローチ全体の持続不可能な複雑さが増大します。これが原因で、ほとんどのランタイム実装者は、準拠テストに合格したとしても、Streamの実装のさらなる改善に大規模な労力を費やすことはほとんどありません。

実装者は、こうした複雑な手順を踏む必要はありません。妥当なパフォーマンスを実現するために、仕様の意味を緩和する、またはバイパスする必要があることに気づいたなら、それは仕様自体に何かが間違っているという兆候です。適切に設計されたストリーミングAPIはデフォルトで効率的であるべきであり、各ランタイムが独自のエスケープハッシュを作成する必要はありません。

コンプライアンスの負担

複雑な仕様では、複雑なエッジケースが生まれます。StreamのWebプラットフォームテストは70以上のテストファイルにまたがっており、包括的なテストは良いことですが、テストする必要があるものが何であるかを示しています。

実装が通過しなければならない、不明瞭なテストのいくつかを考えてみましょう。

  • プロトタイプ汚染防御:1つのテストがObject.prototype.thenにパッチを適用し、Promiseの解決をインターセプトします。その後、pipeTo()およびtee()操作がプロトタイプチェーンを介して内部値を漏洩しないことを検証します。これは、仕様の約束が多い内部機能が攻撃対象領域を作成するためにのみ存在するセキュリティプロパティをテストします。

  • WebAssemblyメモリの拒否:BYOB読み取りは、WebAssemblyメモリに支えられたArrayBuffersを明示的に拒否しなければなりません。WebAssemblyメモリは、通常のバッファのように見えますが、転送できません。このエッジケースは、Specのバッファ分離モデルによって存在します。より単純なAPIでは処理する必要がないのです。

  • ステートマシンの競合によるクラッシュ回帰:byobRequest.respond()の呼び出しを特定のテストで確認しますenqueue()の後、ランタイムがクラッシュしない。このシーケンスにより、内部ステートマシンに競合が発生します。enqueue()は保留中の読み取りを実行し、byobRequestを無効化する必要がありますが、実装は開発者が複雑なAPIを正しく使用していない可能性が非常に高いため、メモリを破損するのではなく、その後のrespond()を適切に処理する必要があります。

これらは、テスト作成者が完全に協力して考案したシナリオではありません。それらは仕様の設計の結果であり、現実世界のバグを反映しています。

ランタイムの実装者にとって、WPTスイートを通過することは、ほとんどのアプリケーションコードが遭遇することのない複雑なエッジケースを処理することを意味します。テストは、ハッピーなパスだけでなく、リーダー、ライター、コントローラー、キュー、戦略、そしてそれらすべてを接続する約束機械間の相互作用の完全なマトリックスをエンコードします。

よりシンプルなAPIは、概念が少なく、概念間の相互作用が少なく、適切なエッジケースが少なく、その結果、実装が実際に一貫して動作するという確信が高まります。

要点

Webストリームは、ユーザーにとっても実装者にとっても複雑なものです。仕様の問題はバグではありません。設計どおりにAPIを使用することで実現されます。漸進的改善だけで修正できる問題ではありません。これらは、基本的な設計の選択の結果です。物事を改善するためには、異なる基盤が必要です。

より良いStream APIであることを可能に

異なるランタイムでWebストリームの仕様を複数回実装し、問題点を直接確認した後、今日の第一原理から設計した場合のより良い代替ストリーミングAPIはどのようなものになるかを検討するタイミングだと決めました。

以下は概念実証です。完成した標準ではなく、プロダクションレディで利用可能なライブラリでもありません。必ずしも新しい何かの具体的提案でもありませんが、Webストリームの問題がストリーミングに固有のものではないことを示す議論の出発点です。 ;特定の設計上の選択の結果であり、違った方法で選択することも可能です。このAPIが正しい答えかどうかよりも、それがストリーミングプリミティブから実際に必要なものについての生産的な会話につながるかどうかより重要です。

ストリームとは?

API設計について掘り下げる前に、ストリームとは何か?を考えてみましょう。

本質的には、ストリームは時間とともに到着するデータのシーケンスです。すべてを一度に行うことはできません。利用可能になるにつれて、徐々に処理していきます。

Unixパイプは、おそらくこのアイディアを最も純粋に表現したものです。

cat access.log | grep "error" | sort | uniq -c

データは左から右に流れます。各ステージは入力を読み取り、作業を行い、出力を書き込みます。パイプリーダーの取得も、コントローラーロックの管理も必要ありません。下流段階が遅いと、当然ながら上流段階も遅くなります。バックプレッシャーはモデル内で暗黙的であり、学習する(または無視する)ための別個のメカニズムではありません。

JavaScriptでは、「時間とともに到着する一連のモノ」の自然なプリミティブはすでに言語にあります:非同期反復可能です。for await...ofで消費します。反復を中止することで、消費を停止します。

これは、新しいAPIが維持しようとしている直感です。ストリームは反復のように感じられなければなりません。なぜなら、それこそが反復するものだからです。Webストリーム(リーダー、ライター、コントローラー、ロック、キューイング戦略)の複雑さは、この基本的なシンプルさを不明瞭にします。より優れたAPIは、単純なケースをシンプルにし、本当に必要な場合にのみ複雑さを増す必要があります。

設計の原則

私は、異なる原則に基づいて概念実証の代替案を作成しました。

Streamは反復可能です。

隠れた内部状態を持つカスタムReadableStreamクラスはありません。読み取り可能なストリームは、単に AsyncIterable<Uint8Array[]> です。for await...ofで消費します。リーダーの取得も、ロックの管理も必要ありません。

プルスルー変換

変換はコンシューマーがプルするまで実行されません。評価も隠れたバッファリングもありません。データは、ソースからコンシューマーへオンデマンドで流れ、変換を経てコンシューマーへ送信されます。反復を停止すると、処理は停止します。

明示的なバックプレッシャー

バックプレッシャーはデフォルトで厳格です。バッファが満杯になると、書き込みは静かに蓄積されるのではなく、拒否されます。スペースが利用可能になるまでブロック、最も古い削除、最新の削除など、代替ポリシーを設定できますが、明示的に選択する必要があります。静かなメモリ増加はもうありません。

バッチ処理されたチャンク

反復ごとに1つのチャンクを生成するのではなく、ストリームはUint8Array[]: チャンクの配列を生成します。これにより、複数のチャンクにわたる非同期オーバーヘッドが合理化され、ホットパスでの約束の作成とマイクロタスクの遅延が低減されます。

バイトのみ

APIはバイト(Uint8Array)のみを処理します。文字列は自動的にUTF-8にエンコードされます。「バリューストリーム」と「バイトストリーム」の対比はありません。任意のJavaScript値をストリーミングするには、async iterablesを直接使用してください。APIはUint8Arrayを使用しますが、チャンクは不透明なものとして扱われます。部分的な消費も、BYOBパターンも、ストリーミング機械自体内でのバイトレベルの操作もありません。チャンクが入り、チャンクが出てくると、変換で明示的に変更しない限り、変更は一切されません。

同期の高速パスが重要

APIは、同期されたデータソースが必要であり、一般的であることを認識します。非同期スケジューリングが提供される唯一のオプションであるという理由だけで、アプリケーションは非同期スケジューリングのパフォーマンス上のコストを常に受け入れる必要はありません。同時に、同期処理と非同期処理の混在は危険です。同期パスは常に選択肢であり、常に明示的である必要があります。

新しいAPIの動作

ストリームの作成と利用

Webストリームでは、単純な生産者/消費者のペアを作成するには、TransformStream、手動でのエンコーディング、そして慎重なロック管理が必要です。

const { readable, writable } = new TransformStream();
const enc = new TextEncoder();
const writer = writable.getWriter();
await writer.write(enc.encode("Hello, World!"));
await writer.close();
writer.releaseLock();

const dec = new TextDecoder();
let text = '';
for await (const chunk of readable) {
  text += dec.decode(chunk, { stream: true });
}
text += dec.decode();

この比較的クリーンなバージョンでも、TransformStream、手動のTextEncoderTextDecoder、および明示的なロックのリリースが必要です。

以下は、新しいAPIに対応するものです。

import { Stream } from 'new-streams';

// Create a push stream
const { writer, readable } = Stream.push();

// Write data — backpressure is enforced
await writer.write("Hello, World!");
await writer.end();

// Consume as text
const text = await Stream.text(readable);

読み取り可能は、単に非同期の反復可能です。ストリーム全体を収集してデコードするStream.text()など、任意の関数に渡すことができます。

ライターにはシンプルなインターフェースがあります: バッチ書き込み用の write()、writev()、完了を通知する end()、エラー用の abort()。これが本質的にこれだけです。

writerは具体的なクラスではありません。write()end()、およびabort()を実装するオブジェクトはどれでもライターになれるため、既存のAPIを適応させたり、サブクラス化せずに特殊な実装を作成したりすることが容易になります。start()write()close()abort()コールバックなど、ライフサイクルと状態がバインドされているWritableStreamに依存しないコントローラーを介して調整する必要がある複雑なUnderlyingSinkプロトコルはありません。

以下は、書かれたすべてのデータを収集する単純なメモリ内ライターです。

// A minimal writer implementation — just an object with methods
function createBufferWriter() {
  const chunks = [];
  let totalBytes = 0;
  let closed = false;

  const addChunk = (chunk) => {
    chunks.push(chunk);
    totalBytes += chunk.byteLength;
  };

  return {
    get desiredSize() { return closed ? null : 1; },

    // Async variants
    write(chunk) { addChunk(chunk); },
    writev(batch) { for (const c of batch) addChunk(c); },
    end() { closed = true; return totalBytes; },
    abort(reason) { closed = true; chunks.length = 0; },

    // Sync variants return boolean (true = accepted)
    writeSync(chunk) { addChunk(chunk); return true; },
    writevSync(batch) { for (const c of batch) addChunk(c); return true; },
    endSync() { closed = true; return totalBytes; },
    abortSync(reason) { closed = true; chunks.length = 0; return true; },

    getChunks() { return chunks; }
  };
}

// Use it
const writer = createBufferWriter();
await Stream.pipeTo(source, writer);
const allData = writer.getChunks();

拡張すべきベースクラスも、実装すべき抽象メソッドも、調整すべきコントローラーもありません。正しい形のオブジェクトというだけです。

プルスルー変換

新しいAPI設計では、変換はデータが消費されるまでいかなる作業も実行してはいけません。これが基本原則です。

// Nothing executes until iteration begins
const output = Stream.pull(source, compress, encrypt);

// Transforms execute as we iterate
for await (const chunks of output) {
  for (const chunk of chunks) {
    process(chunk);
  }
}

Stream.pull()は、レイジーパイプラインを作成します。圧縮暗号化の変換は、出力の反復を始めるまで実行されません。各イテレーションでは、オンデマンドでパイプラインを介してデータがプルされます。

これは、パイプがセットアップされるとすぐに、ソースから変換にデータを積極的にポンピングし始めるWebストリームのpipeThrough()とは根本的に異なります。プルセマンティックスは、処理が行われるタイミングを制御し、イテレーションを停止すると処理を停止することを意味します。

変換はステートレスにもステートフルにもできます。ステートレス変換は、チャンクを取得し、変換されたチャンクを返す関数です。

// Stateless transform — a pure function
// Receives chunks or null (flush signal)
const toUpperCase = (chunks) => {
  if (chunks === null) return null; // End of stream
  return chunks.map(chunk => {
    const str = new TextDecoder().decode(chunk);
    return new TextEncoder().encode(str.toUpperCase());
  });
};

// Use it directly
const output = Stream.pull(source, toUpperCase);

ステートフル変換は、呼び出し間で状態を維持するメンバー関数を持つ単純なオブジェクトです。

// Stateful transform — a generator that wraps the source
function createLineParser() {
  // Helper to concatenate Uint8Arrays
  const concat = (...arrays) => {
    const result = new Uint8Array(arrays.reduce((n, a) => n + a.length, 0));
    let offset = 0;
    for (const arr of arrays) { result.set(arr, offset); offset += arr.length; }
    return result;
  };

  return {
    async *transform(source) {
      let pending = new Uint8Array(0);
      
      for await (const chunks of source) {
        if (chunks === null) {
          // Flush: yield any remaining data
          if (pending.length > 0) yield [pending];
          continue;
        }
        
        // Concatenate pending data with new chunks
        const combined = concat(pending, ...chunks);
        const lines = [];
        let start = 0;

        for (let i = 0; i < combined.length; i++) {
          if (combined[i] === 0x0a) { // newline
            lines.push(combined.slice(start, i));
            start = i + 1;
          }
        }

        pending = combined.slice(start);
        if (lines.length > 0) yield lines;
      }
    }
  };
}

const output = Stream.pull(source, createLineParser());

Abort時にクリーンアップが必要な変換には、Abortハンドラーを追加します。

// Stateful transform with resource cleanup
function createGzipCompressor() {
  // Hypothetical compression API...
  const deflate = new Deflater({ gzip: true });

  return {
    async *transform(source) {
      for await (const chunks of source) {
        if (chunks === null) {
          // Flush: finalize compression
          deflate.push(new Uint8Array(0), true);
          if (deflate.result) yield [deflate.result];
        } else {
          for (const chunk of chunks) {
            deflate.push(chunk, false);
            if (deflate.result) yield [deflate.result];
          }
        }
      }
    },
    abort(reason) {
      // Clean up compressor resources on error/cancellation
    }
  };
}

実装者にとって、start()transform()flush() メソッドとコントローラーの調整が、独自の隠れたステートマシンとバッファリングメカニズムを持つ TransformStream クラスに渡される Transformer プロトコルはありません。変換は単なる関数または単純なオブジェクト:実装とテストははるかに簡単です。

明示的なバックプレッシャーポリシー

区切られたバッファがいっぱいになり、プロデューサーがさらに書き込みたい場合、できることはいくつかあります。

  1. 書き込みを拒否:それ以上のデータの受け入れを拒否

  2. 待機:スペースが利用可能になるまでブロック

  3. 古いデータを破棄:すでにバッファリングされているものを退避させ、スペースを作る

  4. 新しいデータを破棄:受信データをドロップする

これで、以上です。その他のレスポンスは、これらのバリエーション(「バッファのサイズを変更する」など、実際には選択を遅延させているだけ)、または一般的なストリーミングプリミティブに属さないドメイン固有のロジックのいずれかです。現在、Webストリームは現在、デフォルトで常に「待機」を選択します。

新しいAPIでは、これら4つのいずれかを明示的に選択することになります。

  • strict(デフォルト):バッファがフルで、保留中の書き込みが多すぎる場合、書き込みを拒否します。プロデューサーがバックプレッシャーを無視する「消火活動的」パターンを捕捉します。

  • block: バッファスペースが利用可能になるまで書き込みを待ちます。プロデューサーが適切に書き込みを待つと信頼できる場合に使います。

  • drop-oldest: 最も古いバッファリングデータを削除し、スペースを空けます。古くなったデータが価値を失うライブフィードに有用。

  • drop-newest: 満杯時に受信データを破棄します。圧倒されることなく処理したい場合に有用です。

const { writer, readable } = Stream.push({
  highWaterMark: 10,
  backpressure: 'strict' // or 'block', 'drop-oldest', 'drop-newest'
});

プロデューサーの協力を期待することはもうありません。選択したポリシーによって、バッファが満杯になったときに何が起こるかが決定されます。

プロデューサーの書き込みがコンシューマーの読み取りより速い場合の、各ポリシーの動作は次のようになります。

// strict: Catches fire-and-forget writes that ignore backpressure
const strict = Stream.push({ highWaterMark: 2, backpressure: 'strict' });
strict.writer.write(chunk1);  // ok (not awaited)
strict.writer.write(chunk2);  // ok (fills slots buffer)
strict.writer.write(chunk3);  // ok (queued in pending)
strict.writer.write(chunk4);  // ok (pending buffer fills)
strict.writer.write(chunk5);  // throws! too many pending writes

// block: Wait for space (unbounded pending queue)
const blocking = Stream.push({ highWaterMark: 2, backpressure: 'block' });
await blocking.writer.write(chunk1);  // ok
await blocking.writer.write(chunk2);  // ok
await blocking.writer.write(chunk3);  // waits until consumer reads
await blocking.writer.write(chunk4);  // waits until consumer reads
await blocking.writer.write(chunk5);  // waits until consumer reads

// drop-oldest: Discard old data to make room
const dropOld = Stream.push({ highWaterMark: 2, backpressure: 'drop-oldest' });
await dropOld.writer.write(chunk1);  // ok
await dropOld.writer.write(chunk2);  // ok
await dropOld.writer.write(chunk3);  // ok, chunk1 discarded

// drop-newest: Discard incoming data when full
const dropNew = Stream.push({ highWaterMark: 2, backpressure: 'drop-newest' });
await dropNew.writer.write(chunk1);  // ok
await dropNew.writer.write(chunk2);  // ok
await dropNew.writer.write(chunk3);  // silently dropped

マルチコンシューマーのパターンを顕著に

// Share with explicit buffer management
const shared = Stream.share(source, {
  highWaterMark: 100,
  backpressure: 'strict'
});

const consumer1 = shared.pull();
const consumer2 = shared.pull(decompress);

隠れた制限のないバッファを持つtee()の代わりに、明示的なマルチコンシューマープリミティブが得られます。Stream.share()はプルベースです。コンシューマーは共有ソースからプルし、バッファ制限とバックプレッシャーポリシーを事前に設定します。

また、プッシュベースのマルチコンシューマーシナリオ用のStream.broadcast()もあります。どちらも、コンシューマーが異なる速度で動作する場合にどうなるかについて考えなければなりません。なぜなら、それは隠れるべき問題ではない現実的な懸念だからです。

同期/非同期の分離

すべてのストリーミングワークロードにI/Oが含まれるわけではありません。ソースがメモリ内にあり、変換が純粋な関数である場合、非同期機械学習はオーバーヘッドを追加します。あなたは、メリットがない「待機」の調整に対してお金を払っているのです。

新しいAPIには、完全なパラレル同期バージョンがあります。Stream.pullSync()Stream.bytesSync()Stream.textSync()などがあります。ソースと変換がすべて同期されている場合、単一の約束なしでパイプライン全体を処理できます。

// Async — when source or transforms may be asynchronous
const textAsync = await Stream.text(source);

// Sync — when all components are synchronous
const textSync = Stream.textSync(source);

以下は、圧縮、変換、消費を含む完全な同期パイプラインで、非同期オーバーヘッドがゼロです:

// Synchronous source from in-memory data
const source = Stream.fromSync([inputBuffer]);

// Synchronous transforms
const compressed = Stream.pullSync(source, zlibCompressSync);
const encrypted = Stream.pullSync(compressed, aesEncryptSync);

// Synchronous consumption — no promises, no event loop trips
const result = Stream.bytesSync(encrypted);

パイプライン全体が単一のコールスタックで実行されます。約束は作成されず、マイクロタスクキュースケジューリングも発生せず、短時間の非同期処理マシンによるGC圧力も発生しません。解析、圧縮、またはメモリ内データの変換など、CPUに依存するワークロードの場合、これは同等のWebストリームコードよりも大幅に高速化できます。これにより、すべてのコンポーネントが同期である場合でも、非同期境界が強制されます。

Webストリームには同期経路がありません。たとえソースにデータの準備ができていて、変換が純粋な関数だとしても、すべての操作でプロミスの作成とマイクロタスクのスケジューリングに対する支払いが発生します。Promisは、待ち時間が実際的に必要な場合に素晴らしいのですが、必ずしも待ち時間が必要ではありません。新しいAPIを使用すると、必要なときに同期ランドの状態に維持することができます。

情報とWebストリームのギャップを埋める

非同期イテレータベースのアプローチは、この代替アプローチとWebストリームの間に自然な橋渡しを提供します。ReadableStreamからこの新しいアプローチにアクセスする場合、ReadableStreamがバイトを生成するように設定されている場合、入力としてreadableを渡すだけで、期待通りに機能します。

const readable = getWebReadableStreamSomehow();
const input = Stream.pull(readable, transform1, transform2);
for await (const chunks of input) {
  // process chunks
}

別のアプローチではチャンクのバッチが発生するため、ReadableStreamに適応する場合、もう少し作業が必要になりますが、適応層は簡単です。

async function* adapt(input) {
  for await (const chunks of input) {
    for (const chunk of chunks) {
      yield chunk;
    }
  }
}

const input = Stream.pull(source, transform1, transform2);
const readable = ReadableStream.from(adapt(input));

現実世界における従来からの障害への対処方法

  • 消費されていない本文:プルセマンティックスは、反復するまで何も起こらないことを意味します。隠れたリソースの保持安全もありません。ストリームを消費しなければ、接続が開いたままにするバックグラウンドマシンはありません。

  • The tee() メモリの崖: Stream.share()明示的なバッファ設定が必要です高いWA型とBackpressureのポリシーを事前に選択すれば、コンシューマーが異なる速度で実行するときに静かな無限の成長はなくなります。

  • バックプレッシャーギャップを変革:プルスルー変換をオンデマンドで実行します。データが中間バッファを経由することはありません。消費者がプルした場合にのみ流れます。反復を止め、処理を止めます。

  • SSRでのGCスラッシング: バッチ処理されたチャンク(Uint8Array[])は、非同期のオーバーヘッドを軽減します。Stream.pullSync() 経由でパイプラインを同期させるCPUを必要とするワークロードへの割り当てを完全に排除します。

パフォーマンス

設計の選択はパフォーマンスに影響を与えます。以下は、この代替手段の参照実装のベンチマークで、Webストリーム(Node.js v24.x、Apple M1 Pro、平均10回以上実行):

シナリオ

代替

Webストリーム

差異

小さいチャンク(1KB × 5000)

約13 GB/秒

~4 GB/秒

最大3倍の高速化

小さなチャンク(100B × 10000)

~4 GB/秒

~450 MB/秒

最大8倍の速さ

非同期反復(8KB × 1000)

~530 GB/秒

最大35GB/秒

最大15倍の速さ

連鎖した3倍の変換(8KB x 500)

~275GB/秒

最大3 GB/秒

最大80~90倍の高速化

高頻度(64B × 20000)

~7.5 GB/秒

~280 MB/秒

最大25倍の高速化

連鎖した変換の結果は特に驚くべきものです。プルスルーセマンティックスは、Webストリームパイプラインを悩ませる中間バッファリングを排除します。各TransformStreamが内部バッファを急速に満たす代わりに、データはオンデマンドでコンシューマーからソースへ流れます。

まず、Node.jsは、Webストリーム実装のパフォーマンスを完全に最適化するために、まだ大した労力を費やしていません。ホットパスを最適化するための小さな努力によって、Node.jsのパフォーマンス結果には大きな改善の余地があるでしょう。とはいえ、DenoとBunでベンチマークを実行すると、Webストリーム実装のどちらよりも、この代替イテレータベースのアプローチによる大幅なパフォーマンスの向上が示されています。

ブラウザベンチマーク(Chrome/Blink、3回の実行平均値)でも、一貫した向上が見られます。

シナリオ

代替

Webストリーム

差異

3KBチャンクをプッシュ

最大135,000オペレーション/秒

最大2万4,000運用/秒

最大5~6倍の高速化

100KBのチャンクをプッシュ

最大2万4,000運用/秒

3,000オペレーション/秒

最大7~8倍の高速化

3つの変換連鎖

最大4,600万オペレーション/秒

最大880オペレーション/秒

~5倍の高速化

5つの変換チェーン

最大2,400万オペレーション/秒

約550オペレーション/秒

最大4倍の速さ

bytes()消費

最大7万3,000件の運用/秒

最大11,000オペレーション/秒

最大6~7倍の高速化

非同期反復

最大110万運用/秒

1万オペレーション/秒

最大40~100倍の高速化

これらのベンチマークは、制御されたシナリオでのスループットを測定します。実際のパフォーマンスは、特定のユースケースによって異なります。Node.jsとブラウザの向上の違いは、環境ごとにWebストリームに対して異なる最適化経路を反映しています。

これらのベンチマークは、新しいAPIの純粋なTypeScript/JavaScript実装と、各ランタイムのWebストリームのネイティブ(JavaScript/C++/Rust)実装を比較していることに注意する必要があります。新しいAPIのリファレンス実装には、パフォーマンスの最適化機能がありませんでした。利益はすべて設計から得られます。ネイティブ実装では、さらなる改善が見られる可能性が高いでしょう。

この利点は、基本的な設計の選択肢がどのように組み合わされているかを示しています。バッチ処理は、非同期オーバーヘッドを合理化し、プルセマンティクスが中間バッファリングを排除し、データがすぐに利用可能であれば実装が同期の高速パスを使用できるようになるというメリットです。

「私たちは、ノードストリームのパフォーマンスと一貫性を改善するために何度も行ってきましたが、ゼロから始めることには、独特の強みがあります。新しいストリームのアプローチは、従来型のお荷物になることなく、最新のランタイムの現実を取り入れ、よりシンプルで、パフォーマンスが高く、より一貫性のあるストリームモデルへの扉を開くことができるのです。」 - Robert Nagy、Node.js TSCメンバー兼Node.js Streamsコントリビューター

今後の展開は?

対話のきっかけとしてこれを公開します。何が正しかったのでしょうか?見逃していたものは?このモデルに合わないユースケースはありますか?このアプローチの移行経路はどのようなものになりますか?Webストリームの苦痛を感じた開発者からのフィードバックを集め、より良いAPIとはどのようなものになるべきかについて意見を集めることを目的としています。

ご自身で試してみてください

この代替アプローチの参照実装は現在利用可能で、https://github.com/jasnell/new-streamsでご覧いただけます。

問題、議論、プルリクエストを歓迎します。私が取り上げていないWebストリームの問題に遭遇したり、このアプローチにギャップがある場合は、ぜひご連絡ください。しかし、ここでも「この新しいオブジェクトを全部使いましょう!」という考え方ではありません。 Webストリームの現状を超えて、第一の原則へと戻る議論を開始することです。

Webストリームは、他に何もない時にWebプラットフォームにストリーミングを持ち込むという野心的なプロジェクトでした。非同期イテレーション前、何年もの制作経験がエッジケースを明らかにする前、2014年当時の制約を考えると、設計者は合理的な選択をしました。

以来、私たちは多くのことを学びました。JavaScriptは進化を遂げました。現在設計されているストリーミングAPIは、よりシンプルで、より言語に合わせたものであり、バックプレッシャーやマルチコンシューマーの動作など、重要なことをより明確に示すことができます。

より良いStream APIが必要です。では、その使用例についてお話しましょう。

Cloudflareは企業ネットワーク全体を保護し、お客様がインターネット規模のアプリケーションを効率的に構築し、あらゆるWebサイトやインターネットアプリケーションを高速化し、DDoS攻撃を退けハッカーの侵入を防ぎゼロトラスト導入を推進できるようお手伝いしています。

ご使用のデバイスから1.1.1.1 にアクセスし、インターネットを高速化し安全性を高めるCloudflareの無料アプリをご利用ください。

より良いインターネットの構築支援という当社の使命について、詳しくはこちらをご覧ください。新たなキャリアの方向性を模索中の方は、当社の求人情報をご覧ください。
標準JavaScriptTypeScriptオープンソースCloudflare WorkersNode.jsパフォーマンスAPI

Xでフォロー

Cloudflare|@cloudflare

関連ブログ投稿

2026年3月23日 13:00

Launching Cloudflare’s Gen 13 servers: trading cache for cores for 2x edge compute performance

Cloudflare’s Gen 13 servers double our compute throughput by rethinking the balance between cache and cores. Moving to high-core-count AMD EPYC ™ Turin CPUs, we traded large L3 cache for raw compute density. By running our new Rust-based FL2 stack, we completely mitigated the latency penalty to unlock twice the performance....

2026年3月09日 14:00

Pingora OSSデプロイメントにおけるリクエストスニッフィングの脆弱性を修正する

本日は、オープンソースのPingoraサービスをイングレスプロキシとしてデプロイした際のリクエストスニッフィングの脆弱性と、Pingora 0.8.0での修正方法を公開します。 ...