Dify
单轮会话版
import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
# NOTE(review): API key and endpoint are hard-coded placeholders — load them
# from environment variables / config before deploying.
DIFY_API_KEY = "app-xx"
DIFY_API_URL = "http://xxx/v1/chat-messages"
# FastAPI application exposing the OpenAI-compatible proxy endpoint.
app = FastAPI()
def _clean_text_block(text: str) -> str:
    """Strip OpenClaw decorations from one text block.

    Drops "Sender (untrusted metadata)" sections (through the next blank
    line, which is also dropped) and removes leading "[... GMT]" timestamp
    prefixes from the remaining lines.
    """
    kept = []
    skipping = False
    for line in text.split("\n"):
        if "Sender (untrusted metadata)" in line:
            skipping = True
        if skipping and not line.strip():
            # A blank line terminates the metadata section.
            skipping = False
            continue
        if skipping:
            continue
        if line.startswith("[") and "GMT" in line:
            parts = line.split("] ", 1)
            if len(parts) > 1:
                kept.append(parts[1])
            # Timestamp-only lines (no "] " separator) are dropped entirely.
        else:
            kept.append(line)
    return "\n".join(kept).strip()


def extract_user_input(messages: list) -> str:
    """Return the cleaned text of the LAST message in *messages*.

    Handles both plain-string content and OpenAI multi-part content lists
    (only ``type == "text"`` parts are used). Returns "" for an empty
    message list, a missing "content" key, or non-string scalar content
    (robustness fix: previously a missing key raised KeyError and non-str
    content leaked through the declared ``-> str`` interface).
    """
    if not messages:
        return ""
    raw_content = messages[-1].get("content", "")
    if isinstance(raw_content, list):
        cleaned = [
            _clean_text_block(item.get("text", ""))
            for item in raw_content
            if item.get("type") == "text"
        ]
        return " ".join(t for t in cleaned if t)
    return raw_content if isinstance(raw_content, str) else ""
async def stream_dify_response(user_input: str, model: str):
    """Proxy a Dify streaming chat call as OpenAI-compatible SSE.

    Parameters:
        user_input: cleaned user text forwarded as the Dify ``query``.
        model: model name echoed back in every chunk (informational only).

    Yields:
        ``data: {json}\n\n`` strings in ``chat.completion.chunk`` format,
        always terminated by ``data: [DONE]`` — even when the upstream
        stream ends without a ``message_end``/``error`` event (bug fix:
        previously the client would hang waiting for the terminator).
    """
    # NOTE(review): second-resolution timestamp ids can collide under
    # concurrent requests; a uuid would be safer.
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())

    def sse_chunk(delta: dict, finish_reason, usage: dict | None = None) -> str:
        # Single place that builds the OpenAI chunk envelope.
        payload = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": {},
                "query": user_input,
                "response_mode": "streaming",
                "user": "openai-proxy",
            },
        ) as r:
            async for line in r.aiter_lines():
                # SSE payload lines look like "data: {...}"; skip anything
                # else (event names, comments, keep-alive blanks).
                if not line.startswith("data:"):
                    continue
                raw = line[5:].strip()
                if not raw:
                    continue
                try:
                    event = json.loads(raw)
                except json.JSONDecodeError:
                    continue  # tolerate malformed/partial frames
                event_type = event.get("event")
                if event_type in ("message", "agent_message"):
                    # Incremental answer delta.
                    yield sse_chunk(
                        {"role": "assistant", "content": event.get("answer", "")},
                        None,
                    )
                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    yield sse_chunk(
                        {},
                        "stop",
                        {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    )
                    yield "data: [DONE]\n\n"
                    return
                elif event_type == "error":
                    yield sse_chunk(
                        {
                            "role": "assistant",
                            "content": f"Error: {event.get('message', '')}",
                        },
                        "stop",
                    )
                    yield "data: [DONE]\n\n"
                    return
    # Upstream closed without a terminal event (bug fix): still finish the
    # OpenAI stream so the client does not hang.
    yield sse_chunk({}, "stop")
    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat endpoint backed by a streaming Dify call."""
    payload = await req.json()
    query = extract_user_input(payload.get("messages", []))
    print("CLEAN INPUT:", query)
    return StreamingResponse(
        stream_dify_response(query, payload.get("model", "dify")),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Keep nginx from buffering the SSE stream.
            "X-Accel-Buffering": "no",
        },
    )
多轮会话版
import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
# NOTE(review): API key and endpoint are hard-coded placeholders — load them
# from environment variables / config before deploying.
DIFY_API_KEY = "app-"
DIFY_API_URL = "http://ssh./v1/chat-messages"
app = FastAPI()
# Per-user Dify conversation_id map, keyed by user id.
# NOTE(review): in-memory and unbounded — entries live for the whole process;
# consider TTL-based eviction for production use.
conversation_store: dict[str, str] = {}
def extract_user_input(messages: list) -> str:
    """Return the text of the last message, stripped of OpenClaw decorations.

    Multi-part content lists keep only ``type == "text"`` parts; within each
    part, "Sender (untrusted metadata)" sections (up to the next blank line)
    are removed and "[... GMT]" timestamp prefixes are trimmed. String
    content is returned unchanged.
    """
    if not messages:
        return ""
    content = messages[-1]["content"]
    if not isinstance(content, list):
        return content
    cleaned_parts = []
    for part in content:
        if part.get("type") != "text":
            continue
        kept = []
        in_metadata = False
        for raw_line in part.get("text", "").split("\n"):
            if "Sender (untrusted metadata)" in raw_line:
                in_metadata = True
            if in_metadata and not raw_line.strip():
                # Blank line ends the metadata section (blank line dropped).
                in_metadata = False
                continue
            if in_metadata:
                continue
            if raw_line.startswith("[") and "GMT" in raw_line:
                pieces = raw_line.split("] ", 1)
                if len(pieces) > 1:
                    kept.append(pieces[1])
            else:
                kept.append(raw_line)
        cleaned_parts.append("\n".join(kept).strip())
    return " ".join(p for p in cleaned_parts if p)
def get_user_id(data: dict) -> str:
    """Derive a stable per-user key used to isolate conversations.

    Prefers the OpenAI-style ``user`` field, then ``model``, then a fixed
    fallback. Falsy values (``None``, ``""``) are treated as absent — bug
    fix: a plain ``dict.get`` chain returned ``None`` when the client sent
    ``"user": null``, collapsing all such clients into one conversation key.
    """
    return data.get("user") or data.get("model") or "default-user"
async def stream_dify_response(user_input: str, model: str, user_id: str):
    """Proxy a Dify streaming chat call as OpenAI-compatible SSE, keeping
    per-user multi-turn context.

    The ``conversation_id`` carried by any upstream event is captured into
    ``conversation_store[user_id]`` so the user's next request continues the
    same Dify conversation ("" asks Dify to start a new one).

    Yields ``data: {json}\n\n`` chunks, always terminated by
    ``data: [DONE]`` — even when the upstream stream ends without a
    ``message_end``/``error`` event (bug fix: previously the client would
    hang waiting for the terminator).
    """
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())
    # Existing conversation for this user; "" means a brand-new one.
    conversation_id = conversation_store.get(user_id, "")
    print(f"USER: {user_id}, CONVERSATION_ID: {conversation_id or '(new)'}")

    def sse_chunk(delta: dict, finish_reason, usage: dict | None = None) -> str:
        # Single place that builds the OpenAI chunk envelope.
        payload = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": {},
                "query": user_input,
                "response_mode": "streaming",
                "conversation_id": conversation_id,  # "" starts a new conversation
                "user": user_id,
            },
        ) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue
                raw = line[5:].strip()
                if not raw:
                    continue
                try:
                    event = json.loads(raw)
                except json.JSONDecodeError:
                    continue  # tolerate malformed/partial frames
                # Capture the conversation id from ANY event so follow-up
                # requests reuse the same Dify conversation.
                new_conv_id = event.get("conversation_id")
                if new_conv_id and conversation_store.get(user_id) != new_conv_id:
                    conversation_store[user_id] = new_conv_id
                    print(f"SAVED CONVERSATION_ID: {new_conv_id} for user: {user_id}")
                event_type = event.get("event")
                if event_type in ("message", "agent_message"):
                    yield sse_chunk(
                        {"role": "assistant", "content": event.get("answer", "")},
                        None,
                    )
                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    yield sse_chunk(
                        {},
                        "stop",
                        {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    )
                    yield "data: [DONE]\n\n"
                    return
                elif event_type == "error":
                    yield sse_chunk(
                        {
                            "role": "assistant",
                            "content": f"Error: {event.get('message', '')}",
                        },
                        "stop",
                    )
                    yield "data: [DONE]\n\n"
                    return
    # Upstream closed without a terminal event (bug fix): still finish the
    # OpenAI stream so the client does not hang.
    yield sse_chunk({}, "stop")
    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat endpoint; streams a Dify answer with
    per-user conversation context."""
    payload = await req.json()
    query = extract_user_input(payload.get("messages", []))
    uid = get_user_id(payload)
    print("CLEAN INPUT:", query)
    return StreamingResponse(
        stream_dify_response(query, payload.get("model", "dify"), uid),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Keep nginx from buffering the SSE stream.
            "X-Accel-Buffering": "no",
        },
    )
# Optional maintenance endpoint: reset a user's conversation.
@app.delete("/v1/conversations/{user_id}")
async def clear_conversation(user_id: str):
    """Forget the stored Dify conversation_id so the user's next chat
    starts a fresh conversation."""
    if user_id in conversation_store:
        del conversation_store[user_id]
    return {"message": f"Conversation cleared for user: {user_id}"}
增加提示词
import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
# NOTE(review): API key and endpoint are hard-coded placeholders — load them
# from environment variables / config before deploying.
DIFY_API_KEY = "app-"
DIFY_API_URL = "http://ssh./v1/chat-messages"
app = FastAPI()
# Per-user Dify conversation_id map, keyed by user id (in-memory, unbounded).
conversation_store: dict[str, str] = {}
def _clean_block(text: str) -> str:
    """Strip OpenClaw decorations from one text block.

    Drops "Sender (untrusted metadata)" sections (through the next blank
    line, which is also dropped) and removes leading "[... GMT]" timestamp
    prefixes from the remaining lines.
    """
    kept = []
    skipping = False
    for line in text.split("\n"):
        if "Sender (untrusted metadata)" in line:
            skipping = True
        if skipping and not line.strip():
            # A blank line terminates the metadata section.
            skipping = False
            continue
        if skipping:
            continue
        if line.startswith("[") and "GMT" in line:
            parts = line.split("] ", 1)
            if len(parts) > 1:
                kept.append(parts[1])
            # Timestamp-only lines (no "] " separator) are dropped entirely.
        else:
            kept.append(line)
    return "\n".join(kept).strip()


def extract_user_input(messages: list) -> tuple[str, str]:
    """Split an OpenAI messages list into ``(system_prompt, user_input)``.

    The system prompt comes from the LAST ``system`` message (later ones
    override earlier ones); list-shaped content keeps only ``text`` parts.
    The user input is the last ``user`` message, cleaned of
    untrusted-metadata sections and timestamp prefixes via ``_clean_block``
    (refactor: the cleaning logic is now shared instead of inlined).
    """
    system_prompt = ""
    user_input = ""
    for msg in messages:
        if msg.get("role") == "system":
            content = msg.get("content", "")
            if isinstance(content, list):
                system_prompt = " ".join(
                    item.get("text", "")
                    for item in content
                    if item.get("type") == "text"
                )
            else:
                system_prompt = content
    last_user = next(
        (m for m in reversed(messages) if m.get("role") == "user"), None
    )
    if last_user:
        raw_content = last_user.get("content", "")
        if isinstance(raw_content, list):
            cleaned = [
                _clean_block(item.get("text", ""))
                for item in raw_content
                if item.get("type") == "text"
            ]
            user_input = " ".join(t for t in cleaned if t)
        else:
            user_input = raw_content
    return system_prompt, user_input
def get_user_id(data: dict) -> str:
    """Derive a stable per-user key used to isolate conversations.

    Prefers the OpenAI-style ``user`` field, then ``model``, then a fixed
    fallback. Falsy values (``None``, ``""``) are treated as absent — bug
    fix: a plain ``dict.get`` chain returned ``None`` when the client sent
    ``"user": null``, collapsing all such clients into one conversation key.
    """
    return data.get("user") or data.get("model") or "default-user"
async def stream_dify_response(system_prompt: str, user_input: str, model: str, user_id: str):
    """Proxy a Dify streaming chat call as OpenAI-compatible SSE, injecting
    an optional system prompt and keeping per-user multi-turn context.

    The system prompt is delivered two ways: always as the ``system_prompt``
    input variable (used if the Dify app defines that variable), and — only
    when starting a NEW conversation — prepended to the query text itself so
    it is not repeated on every turn.

    Yields ``data: {json}\n\n`` chunks, always terminated by
    ``data: [DONE]`` — even when the upstream stream ends without a
    ``message_end``/``error`` event (bug fix: previously the client would
    hang waiting for the terminator).
    """
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())
    conversation_id = conversation_store.get(user_id, "")
    print(f"USER: {user_id}, CONVERSATION_ID: {conversation_id or '(new)'}")
    print(f"SYSTEM PROMPT: {system_prompt[:100] if system_prompt else '(none)'}")
    print(f"CLEAN INPUT: {user_input}")

    # Way 1: as a Dify input variable.
    inputs = {}
    if system_prompt:
        inputs["system_prompt"] = system_prompt
    # Way 2: prepend to the first message of a new conversation only.
    if system_prompt and not conversation_id:
        query = f"[系统指令]\n{system_prompt}\n\n[用户消息]\n{user_input}"
    else:
        query = user_input

    def sse_chunk(delta: dict, finish_reason, usage: dict | None = None) -> str:
        # Single place that builds the OpenAI chunk envelope.
        payload = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {"index": 0, "delta": delta, "finish_reason": finish_reason}
            ],
        }
        if usage is not None:
            payload["usage"] = usage
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": inputs,
                "query": query,
                "response_mode": "streaming",
                "conversation_id": conversation_id,
                "user": user_id,
            },
        ) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue
                raw = line[5:].strip()
                if not raw:
                    continue
                try:
                    event = json.loads(raw)
                except json.JSONDecodeError:
                    continue  # tolerate malformed/partial frames
                # Remember the conversation so later turns share context.
                new_conv_id = event.get("conversation_id")
                if new_conv_id and conversation_store.get(user_id) != new_conv_id:
                    conversation_store[user_id] = new_conv_id
                    print(f"SAVED CONVERSATION_ID: {new_conv_id}")
                event_type = event.get("event")
                if event_type in ("message", "agent_message"):
                    yield sse_chunk(
                        {"role": "assistant", "content": event.get("answer", "")},
                        None,
                    )
                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    yield sse_chunk(
                        {},
                        "stop",
                        {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    )
                    yield "data: [DONE]\n\n"
                    return
                elif event_type == "error":
                    yield sse_chunk(
                        {
                            "role": "assistant",
                            "content": f"Error: {event.get('message', '')}",
                        },
                        "stop",
                    )
                    yield "data: [DONE]\n\n"
                    return
    # Upstream closed without a terminal event (bug fix): still finish the
    # OpenAI stream so the client does not hang.
    yield sse_chunk({}, "stop")
    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat(req: Request):
    """OpenAI-compatible chat endpoint; forwards the system prompt and the
    cleaned user query to Dify as a streaming call."""
    payload = await req.json()
    sys_prompt, query = extract_user_input(payload.get("messages", []))
    return StreamingResponse(
        stream_dify_response(
            sys_prompt, query, payload.get("model", "dify"), get_user_id(payload)
        ),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Keep nginx from buffering the SSE stream.
            "X-Accel-Buffering": "no",
        },
    )
@app.delete("/v1/conversations/{user_id}")
async def clear_conversation(user_id: str):
    """Forget the stored Dify conversation_id so the user's next chat
    starts a fresh conversation."""
    if user_id in conversation_store:
        del conversation_store[user_id]
    return {"message": f"Conversation cleared for user: {user_id}"}