知识AI
约 2137 字大约 7 分钟
2025-07-02
1.前言
基于策略模式封装多 LLM 模型(讯飞星火、通义千问、智谱),结合大模型的 Stream API 与 SSE 实现回答逐步输出
2.策略模式
2.1 介绍
定义:策略模式是一种行为型设计模式,可以在运行时根据不同的策略选择不同的算法或者行为
策略模式的优点:
- 将算法的定义和使用分离,符合开闭原则,方便扩展添加新的算法,只需要增加新的具体策略类,不需要修改策略上下文
- 代替原来大量的条件判断语句,提高代码的可读性和可维护性
策略模式的缺点:
- 客户端需要了解不同的策略类,来选择合适的策略对象,增加了客户端的复杂性
- 策略模式增加了对象的数量,每个具体策略类都需要一个独立的对象,导致系统中对象数量增加
解决第一个缺点的方法:外观模式
外观模式是一种结构型设计模式, 能为程序库、 框架或其他复杂类提供一个简单的接口
ChatFacade 外观模式来解决了这个问题,将具体的 AI 选择逻辑给屏蔽了,对于调用者而言,直接调用 chatService.autoChat(aisource,question,callback)
2.2 使用姿势
定义抽象策略接口:设计一个通用的 ChatService 接口,定义聊天相关的方法
public interface ChatService { /** * 具体AI选择 */ AISourceEnum source(); /** * 是否时异步优先 */ default boolean asyncFirst() { return true; } /** * 开始进入聊天 */ ChatRecordsVo chat(Long user, String question); /** * 开始进入聊天 */ ChatRecordsVo chat(Long user, String question, Consumer<ChatRecordsVo> consumer); /** * 异步聊天 */ ChatRecordsVo asyncChat(Long user, String question, Consumer<ChatRecordsVo> consumer); /** * 查询聊天历史 */ ChatRecordsVo getChatHistory(Long user, AISourceEnum aiSource); }
具体的策略实现,实现 ChatService 接口和方法
- XunFeiAiServicelmpl: 讯飞星火大模型的聊天实现
- ZhipuAiServiceImpl:智谱 AI
- AliAiServiceImpl:通义千问 AI
选择策略的上下文:ChatServiceFactory
这里主要借助了Spring的bean List注入方式,一次拿到上面的这些策略实现,然后保存到一个Map中,然后枚举 AiSourceEnum 来进行选择具体的策略
@Component public class ChatServiceFactory { private final Map<AISourceEnum, ChatService> chatServiceMap; public ChatServiceFactory(List<ChatService> chatServiceList) { chatServiceMap = Maps.newHashMapWithExpectedSize(chatServiceList.size()); for (ChatService chatService : chatServiceList) { chatServiceMap.put(chatService.source(), chatService); } } public ChatService getChatService(AISourceEnum aiSource) { return chatServiceMap.get(aiSource); } }
外观模式
public ChatRecordsVo autoChat(AISourceEnum source, String question, Consumer<ChatRecordsVo> callback) { if (source.asyncSupport() && chatServiceFactory.getChatService(source).asyncFirst()) { // 支持异步且异步优先的场景下,自动选择异步方式进行聊天 return asyncChat(source, question, callback); } return chat(source, question, callback); }
3.切换语言大模型
3.1 客户端
每当我们在页面上切换大模型时,都会建立一个 websocket 连接,如下所示,包含了 ws://地址/gpt/ id / 大模型类型
页面源码如下:每次切换大模型,都会执行 initWs 方法
function initWs() {
let protocol = window.location.protocol.replace("http", "ws");
let host = window.location.host;
let aiType = $('#chat-type').val();
console.log("AITYPE = ", aiType);
let socket = new WebSocket(`${protocol}//${host}/gpt/${session}/${aiType}`);
stompClient = Stomp.over(socket);
stompClient.connect({}, function (frame) {
console.log('ws连接成功: ' + frame);
wsConnected = true;
inputField.removeAttr("disabled");
// 改变 inputField 的 placeholder
inputField.val("");
inputField.attr("placeholder", "可按回车发送");
sendBtnText.text("发送");
sendBtn.removeAttr("disabled");
// 改变 chatTitle 的内容
chatTitle.text(user.userName);
3.2 服务端配置消息代理
- stomp协议的websocket实现的chatgpt聊天方式
@Slf4j
@Configuration
@EnableWebSocketMessageBroker // 开启websocket代理
public class WsChatConfig implements WebSocketMessageBrokerConfigurer {
}
WsChatConfig 类添加一个服务端点,来接收客户端的连接,即客户端创建ws时,指定的地址, chat/index.html: let socket = new WebSocket(${protocol}//${host}/gpt/${session}/${aiType}
);
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 注册一个 /gpt/{id} 的 WebSocket endPoint; 其中 {id} 用于让用户连接终端时都可以有自己的路径
// 作为 Principal 的标识,以便实现向指定用户发送信息
// sockjs 可以解决浏览器对 WebSocket 的兼容性问题,
registry.addEndpoint("/gpt/{id}/{aiType}", "/notify")
.setHandshakeHandler(new AuthHandshakeHandler())
.addInterceptors(new AuthHandshakeInterceptor())
// 注意下面这个,不要使用 setAllowedOrigins("*"),使用之后有啥问题可以实操验证一下🐕
// setAllowedOrigins接受一个字符串数组作为参数,每个元素代表一个允许访问的客户端地址,内部的值为具体的 "http://localhost:8080"
// setAllowedOriginPatterns接受一个正则表达式数组作为参数,每个元素代表一个允许访问的客户端地址的模式, 内部值可以为正则,如 "*", "http://*:8080"
.setAllowedOriginPatterns("*")
;
}
3.3 服务端配置握手拦截器
握手拦截器,用于用户身份校验和设置语言大模型类型
@Slf4j
public class AuthHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
/**
* 握手前,进行用户身份校验识别
*
* @param request
* @param response
* @param wsHandler
* @param attributes: 即对应的是Message中的 simpSessionAttributes 请求头
* @return
* @throws Exception
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
log.info("准备开始握手了!");
String session = SessionUtil.findCookieByName(request, LoginService.SESSION_KEY);
ReqInfoContext.ReqInfo reqInfo = new ReqInfoContext.ReqInfo();
SpringUtil.getBean(GlobalInitService.class).initLoginUser(session, reqInfo);
if (reqInfo.getUser() == null) {
log.info("websocket 握手失败,请登录之后再试");
return false;
}
// 将用户信息写入到属性中
attributes.put(MdcUtil.TRACE_ID_KEY, SelfTraceIdGenerator.generate());
attributes.put(LoginService.SESSION_KEY, reqInfo);
attributes.put(WsAnswerHelper.AI_SOURCE_PARAM, initAiSource(request.getURI().getPath()));
return true;
}
private String initAiSource(String path) {
int index = path.lastIndexOf("/");
return path.substring(index + 1);
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) {
log.info("握手成功了!!!");
super.afterHandshake(request, response, wsHandler, ex);
}
}
3.3 客户端发送聊天消息
@MessageMapping("/chat/{session}")
public void chat(String msg,
@DestinationVariable("session") String session,
@Header("simpSessionAttributes") Map<String, Object> attrs,
SimpMessageHeaderAccessor accessor) {
String aiType = (String) attrs.get(WsAnswerHelper.AI_SOURCE_PARAM);
WebSocketResponseUtil.execute(accessor, () -> {
log.info("{} 用户开始了对话: {} - {}", ReqInfoContext.getReqInfo().getUser(), aiType, msg);
AISourceEnum source = aiType == null ? null : AISourceEnum.valueOf(aiType);
answerHelper.sendMsgToUser(source, session, msg);
});
}
3.4 小结
- 用户每次切换大模型,客户端会重新建立一个 websocket 连接
- 服务端通过实现 WebSocketMessageBrokerConfigurer 接口来配置 WebSocket 消息代理,并配置服务端点和设置对应的握手拦截器
- 在握手拦截器中,在握手前进行身份验证无误后,保存用户信息和选择的语言大模型类型到 SimpMessageHeaderAccessor 的键值对中
- 用户每次发送请求到后端中,从 SimpMessageHeaderAccessor 读取到大语言模型类型
4.接收用户发送的消息
5.处理大模型响应消息
5.1 DeepSeek
- 初始化一个基于 OkHttp—SSE 的交互对象 OkHttpClient 对象
@Slf4j
@Component
public class DeepSeekIntegration {
@Autowired
private DeepSeekConf deepSeekConf;
private OkHttpClient okHttpClient;
@PostConstruct
public void init() {
this.okHttpClient = new OkHttpClient.Builder()
.connectTimeout(deepSeekConf.getTimeout(), TimeUnit.SECONDS) // 建立连接的超时时间
.readTimeout(deepSeekConf.getTimeout(), TimeUnit.SECONDS) // 建立连接后读取数据的超时时间
.writeTimeout(deepSeekConf.getTimeout(), TimeUnit.SECONDS)
.build();
}
}
- 封装与 DeekSeek 开放平台的请求交互
/**
* 使用流式聊天接口发送聊天请求
* 该方法将聊天请求转换为流式请求,并使用EventSource监听器处理响应
*
* @param req 聊天请求对象,包含聊天所需的参数
* @param listener EventSource监听器,用于处理服务器发送的事件
*/
private void executeStreamChat(ChatReq req, EventSourceListener listener) {
// 设置请求为流式请求
req.setStream(true);
try {
// 创建EventSource工厂,用于生成EventSource对象
EventSource.Factory factory = EventSources.createFactory(okHttpClient);
// 将聊天请求对象转换为JSON字符串
String body = JsonUtil.toStr(req);
// 构建请求对象,指定URL、认证头、内容类型头以及请求体
Request request = new Request.Builder()
.url(deepSeekConf.getApiHost() + "/chat/completions")
.addHeader("Authorization", "Bearer " + deepSeekConf.getApiKey())
.addHeader("Content-Type", "application/json")
.post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), body))
.build();
// 使用工厂创建新的EventSource,并传入请求和监听器
factory.newEventSource(request, listener);
} catch (Exception e) {
// 记录请求失败的日志
log.error("deepseek联调请求失败: {}", req, e);
}
}
- 提供对外交互的业务方法
public void streamReturn(List<ChatItemVo> list, EventSourceListener listener) {
// 构建多轮聊天的会话上下文
List<ChatMsg> msgList = ChatConstants.toMsgList(list, this::toMsg);
// 执行流式聊天,将构建好的对话上下文传递给聊天机器人,并监听响应事件
this.executeStreamChat(msgList, listener);
}
- 使用
@Override
public AiChatStatEnum doAsyncAnswer(Long user, ChatRecordsVo response, BiConsumer<AiChatStatEnum, ChatRecordsVo> consumer) {
// 获取问答中的最新的记录,用于问答
ChatItemVo item = response.getRecords().get(0);
// 创建一个抽象流监听器来处理流式返回的结果
AbstractStreamListener listener = new AbstractStreamListener() {
// 当接收到消息时的处理
@Override
public void onMsg(String message) {
// 成功返回结果的场景, 过滤掉开头的空行
if (StringUtils.isNotBlank(lastMessage)) {
item.appendAnswer(message);
consumer.accept(AiChatStatEnum.MID, response);
if (log.isDebugEnabled()) {
log.debug("DeepSeek返回内容: {}", lastMessage);
}
}
}
// 当遇到错误时的处理
@Override
public void onError(Throwable throwable, String res) {
// 返回异常的场景
item.appendAnswer("Error:" + (StringUtils.isBlank(res) ? throwable.getMessage() : res))
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.ERROR, response);
if (log.isDebugEnabled()) {
log.debug("DeepSeek返回异常: {}", lastMessage);
}
}
};
// 注册回答结束的回调钩子
listener.setOnComplate((s) -> {
if (log.isDebugEnabled()) {
log.debug("这一轮对话聊天已结束,完整的返回结果是:{}", s);
}
item.appendAnswer("\n")
.setAnswerType(ChatAnswerTypeEnum.STREAM_END);
consumer.accept(AiChatStatEnum.END, response);
});
// 调用深度寻求流式返回的方法
deepSeekIntegration.streamReturn(response.getRecords(), listener);
return AiChatStatEnum.IGNORE;
}
}