Streaming Responses

SSE parsing, interruption handling, and when stream=true is required

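GPUShare supports streaming responses over SSE (Server-Sent Events). Time to first token drops from seconds to hundreds of milliseconds, and the UI can render output while it is still being generated.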

Basic usage

OpenAI SDK

stream = client.chat.completions.create(
    model="claude-sonnet-4-5-20250929",
    messages=[{"role": "user", "content": "Stream a haiku"}],
    stream=True,
)

for chunk in stream:
    if not chunk.choices:  # a usage-only chunk has an empty choices list
        continue
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)

Anthropic SDK

with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Stream a haiku"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Gemini SDK

stream = client.models.generate_content_stream(
    model="gemini-2.5-pro",
    contents="Stream a haiku",
)
for chunk in stream:
    print(chunk.text or "", end="", flush=True)  # text can be None for chunks without text parts

When stream=true is required

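Tool              stream required  Reason
image_generation  Yes              The upstream sub2api WebSocket V2 channel returns no synchronous response, only a stream
web_search        Yes              Same as above; the search progress is returned in real time through streaming chunks
function tools    Optional         Goes over the HTTP channel; streaming and non-streaming both work
Plain-text chat   Optional         Streaming and non-streaming both work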

If this requirement is not met, the gateway returns 400 invalid_request_error with the message: stream: true required for image_generation/web_search
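
The rule above can also be checked on the client before a request is sent. The following is a minimal sketch, assuming tools are passed as dicts with a "type" field; the names STREAM_ONLY_TOOLS, requires_stream and validate_request are illustrative and not part of GPUShare's API.

```python
# Tool types that, per the table above, only work with stream=true.
STREAM_ONLY_TOOLS = {"image_generation", "web_search"}


def requires_stream(tools):
    """Return True if any tool in the request forces stream=true.

    `tools` is assumed to be a list of dicts with a "type" key
    (hypothetical request shape; adjust to the payload you actually send).
    """
    return any(tool.get("type") in STREAM_ONLY_TOOLS for tool in tools or [])


def validate_request(payload):
    """Fail early on the client instead of waiting for the gateway's 400."""
    if requires_stream(payload.get("tools")) and not payload.get("stream"):
        raise ValueError("stream: true required for image_generation/web_search")
    return payload
```

Calling validate_request on the payload just before sending keeps the check in one place; function tools and plain chat pass through unchanged.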

SSE protocol format

GPUShare passes the upstream SSE through unchanged; each data: line carries one chunk.

OpenAI Chat streaming chunks

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"}}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":2}}

data: [DONE]

Anthropic streaming events

event: message_start
data: {"type":"message_start","message":{...}}

event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}

event: message_stop
data: {"type":"message_stop"}

Gemini streaming (one JSON object per data: line)

data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}

data: {"candidates":[{"content":{"parts":[{"text":" world"}]},"finishReason":"STOP"}]}
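
The three payload shapes above differ mainly in where the text fragment sits. The sketch below shows one way to pull the text out of an already-decoded payload; extract_text is an illustrative helper, not a GPUShare or SDK function, and it only covers the fields shown in the samples above.

```python
import json


def extract_text(payload):
    """Return the text fragment carried by one decoded SSE payload, or ""."""
    # OpenAI Chat: choices[0].delta.content
    choices = payload.get("choices")
    if choices:
        return choices[0].get("delta", {}).get("content") or ""
    # Anthropic: content_block_delta events carrying a text_delta
    if payload.get("type") == "content_block_delta":
        delta = payload.get("delta", {})
        return delta.get("text", "") if delta.get("type") == "text_delta" else ""
    # Gemini: candidates[0].content.parts[*].text
    candidates = payload.get("candidates")
    if candidates:
        parts = candidates[0].get("content", {}).get("parts", [])
        return "".join(part.get("text", "") for part in parts)
    # Anything else (message_start, message_stop, ...) carries no text.
    return ""


samples = [
    '{"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}',
    '{"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}',
    '{"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}',
    '{"type":"message_stop"}',
]
texts = [extract_text(json.loads(sample)) for sample in samples]
print(texts)
```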

Manual parsing (curl, or a client without an SDK)

curl https://api.dflop.top/v1/chat/completions \
  -H "Authorization: Bearer $GPUSHARE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-5.4",
    "stream": true,
    "messages": [{"role":"user","content":"Hello"}]
  }' \
  --no-buffer | while IFS= read -r line; do
    echo "$line"
  done

Parsing with JavaScript fetch:

const resp = await fetch("https://api.dflop.top/v1/chat/completions", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${process.env.GPUSHARE_API_KEY}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "gpt-5.4",
    stream: true,
    messages: [{ role: "user", content: "Hello" }],
  }),
});

const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = "";

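// Labeled loop: a bare `return` is a syntax error at module top level,
// so [DONE] exits both loops with `break readLoop` instead.
readLoop: while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });

  // Keep the trailing partial line in the buffer until the rest arrives.
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const rawLine of lines) {
    const line = rawLine.trimEnd(); // tolerate CRLF line endings
    if (!line.startsWith("data: ")) continue;
    const data = line.slice(6);
    if (data === "[DONE]") break readLoop;
    const chunk = JSON.parse(data);
    // Error chunks and usage-only chunks may have no choices entry.
    process.stdout.write(chunk.choices?.[0]?.delta?.content ?? "");
  }
}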
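
The same buffering approach works without an SDK in Python. The sketch below is transport-agnostic: it assumes you feed it raw byte chunks from whatever HTTP client you use, and that lines use the "data: " prefix shown above. SSEBuffer is an illustrative name, not a library class.

```python
import json


class SSEBuffer:
    """Accumulate raw bytes and yield the payload of each complete data: line."""

    def __init__(self):
        self._buffer = b""
        self.done = False  # set once the [DONE] sentinel has been seen

    def feed(self, raw):
        """Generator: yields decoded JSON payloads completed by `raw`."""
        self._buffer += raw
        # Everything before the last newline is complete; keep the remainder.
        # Splitting on bytes avoids cutting a multi-byte UTF-8 character.
        *lines, self._buffer = self._buffer.split(b"\n")
        for line in lines:
            text = line.decode("utf-8").rstrip("\r")
            if not text.startswith("data: "):
                continue  # blank separators, event: lines, comments
            data = text[len("data: "):]
            if data == "[DONE]":
                self.done = True
                return
            yield json.loads(data)


# Usage: the network may split a line anywhere, even inside the JSON.
parser = SSEBuffer()
pieces = []
network_chunks = [
    b'data: {"choices":[{"delta":{"content":"Hel',
    b'lo"}}]}\n\ndata: [DO',
    b"NE]\n\n",
]
for raw in network_chunks:
    for chunk in parser.feed(raw):
        pieces.append(chunk["choices"][0]["delta"].get("content", ""))
print("".join(pieces), parser.done)
```

Because feed is a generator, it has to be iterated for the buffer to advance, as the loop above does.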

Getting token usage

By default, OpenAI Chat streaming does not return a trailing usage chunk. To get the actual token counts, add stream_options:

stream = client.chat.completions.create(
    model="gpt-5.4",
    messages=[...],
    stream=True,
    stream_options={"include_usage": True},  # this is what enables the usage chunk
)

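usage = None
for chunk in stream:
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="", flush=True)
    if chunk.usage is not None:
        usage = chunk.usage  # normally only set on the final chunk

# usage stays None if the stream ended before a usage chunk arrived
print(f"\nTokens: {usage}")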

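Internally, the GPUShare gateway injects stream_options.include_usage=true automatically for most upstream channels (the exception is the GLM direct channel, which already sends a usage chunk). The client therefore gets token counts even without setting it, but setting it explicitly is more reliable.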

Interrupting / cancelling

OpenAI SDK (Python)

stream = client.chat.completions.create(..., stream=True)
try:
    for chunk in stream:
        if some_user_canceled():
            stream.close()  # close explicitly so the gateway sees a client disconnect
            break
        if chunk.choices:  # skip usage-only chunks with an empty choices list
            print(chunk.choices[0].delta.content or "", end="")
except KeyboardInterrupt:
    stream.close()

fetch (JavaScript)

const controller = new AbortController();

const resp = await fetch("https://api.dflop.top/v1/chat/completions", {
  signal: controller.signal,
  // ...
});

// Later, to cancel:
controller.abort();

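Once the GPUShare gateway detects the client disconnect, it also sends a cancellation upstream. This is best effort: billing is not guaranteed to stop immediately, and tokens that have already been generated are still billed.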

Error handling

During streaming, errors inside the gateway are emitted as an error event, in the format of whichever protocol the request uses:

OpenAI Chat (/v1/chat/completions)

data: {"error":{"message":"upstream timeout","type":"upstream_error","code":"upstream_timeout"}}

data: [DONE]

Anthropic Messages (/v1/messages)

event: error
data: {"type":"error","error":{"type":"api_error","message":"upstream timeout"}}

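When parsing on the client, check whether a chunk has an error field before handling delta.content. If the upstream fails midway, the content already streamed is still valid as a partial result; treat the error as the signal that the turn has ended.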
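
The check described above can be sketched as follows. This is an illustration under assumptions, not a GPUShare client: consume is a hypothetical helper that takes already-decoded JSON payloads and only collects text from the OpenAI Chat chunk shape; both error formats shown above carry a top-level error object, which is what it looks for.

```python
def consume(chunks):
    """Collect streamed text until the stream ends or an error chunk arrives.

    `chunks` is any iterable of decoded JSON payloads (dicts).
    Returns (text, error); error is None when the stream finished cleanly.
    """
    parts = []
    for chunk in chunks:
        # Check for an error before touching delta.content.
        error = chunk.get("error")
        if error is not None:
            # Text received so far is kept as a partial result.
            return "".join(parts), error
        for choice in chunk.get("choices", []):
            parts.append(choice.get("delta", {}).get("content") or "")
    return "".join(parts), None


text, error = consume([
    {"choices": [{"delta": {"content": "Hel"}}]},
    {"error": {"message": "upstream timeout", "type": "upstream_error", "code": "upstream_timeout"}},
])
if error is not None:
    print(f"partial text: {text!r}, stopped by: {error['message']}")
```

Returning the partial text together with the error lets the caller decide whether to show it, retry, or discard it.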

Reverse proxy / Nginx notes

GPUShare adds the following headers to the response:

Content-Type: text/event-stream
Cache-Control: no-cache, no-store, no-transform
X-Accel-Buffering: no

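If you put your own reverse proxy in front of GPUShare, make sure buffering is turned off (proxy_buffering off; for Nginx). Otherwise the entire streamed response is buffered until it completes and then delivered in one piece, which defeats the purpose of streaming.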

Debugging

Looking at the raw SSE with curl is the most direct way to debug:

curl https://api.dflop.top/v1/chat/completions \
  -H "Authorization: Bearer $GPUSHARE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"model":"gpt-5.4","stream":true,"messages":[{"role":"user","content":"hi"}]}' \
  --no-buffer 2>&1 | head -20

If the response arrives all at once instead of chunk by chunk, some layer in between is buffering.