WebSocketStream:将流与 WebSocket API 集成

通过应用背压来防止您的应用在 WebSocket 消息中被淹没,或用消息淹没 WebSocket 服务器。

背景

WebSocket API

WebSocket API 提供了 WebSocket 协议的 JavaScript 接口,可以在用户浏览器与服务器之间打开双向交互式通信会话。借助此 API,您可以向服务器发送消息并接收事件驱动的响应,而无需轮询服务器以获取回复。

Streams API

Streams API 允许 JavaScript 以编程方式访问通过网络接收的数据流块,并根据需要处理它们。在流环境中,一个重要概念就是背压。这是单个数据流或管道链调节读取或写入速度的过程。当流本身或管道链中后期的流仍然繁忙且尚未准备好接受更多分块时,它会在链中向后发送信号,以酌情减慢传送速度。

当前 WebSocket API 的问题

无法对收到的邮件应用背压

使用当前的 WebSocket API 时,对消息做出响应发生在 WebSocket.onmessage 中,这是在从服务器收到消息时调用的 EventHandler

假设您有一个应用,该应用在每次收到新消息时都需要执行大量的数据处理操作。您可能会设置类似于以下代码的流程,由于您通过 await 返回 process() 调用的结果,应该没有问题,对吧?

// A heavy data crunching operation.
const process = async (data) => {
  return new Promise((resolve) => {
    window.setTimeout(() => {
      console.log('WebSocket message processed:', data);
      return resolve('done');
    }, 1000);
  });
};

webSocket.onmessage = async (event) => {
  const data = event.data;
  // Await the result of the processing step in the message handler.
  await process(data);
};

不对!当前 WebSocket API 的问题是无法应用背压。当消息到达速度快于 process() 方法可以处理它们的速度时,渲染进程要么通过缓冲这些消息来填满内存,要么由于 CPU 使用率为 100% 而变得无响应,或者两者兼有。

对发送的邮件应用背压不符合人体工程学

可以对已发送的消息应用背压,但涉及轮询 WebSocket.bufferedAmount 属性,这样做效率低下且不符合人体工学要求。此只读属性会返回已使用 WebSocket.send() 调用排入队列但尚未传输至网络的字节数。发送完所有加入队列的数据后,此值会重置为零,但如果您继续调用 WebSocket.send(),它将继续攀升。

什么是 WebSocketStream API?

WebSocketStream API 通过将数据流与 WebSocket API 集成,解决了不存在的或不符合人体工学的背压问题。这意味着,您可以“免费”应用背压,而无需支付任何额外费用。

WebSocketStream API 的建议用例

可以使用此 API 的网站示例包括:

  • 需要保持互动(尤其是视频和屏幕共享)的高带宽 WebSocket 应用。
  • 同样,视频拍摄功能以及会在浏览器中生成大量需要上传到服务器的大量数据的其他应用。在背压状态下,客户端可以停止产生数据,而不是在内存中累积数据。

当前状态

| 步骤 | 状态 | | ------------------------------------------ | ---------------------------- | | 1. 创建铺垫消息 | [完成][explainer] | | 2. 创建规范的初始草稿 | [进行中][spec] | | 3. 收集反馈并不断改进设计 | [进行中](#feedback) | | 4. 源试用 | [完成][ot] | | 5. 发布 | 未开始 |

如何使用 WebSocketStream API

入门示例

WebSocketStream API 基于 promise,这使得在现代 JavaScript 世界中处理它时感觉自然。首先,构造一个新的 WebSocketStream 并向其传递 WebSocket 服务器的网址。接下来,等待连接状态变为 opened,这会产生 ReadableStream 和/或 WritableStream

通过调用 ReadableStream.getReader() 方法,最终会获得 ReadableStreamDefaultReader,然后您可以获取 read() 数据,直到流完成(即它返回 {value: undefined, done: true} 形式的对象)。

因此,通过调用 WritableStream.getWriter() 方法,您最终会获得 WritableStreamDefaultWriter,然后,您可以将数据 write() 应用到该数据库中。

  const wss = new WebSocketStream(WSS_URL);
  const {readable, writable} = await wss.opened;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  while (true) {
    const {value, done} = await reader.read();
    if (done) {
      break;
    }
    const result = await process(value);
    await writer.write(result);
  }

背压

那么承诺的背压功能呢?正如我在上文所述,您可以“免费”获得,无需执行任何额外的步骤。如果 process() 需要额外的时间,则系统仅会在流水线准备就绪后使用下一条消息。 同样,WritableStreamDefaultWriter.write() 步骤只有在安全的情况下才会继续。

高级示例

WebSocketStream 的第二个参数是一个选项包,用于未来扩展。目前唯一的选项是 protocols,其行为与 WebSocket 构造函数的第二个参数相同:

const chatWSS = new WebSocketStream(CHAT_URL, {protocols: ['chat', 'chatv2']});
const {protocol} = await chatWSS.opened;

所选的 protocol 以及可能的 extensions 是通过 WebSocketStream.opened promise 提供的字典的一部分。有关实时连接的所有信息都由此 promise 提供,因为如果连接失败,则这些信息无关紧要。

const {readable, writable, protocol, extensions} = await chatWSS.opened;

有关已关闭 WebSocketStream 连接的信息

从 WebSocket API 中的 WebSocket.oncloseWebSocket.onerror 事件提供的信息现在可通过 WebSocketStream.closed promise 获取。如果异常关闭,promise 会拒绝,否则会解析为服务器发送的代码和原因。

如需了解所有可能的状态代码及其含义,请参阅 CloseEvent 状态代码列表

const {code, reason} = await chatWSS.closed;

关闭 WebSocketStream 连接

WebSocketStream 可以使用 AbortController 关闭。因此,请将 AbortSignal 传递给 WebSocketStream 构造函数。

const controller = new AbortController();
const wss = new WebSocketStream(URL, {signal: controller.signal});
setTimeout(() => controller.abort(), 1000);

作为替代方案,您还可以使用 WebSocketStream.close() 方法,但该方法的主要用途是允许指定代码和原因。

wss.close({code: 4000, reason: 'Game over'});

渐进式增强和互操作性

Chrome 是目前唯一实现 WebSocketStream API 的浏览器。为实现与传统 WebSocket API 的互操作性,不能对收到的消息应用背压。可以对已发送的消息应用背压,但涉及轮询 WebSocket.bufferedAmount 属性,这样做效率低下且不符合人体工学要求。

功能检测

如需检查 WebSocketStream API 是否受支持,请使用以下命令:

if ('WebSocketStream' in window) {
  // `WebSocketStream` is supported!
}

演示

在支持的浏览器中,您可以在嵌入式 iframe 中或直接在 Glitch 上查看 WebSocketStream API 的实际应用。

反馈

Chrome 团队想了解您使用 WebSocketStream API 的体验。

告诉我们有关 API 设计的信息

此 API 是否存在无法正常运行的情况? 或者,是否缺少一些方法或属性来实施您的想法? 对安全模型有疑问或意见? 在相应的 GitHub 代码库上提交规范问题,或对现有问题提出您的想法。

报告实施方面的问题

您是否发现了 Chrome 实现存在错误? 或者,实现是否与规范不同? 在 new.crbug.com 上提交 bug。请务必提供尽可能多的详情和简单的重现说明,并在组件框中输入 Blink>Network>WebSocketsGlitch 非常适合用于分享轻松快速的重现案例。

显示对该 API 的支持

您是否打算使用 WebSocketStream API? 您的公开支持有助于 Chrome 团队确定功能的优先级,并向其他浏览器供应商说明支持这些功能的重要性。

请使用 # 标签 #WebSocketStream@ChromiumDev 发送 Twitter 微博,并告知我们您是在何处以及如何使用它。

实用链接

致谢

WebSocketStream API 由 Adam RiceYutaka Hirano 实现。主打图片:Daan Mooij 来自 Unsplash