Skip to content

Dify

单论会话

import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

DIFY_API_KEY = "app-xx"
DIFY_API_URL = "http://xxx/v1/chat-messages"

app = FastAPI()


def extract_user_input(messages: list) -> str:
    if not messages:
        return ""
    raw_content = messages[-1]["content"]
    if isinstance(raw_content, list):
        texts = []
        for item in raw_content:
            if item.get("type") == "text":
                text = item.get("text", "")
                lines = text.split("\n")
                clean_lines = []
                skip = False
                for line in lines:
                    if "Sender (untrusted metadata)" in line:
                        skip = True
                    if skip and line.strip() == "":
                        skip = False
                        continue
                    if not skip:
                        if line.startswith("[") and "GMT" in line:
                            parts = line.split("] ", 1)
                            if len(parts) > 1:
                                clean_lines.append(parts[1])
                        else:
                            clean_lines.append(line)
                texts.append("\n".join(clean_lines).strip())
        return " ".join(t for t in texts if t)
    return raw_content


async def stream_dify_response(user_input: str, model: str):
    """调用 Dify streaming 接口,转成 OpenAI SSE 格式"""
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": {},
                "query": user_input,
                "response_mode": "streaming",  # ✅ 改成 streaming
                "user": "openai-proxy",
            },
        ) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue

                raw = line[5:].strip()
                if not raw:
                    continue

                try:
                    event = json.loads(raw)
                except Exception:
                    continue

                event_type = event.get("event")

                # 流式文本块
                if event_type == "agent_message" or event_type == "message":
                    delta = event.get("answer", "")
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": delta},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

                # 结束
                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {},
                                "finish_reason": "stop",
                            }
                        ],
                        "usage": {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return

                # 错误
                elif event_type == "error":
                    error_chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": f"Error: {event.get('message', '')}"},
                                "finish_reason": "stop",
                            }
                        ],
                    }
                    yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return


@app.post("/v1/chat/completions")
async def chat(req: Request):
    data = await req.json()
    messages = data.get("messages", [])
    user_input = extract_user_input(messages)
    model = data.get("model", "dify")

    print("CLEAN INPUT:", user_input)

    return StreamingResponse(
        stream_dify_response(user_input, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # 防止 nginx 缓冲 SSE
        },
    )

多轮会话版

import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

DIFY_API_KEY = "app-"
DIFY_API_URL = "http://ssh./v1/chat-messages"

app = FastAPI()

# ✅ 按 user 维护 conversation_id
conversation_store: dict[str, str] = {}


def extract_user_input(messages: list) -> str:
    if not messages:
        return ""
    raw_content = messages[-1]["content"]
    if isinstance(raw_content, list):
        texts = []
        for item in raw_content:
            if item.get("type") == "text":
                text = item.get("text", "")
                lines = text.split("\n")
                clean_lines = []
                skip = False
                for line in lines:
                    if "Sender (untrusted metadata)" in line:
                        skip = True
                    if skip and line.strip() == "":
                        skip = False
                        continue
                    if not skip:
                        if line.startswith("[") and "GMT" in line:
                            parts = line.split("] ", 1)
                            if len(parts) > 1:
                                clean_lines.append(parts[1])
                        else:
                            clean_lines.append(line)
                texts.append("\n".join(clean_lines).strip())
        return " ".join(t for t in texts if t)
    return raw_content


def get_user_id(data: dict) -> str:
    """从请求中提取 user 标识,用于隔离不同用户的会话"""
    # OpenClaw 可能带 user 字段,没有就用 model 兜底
    return data.get("user", data.get("model", "default-user"))


async def stream_dify_response(user_input: str, model: str, user_id: str):
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())

    # ✅ 取出该用户已有的 conversation_id
    conversation_id = conversation_store.get(user_id, "")
    print(f"USER: {user_id}, CONVERSATION_ID: {conversation_id or '(new)'}")

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": {},
                "query": user_input,
                "response_mode": "streaming",
                "conversation_id": conversation_id,  # ✅ 传入,空字符串代表新会话
                "user": user_id,
            },
        ) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue

                raw = line[5:].strip()
                if not raw:
                    continue

                try:
                    event = json.loads(raw)
                except Exception:
                    continue

                event_type = event.get("event")

                # ✅ 从任意事件中捕获 conversation_id 并保存
                if event.get("conversation_id"):
                    new_conv_id = event["conversation_id"]
                    if conversation_store.get(user_id) != new_conv_id:
                        conversation_store[user_id] = new_conv_id
                        print(f"SAVED CONVERSATION_ID: {new_conv_id} for user: {user_id}")

                if event_type in ("message", "agent_message"):
                    delta = event.get("answer", "")
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": delta},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {},
                                "finish_reason": "stop",
                            }
                        ],
                        "usage": {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return

                elif event_type == "error":
                    error_chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": f"Error: {event.get('message', '')}"},
                                "finish_reason": "stop",
                            }
                        ],
                    }
                    yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return


@app.post("/v1/chat/completions")
async def chat(req: Request):
    data = await req.json()
    messages = data.get("messages", [])
    user_input = extract_user_input(messages)
    model = data.get("model", "dify")
    user_id = get_user_id(data)

    print("CLEAN INPUT:", user_input)

    return StreamingResponse(
        stream_dify_response(user_input, model, user_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


# ✅ 可选:提供清除会话的接口
@app.delete("/v1/conversations/{user_id}")
async def clear_conversation(user_id: str):
    conversation_store.pop(user_id, None)
    return {"message": f"Conversation cleared for user: {user_id}"}

增加提示词

import time
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

DIFY_API_KEY = "app-"
DIFY_API_URL = "http://ssh./v1/chat-messages"
app = FastAPI()

conversation_store: dict[str, str] = {}


def extract_user_input(messages: list) -> tuple[str, str]:
    """
    返回 (system_prompt, user_input)
    ✅ 提取 system 角色内容作为 inputs 传给 Dify
    """
    system_prompt = ""
    user_input = ""

    for msg in messages:
        if msg.get("role") == "system":
            content = msg.get("content", "")
            if isinstance(content, list):
                system_prompt = " ".join(
                    item.get("text", "") for item in content if item.get("type") == "text"
                )
            else:
                system_prompt = content

    # 取最后一条 user 消息
    last_user = next(
        (m for m in reversed(messages) if m.get("role") == "user"), None
    )
    if last_user:
        raw_content = last_user.get("content", "")
        if isinstance(raw_content, list):
            texts = []
            for item in raw_content:
                if item.get("type") == "text":
                    text = item.get("text", "")
                    lines = text.split("\n")
                    clean_lines = []
                    skip = False
                    for line in lines:
                        if "Sender (untrusted metadata)" in line:
                            skip = True
                        if skip and line.strip() == "":
                            skip = False
                            continue
                        if not skip:
                            if line.startswith("[") and "GMT" in line:
                                parts = line.split("] ", 1)
                                if len(parts) > 1:
                                    clean_lines.append(parts[1])
                            else:
                                clean_lines.append(line)
                    texts.append("\n".join(clean_lines).strip())
            user_input = " ".join(t for t in texts if t)
        else:
            user_input = raw_content

    return system_prompt, user_input


def get_user_id(data: dict) -> str:
    return data.get("user", data.get("model", "default-user"))


async def stream_dify_response(system_prompt: str, user_input: str, model: str, user_id: str):
    chat_id = "chatcmpl-" + str(int(time.time()))
    created = int(time.time())
    conversation_id = conversation_store.get(user_id, "")

    print(f"USER: {user_id}, CONVERSATION_ID: {conversation_id or '(new)'}")
    print(f"SYSTEM PROMPT: {system_prompt[:100] if system_prompt else '(none)'}")
    print(f"CLEAN INPUT: {user_input}")

    # ✅ system prompt 通过两种方式传给 Dify
    # 方式1: 放进 inputs(适合 Dify 工作流有定义变量的情况)
    inputs = {}
    if system_prompt:
        inputs["system_prompt"] = system_prompt

    # ✅ 方式2: 没有 conversation_id 时,把 system prompt 拼到第一条消息前面
    if system_prompt and not conversation_id:
        query = f"[系统指令]\n{system_prompt}\n\n[用户消息]\n{user_input}"
    else:
        query = user_input

    async with httpx.AsyncClient(timeout=60) as client:
        async with client.stream(
            "POST",
            DIFY_API_URL,
            headers={
                "Authorization": f"Bearer {DIFY_API_KEY}",
                "Content-Type": "application/json",
            },
            json={
                "inputs": inputs,
                "query": query,
                "response_mode": "streaming",
                "conversation_id": conversation_id,
                "user": user_id,
            },
        ) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue
                raw = line[5:].strip()
                if not raw:
                    continue
                try:
                    event = json.loads(raw)
                except Exception:
                    continue

                event_type = event.get("event")

                if event.get("conversation_id"):
                    new_conv_id = event["conversation_id"]
                    if conversation_store.get(user_id) != new_conv_id:
                        conversation_store[user_id] = new_conv_id
                        print(f"SAVED CONVERSATION_ID: {new_conv_id}")

                if event_type in ("message", "agent_message"):
                    delta = event.get("answer", "")
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": delta},
                                "finish_reason": None,
                            }
                        ],
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"

                elif event_type == "message_end":
                    usage = event.get("metadata", {}).get("usage", {})
                    chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {},
                                "finish_reason": "stop",
                            }
                        ],
                        "usage": {
                            "prompt_tokens": usage.get("prompt_tokens", 0),
                            "completion_tokens": usage.get("completion_tokens", 0),
                            "total_tokens": usage.get("total_tokens", 0),
                        },
                    }
                    yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return

                elif event_type == "error":
                    error_chunk = {
                        "id": chat_id,
                        "object": "chat.completion.chunk",
                        "created": created,
                        "model": model,
                        "choices": [
                            {
                                "index": 0,
                                "delta": {"role": "assistant", "content": f"Error: {event.get('message', '')}"},
                                "finish_reason": "stop",
                            }
                        ],
                    }
                    yield f"data: {json.dumps(error_chunk, ensure_ascii=False)}\n\n"
                    yield "data: [DONE]\n\n"
                    return


@app.post("/v1/chat/completions")
async def chat(req: Request):
    data = await req.json()
    messages = data.get("messages", [])
    system_prompt, user_input = extract_user_input(messages)
    model = data.get("model", "dify")
    user_id = get_user_id(data)

    return StreamingResponse(
        stream_dify_response(system_prompt, user_input, model, user_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


@app.delete("/v1/conversations/{user_id}")
async def clear_conversation(user_id: str):
    conversation_store.pop(user_id, None)
    return {"message": f"Conversation cleared for user: {user_id}"}
☁️ 部署建议
如果你打算长期运行项目(博客 / API / 自动化脚本),建议直接用云服务器,会比本地稳定很多。
👉 查看云服务器(新用户优惠)