Streaming responses
SSE parsing, interruption handling, and when stream=true is required
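GPUShare supports SSE (Server-Sent Events) streaming responses. Time to first token drops from seconds to hundreds of milliseconds, and the UI can render output as it is generated.
Basic usage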
OpenAI SDK
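# Assumes `client` is an OpenAI SDK client already configured for GPUShare.
stream = client.chat.completions.create(
    model="claude-sonnet-4-5-20250929",
    messages=[{"role": "user", "content": "Stream a haiku"}],
    stream=True,
)
for chunk in stream:
    if not chunk.choices:
        continue  # a trailing usage-only chunk has no choices
    delta = chunk.choices[0].delta.content or ""
    print(delta, end="", flush=True)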
Anthropic SDK
with client.messages.stream(
    model="claude-sonnet-4-5-20250929",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Stream a haiku"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
Gemini SDK
stream = client.models.generate_content_stream(
    model="gemini-2.5-pro",
    contents="Stream a haiku",
)
for chunk in stream:
    # chunk.text can be None for chunks that carry no text part
    print(chunk.text or "", end="", flush=True)
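When stream=true is required
| Tool | Stream required | Reason |
|---|---|---|
| image_generation | ✅ | The upstream sub2api WebSocket V2 channel does not return a synchronous response; it only emits a stream |
| web_search | ✅ | Same as above; search progress is returned in real time as streaming chunks |
| function tools | Optional | Goes over the HTTP channel; streaming and non-streaming both work |
| Plain text chat | Optional | Streaming and non-streaming both work |
If this requirement is not met, the gateway returns 400 invalid_request_error with the message stream: true required for image_generation/web_search.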
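The rule in the table can also be checked on the client before a request is sent, which avoids a round trip that would end in a 400. The following is a minimal sketch, not part of GPUShare: the helper names `requires_stream` and `check_request` are hypothetical, and the assumption that each tool is a dict with a `type` key is an illustration only.

```python
# Hypothetical client-side guard. The tool shape {"type": ...} is an assumption.
STREAM_ONLY_TOOLS = {"image_generation", "web_search"}


def requires_stream(tools):
    """Return True if any tool in the request only works with stream=true."""
    return any(tool.get("type") in STREAM_ONLY_TOOLS for tool in (tools or []))


def check_request(payload):
    """Raise locally if the gateway would reject this payload with a 400."""
    if requires_stream(payload.get("tools")) and not payload.get("stream"):
        raise ValueError("stream: true required for image_generation/web_search")
    return payload


if __name__ == "__main__":
    ok = check_request({"stream": True, "tools": [{"type": "web_search"}]})
    print(ok["stream"])
```

Raising locally keeps the failure next to the code that built the request; the gateway remains the source of truth for which tools need streaming.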
SSE protocol format
GPUShare passes the upstream SSE through unchanged; each data: line is one chunk.
OpenAI Chat streaming chunks
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"}}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":2}}
data: [DONE]
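To make the chunk layout concrete, here is a small sketch that folds OpenAI-style data: lines into the final text and the usage object. The function name `parse_openai_sse` is hypothetical, and the input is assumed to be already split into lines; reading from the network is left out.

```python
import json


def parse_openai_sse(lines):
    """Collect text and usage from OpenAI-style `data:` lines."""
    text_parts, usage = [], None
    for line in lines:
        if not line.startswith("data:"):
            continue  # skip blank separators and any non-data lines
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break  # end-of-stream sentinel, not JSON
        chunk = json.loads(data)
        if chunk.get("usage"):
            usage = chunk["usage"]
        for choice in chunk.get("choices", []):
            text_parts.append(choice.get("delta", {}).get("content") or "")
    return "".join(text_parts), usage


sample = [
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"}}]}',
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{"content":" world"}}]}',
    'data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":2}}',
    "data: [DONE]",
]
text, usage = parse_openai_sse(sample)
print(text)
print(usage)
```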
Anthropic streaming events
event: message_start
data: {"type":"message_start","message":{...}}
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}
event: message_stop
data: {"type":"message_stop"}
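The Anthropic stream is dispatched by event type rather than by a [DONE] sentinel. A minimal sketch under the assumption that each data payload repeats the event name in its `type` field, as in the sample above; `parse_anthropic_sse` is a hypothetical helper, and the elided `message` object is replaced by an empty placeholder so the sample is valid JSON.

```python
import json


def parse_anthropic_sse(lines):
    """Return the concatenated text_delta text, stopping at message_stop."""
    parts = []
    for line in lines:
        if not line.startswith("data:"):
            continue  # `event:` lines are skipped; the payload carries the type
        event = json.loads(line[len("data:"):].strip())
        kind = event.get("type")
        if kind == "content_block_delta":
            delta = event.get("delta", {})
            if delta.get("type") == "text_delta":
                parts.append(delta.get("text", ""))
        elif kind == "message_stop":
            break
    return "".join(parts)


sample = [
    "event: message_start",
    'data: {"type":"message_start","message":{}}',  # placeholder message body
    "event: content_block_delta",
    'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}',
    "event: message_stop",
    'data: {"type":"message_stop"}',
]
print(parse_anthropic_sse(sample))
```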
Gemini streaming (newline-delimited JSON, one object per data: line)
data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}
data: {"candidates":[{"content":{"parts":[{"text":" world"}]},"finishReason":"STOP"}]}
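For the Gemini shape, the text sits under candidates, content, parts. A short sketch that pulls it out, assuming only the first candidate is rendered; `parse_gemini_stream` is a hypothetical name.

```python
import json


def parse_gemini_stream(lines):
    """Return (text, finish_reason) from Gemini-style `data:` lines."""
    parts, finish_reason = [], None
    for line in lines:
        if not line.startswith("data:"):
            continue
        payload = json.loads(line[len("data:"):].strip())
        candidates = payload.get("candidates") or []
        if not candidates:
            continue
        first = candidates[0]  # assumption: only the first candidate is rendered
        for part in first.get("content", {}).get("parts", []):
            parts.append(part.get("text", ""))
        finish_reason = first.get("finishReason", finish_reason)
    return "".join(parts), finish_reason


sample = [
    'data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}',
    'data: {"candidates":[{"content":{"parts":[{"text":" world"}]},"finishReason":"STOP"}]}',
]
print(parse_gemini_stream(sample))
```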
Manual parsing (curl, or clients without an SDK)
curl https://api.dflop.top/v1/chat/completions \
  -H "Authorization: Bearer $GPUSHARE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "gpt-5.4",
    "stream": true,
    "messages": [{"role":"user","content":"Hello"}]
  }' \
  --no-buffer | while IFS= read -r line; do
    # -r keeps the backslash escapes in the JSON payload intact
    printf '%s\n' "$line"
  done
Parsing with JavaScript fetch:
const resp = await fetch("https://api.dflop.top/v1/chat/completions", {
  method: "POST",
  headers: {
    "Authorization": `Bearer ${process.env.GPUSHARE_API_KEY}`,
    "Content-Type": "application/json",
  },
  body: JSON.stringify({
    model: "gpt-5.4",
    stream: true,
    messages: [{ role: "user", content: "Hello" }],
  }),
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
// Labeled loop so [DONE] can exit from inside the inner for-loop
// (a bare `return` only works when this code sits inside a function).
readLoop: while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? ""; // keep the trailing partial line for the next read
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const data = line.slice(6);
    if (data === "[DONE]") break readLoop;
    const chunk = JSON.parse(data);
    // choices may be missing (error chunk) or empty (usage-only chunk)
    process.stdout.write(chunk.choices?.[0]?.delta?.content ?? "");
  }
}
Getting token usage
By default, OpenAI Chat streaming does not return a trailing usage chunk. To get the actual token counts, add stream_options:
stream = client.chat.completions.create(
    model="gpt-5.4",
    messages=[...],
    stream=True,
    stream_options={"include_usage": True},  # the key setting
)
usage = None
for chunk in stream:
    if chunk.choices:
        print(chunk.choices[0].delta.content or "", end="")
    if chunk.usage is not None:
        usage = chunk.usage  # the final chunk carries usage and has empty choices
print(f"\nTokens: {usage}")
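Internally, the GPUShare gateway automatically injects stream_options.include_usage=true for most upstream channels (the exception is the direct GLM channel, which already emits a usage chunk). Clients therefore receive token counts even without setting it, but setting it explicitly is more reliable.
Interrupting / canceling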
OpenAI SDK (Python)
stream = client.chat.completions.create(..., stream=True)
try:
    for chunk in stream:
        if some_user_canceled():
            stream.close()  # close explicitly so the gateway sees a client disconnect
            break
        if chunk.choices:  # usage-only chunks have no choices
            print(chunk.choices[0].delta.content or "", end="")
except KeyboardInterrupt:
    stream.close()
fetch (JavaScript)
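const controller = new AbortController();
const resp = await fetch("https://api.dflop.top/v1/chat/completions", {
  signal: controller.signal,
  // ...
});
// Later, to cancel:
controller.abort();
Once the GPUShare gateway sees the client disconnect, it also sends a cancellation upstream (best effort; billing is not guaranteed to stop immediately, and tokens already generated are still billed).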
Error handling
During streaming, internal gateway errors are emitted as error events (adapted to each protocol):
OpenAI Chat (/v1/chat/completions)
data: {"error":{"message":"upstream timeout","type":"upstream_error","code":"upstream_timeout"}}
data: [DONE]
Anthropic Messages (/v1/messages)
event: error
data: {"type":"error","error":{"type":"api_error","message":"upstream timeout"}}
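When parsing, check whether the chunk has an error field before handling delta.content. If the upstream fails midway, the content already streamed is partially valid; treat the error as the end-of-turn signal.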
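One way to apply this check for the OpenAI Chat shape is sketched below: the error is raised as an exception that carries the text received so far, so the caller can still show the partial output. `StreamError` and `read_turn` are hypothetical names, and the input is assumed to be pre-split data: lines.

```python
import json


class StreamError(Exception):
    """Raised when an error chunk ends the turn; keeps the partial text."""

    def __init__(self, error, partial_text):
        super().__init__(error.get("message", "stream error"))
        self.error = error
        self.partial_text = partial_text


def read_turn(lines):
    """Return the full text, or raise StreamError with what arrived so far."""
    parts = []
    for line in lines:
        if not line.startswith("data:"):
            continue
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        if "error" in chunk:  # check for the error field before touching delta
            raise StreamError(chunk["error"], "".join(parts))
        for choice in chunk.get("choices", []):
            parts.append(choice.get("delta", {}).get("content") or "")
    return "".join(parts)


sample = [
    'data: {"choices":[{"delta":{"content":"Hello"}}]}',
    'data: {"error":{"message":"upstream timeout","type":"upstream_error","code":"upstream_timeout"}}',
    "data: [DONE]",
]
try:
    read_turn(sample)
except StreamError as exc:
    print(exc.partial_text, exc.error["code"])
```

The Anthropic error payload also has a top-level `error` key, so the same check would apply there; only the text extraction differs.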
Reverse proxy / Nginx notes
GPUShare adds these headers to the response:
Content-Type: text/event-stream
Cache-Control: no-cache, no-store, no-transform
X-Accel-Buffering: no
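If you proxy GPUShare behind your own reverse proxy, make sure buffering is turned off (proxy_buffering off; for Nginx). Otherwise the whole streamed response is buffered until it finishes and then delivered at once, which defeats the purpose of streaming.
Debugging
Looking at the raw SSE with curl is the most direct way to debug: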
curl https://api.dflop.top/v1/chat/completions \
  -H "Authorization: Bearer $GPUSHARE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"model":"gpt-5.4","stream":true,"messages":[{"role":"user","content":"hi"}]}' \
  --no-buffer 2>&1 | head -20
If you see a single one-shot response rather than chunk-by-chunk output, some layer in between is buffering.
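The same check can be made less subjective by recording when each chunk arrives. The sketch below is a rough heuristic only: `looks_buffered` is a hypothetical helper, the 50 ms threshold is an arbitrary assumption, and very short responses may trip it even on a healthy path.

```python
def looks_buffered(arrival_times, max_burst_seconds=0.05):
    """Heuristic: True if every chunk arrived within one short burst.

    arrival_times: seconds since the request started, one entry per chunk,
    e.g. collected with time.monotonic() inside the streaming loop.
    """
    if len(arrival_times) < 2:
        return False  # not enough chunks to judge
    return (arrival_times[-1] - arrival_times[0]) < max_burst_seconds


if __name__ == "__main__":
    # Chunks spread over the generation time: streaming works end to end.
    print(looks_buffered([0.3, 0.5, 0.9]))
    # Everything lands together after a long wait: something is buffering.
    print(looks_buffered([2.0, 2.001, 2.002]))
```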