Table of contents
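1. Preface
2. Back-end integration
2.1. pom.xml and integration configuration
2.2. Wrapping behaviors in interfaces
2.3. Custom message strategy pattern with Spring
2.3.1. Defining the interface, anchor annotation, and message enum
2.3.2. Strategy interface implementation class
2.3.3. Strategy factory class
2.4. Spring's event notification mechanism
2.4.1. Custom event source
2.4.2. Event listener
2.4.3. Event publisher
2.4.4. Testing
3. Front-end integration
3.1. Page integration
3.2. Vite reverse proxy configuration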
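4. Conclusion: overall, it is quite simple

1. Preface
WebSocket is a protocol, introduced alongside HTML5, that provides full-duplex communication over a single TCP connection. Once the connection is established there is no longer a request/response model: client and server are equal peers, the connection is persistent, and either side can send data to the other at any time.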
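Purpose
Server-side resources change frequently, and the client needs to learn about those changes as promptly as possible so that it can show them to the user.
Common solutions
Front-end polling: inefficient and wasteful of network bandwidth and computing resources. It introduces some latency, puts considerable load on the server, and most of the requests return nothing new.
SSE: an HTTP-based technique in which the server continuously streams data to the client. Communication is one-way, from server to client. I have heard of it being used and will write it up when I get the chance.
DWR: if you are not familiar with it, see my other article, which covers the DWR framework specifically.
WebSocket: for the underlying principles, properties, and methods, see my downloads, which include a dedicated slide deck.
This article focuses on hands-on development. The back end is Java and the front end is Vue 3 + Vite.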
2. Back-end integration
2.1. pom.xml and integration configuration
Spring Boot version 2.5.3:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
Add the configuration class WebsocketConfig.java:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@EnableWebSocket
@Configuration
public class WebsocketConfig {

    // Registers classes annotated with @ServerEndpoint with the embedded container
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
2.2. Wrapping behaviors in interfaces
The IWebsocket interface defines the four basic WebSocket callbacks, plus the business-level behaviors: adding, looking up, and removing clients, and tagging them.
import com.xxx.notification.websocket.dto.CacheClient;

import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;

public interface IWebsocket {

    interface ILinkPoint {
        // Called when a connection is opened
        void onOpen(Session session);

        // Called when a message is received
        void onMessage(String message);

        // Called when the connection is closed
        void onClose();

        // Called when an error occurs
        void onError(Session session, Throwable throwable);
    }

    interface IClient {
        // Get the session
        Session getSession();

        // Get the tag
        String getTag();

        // Send text
        void sendText(String text);

        // Send an object
        void send(Object object);
    }

    interface IManager<T extends CacheClient> {
        // Send text to the given clients
        void sendText(String text, List<T> clients);

        // Send text to all clients
        void sendTextYoAll(String text);

        // Add a client
        void addClient(T client);

        // Get all clients
        CopyOnWriteArraySet<T> all();

        // Remove clients
        void removeClients(List<T> clients);

        // Get a client by tag
        T getClientByTag(String tag);

        // Get multiple clients by their tags
        T[] getClientsByTags(List<String> tags);
    }
}
The implementation class ClientService contains the basic operations:
import com.xxx.exception.I18nServerEndException;
import com.xxx.notification.websocket.IWebsocket;
import com.xxx.notification.websocket.dto.CacheClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@Slf4j
@Component
@ServerEndpoint("/api/v1/ws/{username}")
public class ClientService implements IWebsocket.ILinkPoint, IWebsocket.IClient {

    private static final ManagerService managerService = ManagerService.getInstance();

    private String tag;
    private Session session;

    @OnOpen
    @Override
    public void onOpen(Session session) {
        Map<String, List<String>> requestParameterMap = session.getRequestParameterMap();
        String username = requestParameterMap.get("username").get(0);
        String types = requestParameterMap.get("types").get(0);
        List<String> typeList = Arrays.asList(types.split(","));
        // Set the session and tag before registering, so the client is usable as soon as it is visible
        this.session = session;
        this.tag = username;
        CacheClient cacheClient = CacheClient.getInstance(username, typeList, this);
        managerService.addClient(cacheClient);
    }

    @OnMessage
    @Override
    public void onMessage(String message) {
    }

    @OnClose
    @Override
    public void onClose() {
        // Remove every cached client that belongs to this user
        Set<CacheClient> allCacheClients = managerService.all();
        List<CacheClient> list = allCacheClients.stream()
                .filter(client -> StringUtils.equals(client.getUsername(), tag))
                .collect(Collectors.toList());
        if (!list.isEmpty()) {
            managerService.removeClients(list);
        }
    }

    @OnError
    @Override
    public void onError(Session session, Throwable throwable) {
        try {
            session.close();
            onClose();
        } catch (IOException e) {
            log.error("WebSocket error", e);
            throw new I18nServerEndException("common.tips_12");
        }
    }

    @Override
    public Session getSession() {
        return this.session;
    }

    @Override
    public String getTag() {
        return this.tag;
    }

    @Override
    public synchronized void sendText(String text) {
        try {
            session.getBasicRemote().sendText(text);
        } catch (IOException e) {
            log.error("Failed to push message", e);
        }
    }

    @Override
    public synchronized void send(Object object) {
        try {
            session.getBasicRemote().sendObject(object);
        } catch (IOException | EncodeException e) {
            log.error("Failed to push message", e);
        }
    }
}
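The onOpen handler reads the types query parameter (for example types=1,2) and the values are later converted with Integer::parseInt, which throws on a malformed entry. As a hedged illustration only, the sketch below shows one defensive way to parse such a parameter. TypesParam is a hypothetical helper that does not exist in the original project, and skipping bad entries (rather than rejecting the connection) is an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original project.
final class TypesParam {
    private TypesParam() {}

    // Parses a comma-separated list such as "1,2" into integers,
    // skipping blank or non-numeric entries instead of throwing.
    static List<Integer> parse(String raw) {
        List<Integer> result = new ArrayList<>();
        if (raw == null) {
            return result;
        }
        for (String part : raw.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                result.add(Integer.parseInt(trimmed));
            } catch (NumberFormatException e) {
                // Ignore entries that are not valid integers.
            }
        }
        return result;
    }
}

class TypesParamDemo {
    public static void main(String[] args) {
        System.out.println(TypesParam.parse("1,2"));     // prints [1, 2]
        System.out.println(TypesParam.parse("1, x,,3")); // prints [1, 3]
    }
}
```

Whether a malformed value should be skipped or should close the connection depends on the application; the sketch only shows the parsing step.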
The implementation class ManagerService contains the higher-level operations:
import com.xxx.notification.websocket.IWebsocket;
import com.xxx.notification.websocket.dto.CacheClient;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;

public class ManagerService implements IWebsocket.IManager<CacheClient> {

    private static CopyOnWriteArraySet<CacheClient> linkSet = new CopyOnWriteArraySet<>();

    // Singleton
    private static ManagerService instance = new ManagerService();

    private ManagerService() {
    }

    public static ManagerService getInstance() {
        return instance;
    }

    @Override
    public void sendText(String text, List<CacheClient> clients) {
        for (CacheClient cacheClient : clients) {
            cacheClient.getClientService().sendText(text);
        }
    }

    @Override
    public void sendTextYoAll(String text) {
        for (CacheClient cacheClient : linkSet) {
            cacheClient.getClientService().sendText(text);
        }
    }

    @Override
    public void addClient(CacheClient client) {
        linkSet.add(client);
    }

    @Override
    public CopyOnWriteArraySet<CacheClient> all() {
        return linkSet;
    }

    @Override
    public void removeClients(List<CacheClient> clients) {
        for (CacheClient cacheClient : clients) {
            linkSet.remove(cacheClient);
        }
    }

    @Override
    public CacheClient getClientByTag(String tag) {
        for (CacheClient clientService : linkSet) {
            if (clientService.getClientService().getTag().equals(tag)) {
                return clientService;
            }
        }
        return null;
    }

    @Override
    public CacheClient[] getClientsByTags(List<String> tags) {
        if (null == tags || tags.size() == 0) {
            return null;
        }
        Set<String> tagSet = tags.stream().collect(Collectors.toSet());
        List<CacheClient> clientList = linkSet.stream()
                .filter(c -> tagSet.contains(c.getClientService().getTag()))
                .collect(Collectors.toList());
        CacheClient[] clients = new CacheClient[clientList.size()];
        clientList.toArray(clients);
        return clients;
    }
}
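To make the tag lookup and broadcast logic easier to try in isolation, here is a minimal, framework-free sketch. TaggedClient and ClientRegistry are hypothetical stand-ins for CacheClient and ManagerService; there is no real WebSocket session (sent text is simply recorded), and the lookup returns an empty list instead of null, which is a deliberate difference from the class above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical stand-in for CacheClient: only the tag matters here.
final class TaggedClient {
    private final String tag;
    // Records "sent" text instead of writing to a WebSocket session.
    private final List<String> outbox = new ArrayList<>();

    TaggedClient(String tag) {
        this.tag = tag;
    }

    String getTag() {
        return tag;
    }

    synchronized void sendText(String text) {
        outbox.add(text);
    }

    synchronized List<String> sent() {
        return new ArrayList<>(outbox);
    }
}

// Hypothetical stand-in for ManagerService.
final class ClientRegistry {
    // CopyOnWriteArraySet can be iterated safely while other threads add or remove clients.
    private final CopyOnWriteArraySet<TaggedClient> clients = new CopyOnWriteArraySet<>();

    void addClient(TaggedClient client) {
        clients.add(client);
    }

    void removeClient(TaggedClient client) {
        clients.remove(client);
    }

    // Returns every registered client whose tag is in the given list.
    List<TaggedClient> getClientsByTags(List<String> tags) {
        List<TaggedClient> result = new ArrayList<>();
        if (tags == null || tags.isEmpty()) {
            return result;
        }
        Set<String> wanted = new HashSet<>(tags);
        for (TaggedClient client : clients) {
            if (wanted.contains(client.getTag())) {
                result.add(client);
            }
        }
        return result;
    }

    // Sends the same text to every registered client.
    void sendTextToAll(String text) {
        for (TaggedClient client : clients) {
            client.sendText(text);
        }
    }
}
```

Returning an empty list keeps callers free of null checks; the original getClientsByTags returns null for an empty tag list, so callers of that method still need one.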
The generic type argument used above, CacheClient:
import com.xxx.notification.websocket.service.ClientService;
import com.xxx.system.dao.UserDao;
import com.xxx.system.model.User;
import com.xxx.util.SpringContextUtil;
import lombok.Data;
import java.util.List;
import java.util.stream.Collectors;

@Data
public class CacheClient {

    private List<Integer> types;
    private ClientService clientService;
    private String username;
    private String domain;

    public static CacheClient getInstance(String username, List<String> types, ClientService clientService) {
        UserDao userDao = (UserDao) SpringContextUtil.getBean("userDao");
        User user = userDao.getByUsername(username);
        String domain = user.getDomain();
        CacheClient cacheClient = new CacheClient();
        cacheClient.setUsername(username);
        cacheClient.setTypes(types.stream().map(Integer::parseInt).collect(Collectors.toList()));
        cacheClient.setClientService(clientService);
        cacheClient.setDomain(domain);
        return cacheClient;
    }

    // Two clients are considered equal when their usernames are equal
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        CacheClient that = (CacheClient) o;
        return username != null ? username.equals(that.username) : that.username == null;
    }

    // Derived from username only, so that it stays consistent with equals
    @Override
    public int hashCode() {
        return username != null ? username.hashCode() : 0;
    }
}
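CacheClient compares instances by username only. Under the general equals/hashCode contract, equal objects must return the same hash code, so the hash should be computed from the same field; otherwise a hash-based collection such as the HashSet produced by Collectors.toSet() can end up holding two "equal" clients. The following sketch illustrates the point with a hypothetical ClientKey class (not part of the original project).

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical simplified client: identity is the username only.
final class ClientKey {
    private final String username;
    private final String domain;

    ClientKey(String username, String domain) {
        this.username = username;
        this.domain = domain;
    }

    String getDomain() {
        return domain;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ClientKey)) {
            return false;
        }
        return Objects.equals(username, ((ClientKey) o).username);
    }

    // Uses the same field as equals, so equal objects always share a hash code.
    @Override
    public int hashCode() {
        return Objects.hashCode(username);
    }
}

class EqualsHashCodeDemo {
    // Counts how many distinct keys a HashSet keeps.
    static int distinctCount(ClientKey... keys) {
        Set<ClientKey> set = new HashSet<>();
        for (ClientKey key : keys) {
            set.add(key);
        }
        return set.size();
    }

    public static void main(String[] args) {
        int n = distinctCount(
                new ClientKey("alice", "1100"),
                new ClientKey("alice", "1200"),
                new ClientKey("bob", "1100"));
        System.out.println(n); // prints 2: both "alice" entries collapse into one
    }
}
```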
2.3. Custom message strategy pattern with Spring
2.3.1. Defining the interface, anchor annotation, and message enum
import com.alibaba.fastjson.JSON;
import com.xxx.notification.constants.NotificationType;
import com.xxx.notification.websocket.dto.CacheClient;
import com.xxx.notification.websocket.service.ManagerService;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Notification service.
 */
public interface INotificationService<P> {

    ManagerService managerService = ManagerService.getInstance();

    // Sends the message only to clients that subscribed to this notification type
    default void sendMessage(NotificationType notificationType, Object message, Set<CacheClient> allCacheClients) {
        Integer type = notificationType.getType();
        List<CacheClient> list = allCacheClients.stream()
                .filter(client -> client.getTypes().contains(type))
                .collect(Collectors.toList());
        if (!list.isEmpty()) {
            managerService.sendText(JSON.toJSONString(message), list);
        }
    }

    /**
     * Push a notification.
     * @param p the payload to push
     */
    void pushNotification(P p);
}

import java.lang.annotation.*;

/**
 * Strategy anchor annotation.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StrategyAnchor {
    NotificationType[] type();
}

/**
 * Notification type enum.
 */
public enum NotificationType {
    DEVICE_ALARM(1, "deviceAlarm"),
    SYSTEM_ALARM(2, "systemAlarm");

    // Enum constants are shared, so their fields are kept immutable
    private final Integer type;
    private final String name;

    NotificationType(Integer type, String name) {
        this.type = type;
        this.name = name;
    }

    public Integer getType() {
        return type;
    }

    public String getName() {
        return name;
    }
}
2.3.2. Strategy interface implementation class
Only one implementation class is shown here.
import com.xxx.dto.ObjectResponse;
import com.xxx.notification.constants.NotificationType;
import com.xxx.notification.constants.StrategyAnchor;
import com.xxx.notification.dto.NotificationDto;
import com.xxx.notification.event.AlarmEvent;
import com.xxx.notification.service.INotificationService;
import com.xxx.notification.websocket.dto.CacheClient;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Set;
import java.util.stream.Collectors;

@Service
@StrategyAnchor(type = NotificationType.DEVICE_ALARM)
public class DeviceAlarmNotificationService implements INotificationService<AlarmEvent> {

    @Override
    public void pushNotification(AlarmEvent alarmEvent) {
        NotificationType notificationType = alarmEvent.getNotificationType();
        String domain = alarmEvent.getDomain();
        if (StringUtils.isBlank(domain)) return;
        Set<CacheClient> allCacheClient = managerService.all();
        // Keep only the clients whose domain (with trailing zeros stripped) is a prefix of the alarm's domain
        Set<CacheClient> cacheClients = allCacheClient.stream().filter(client -> {
            String clientDomain = client.getDomain();
            if (StringUtils.isBlank(clientDomain)) return false;
            return StringUtils.startsWith(domain, stripEndStr(clientDomain));
        }).collect(Collectors.toSet());
        // Send over WebSocket
        NotificationDto notificationDto = NotificationDto.getInstance(notificationType, alarmEvent);
        sendMessage(notificationType, ObjectResponse.generateNormalResponse(notificationDto), cacheClients);
    }

    // Strips trailing "0" characters; falls back to "0" if nothing is left
    private String stripEndStr(String data) {
        String temp = StringUtils.stripEnd(data, "0");
        temp = StringUtils.isBlank(temp) ? "0" : temp;
        return temp;
    }
}
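The domain filter above is easier to reason about with concrete values. The sketch below re-implements the same rule with plain JDK calls so it can be run without commons-lang. DomainMatcher is a hypothetical name, and the sample codes assume that domains are hierarchical codes padded with trailing zeros (for example 110000 as the parent of 110105); the article does not state the actual domain format, so treat the values as illustrative.

```java
// Hypothetical helper mirroring the filter rule with plain JDK calls.
final class DomainMatcher {
    private DomainMatcher() {}

    // Removes trailing '0' characters; falls back to "0" if nothing is left.
    static String stripTrailingZeros(String code) {
        int end = code.length();
        while (end > 0 && code.charAt(end - 1) == '0') {
            end--;
        }
        String stripped = code.substring(0, end);
        return stripped.trim().isEmpty() ? "0" : stripped;
    }

    // A client receives the alarm when its stripped domain is a prefix of the alarm's domain.
    static boolean shouldReceive(String alarmDomain, String clientDomain) {
        if (alarmDomain == null || alarmDomain.trim().isEmpty()) {
            return false;
        }
        if (clientDomain == null || clientDomain.trim().isEmpty()) {
            return false;
        }
        return alarmDomain.startsWith(stripTrailingZeros(clientDomain));
    }
}

class DomainMatcherDemo {
    public static void main(String[] args) {
        // Assumed sample codes: "110000" covers everything starting with "11".
        System.out.println(DomainMatcher.shouldReceive("110105", "110000")); // prints true
        System.out.println(DomainMatcher.shouldReceive("110105", "110100")); // prints true
        System.out.println(DomainMatcher.shouldReceive("110105", "110200")); // prints false
        System.out.println(DomainMatcher.shouldReceive("120101", "110000")); // prints false
    }
}
```

Under this rule, a client registered at a higher level of the hierarchy receives alarms from every domain beneath it, while sibling domains do not see each other's alarms.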
2.3.3. Strategy factory class
import com.xxx.notification.constants.NotificationType;
import com.xxx.notification.constants.StrategyAnchor;
import com.xxx.notification.service.INotificationService;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class NotificationFactory implements ApplicationContextAware {

    // Notification type code -> strategy bean
    private static Map<Integer, INotificationService> beans = new ConcurrentHashMap<>();

    public static INotificationService getNotificationService(NotificationType notificationType) {
        return beans.get(notificationType.getType());
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, INotificationService> map = applicationContext.getBeansOfType(INotificationService.class);
        // Register each strategy bean under every type listed in its @StrategyAnchor
        map.forEach((beanName, bean) -> {
            StrategyAnchor anchor = applicationContext.findAnnotationOnBean(beanName, StrategyAnchor.class);
            Optional.ofNullable(anchor).ifPresent(an ->
                    Arrays.stream(an.type()).forEach(type -> beans.put(type.getType(), bean)));
        });
    }
}
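The factory builds a lookup table from annotation values to strategy beans. To show that mechanism without a Spring context, the sketch below does the same with plain reflection. All names in it (Kind, Handles, Notifier, NotifierRegistry and the two notifier classes) are hypothetical, and the candidates are passed in explicitly, whereas in the article they come from getBeansOfType.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical counterpart of NotificationType.
enum Kind { DEVICE_ALARM, SYSTEM_ALARM }

// Hypothetical counterpart of @StrategyAnchor; RUNTIME retention makes it visible to reflection.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Handles {
    Kind[] value();
}

// Hypothetical counterpart of INotificationService.
interface Notifier {
    String push(String payload);
}

@Handles(Kind.DEVICE_ALARM)
class DeviceNotifier implements Notifier {
    @Override
    public String push(String payload) {
        return "device:" + payload;
    }
}

@Handles(Kind.SYSTEM_ALARM)
class SystemNotifier implements Notifier {
    @Override
    public String push(String payload) {
        return "system:" + payload;
    }
}

// Builds the kind -> strategy table once, from the annotations on each candidate.
final class NotifierRegistry {
    private final Map<Kind, Notifier> byKind = new EnumMap<>(Kind.class);

    NotifierRegistry(Notifier... candidates) {
        for (Notifier candidate : candidates) {
            Handles handles = candidate.getClass().getAnnotation(Handles.class);
            if (handles == null) {
                continue; // Unannotated implementations are simply not registered.
            }
            for (Kind kind : handles.value()) {
                byKind.put(kind, candidate);
            }
        }
    }

    // Returns the strategy for the kind, or null when none is registered.
    Notifier get(Kind kind) {
        return byKind.get(kind);
    }
}

class NotifierRegistryDemo {
    public static void main(String[] args) {
        NotifierRegistry registry = new NotifierRegistry(new DeviceNotifier(), new SystemNotifier());
        System.out.println(registry.get(Kind.DEVICE_ALARM).push("overheat")); // prints device:overheat
    }
}
```

Adding a new notification type then only requires a new annotated class; the lookup code does not change, which is the main benefit of the strategy pattern used here.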
2.4. Spring's event notification mechanism
Combined with the message strategy pattern, Spring events decouple the business code from the push logic, and messages are pushed asynchronously on a thread pool via @Async and @EnableAsync.
2.4.1. Custom event source
import com.xxx.notification.constants.NotificationType;
import org.springframework.context.ApplicationEvent;

public class AlarmEvent extends ApplicationEvent {

    private NotificationType notificationType;
    private String domain;

    public AlarmEvent(Object source) {
        super(source);
    }

    public NotificationType getNotificationType() {
        return notificationType;
    }

    public void setNotificationType(NotificationType notificationType) {
        this.notificationType = notificationType;
    }

    public String getDomain() {
        return domain;
    }

    public void setDomain(String domain) {
        this.domain = domain;
    }
}
2.4.2. Event listener
import com.xxx.notification.NotificationFactory;
import com.xxx.notification.constants.NotificationType;
import com.xxx.notification.service.INotificationService;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;

@Component
@EnableAsync
public class AlarmEventListener {

    // Runs on Spring's async executor, so publishing an event does not block the caller
    @Async
    @EventListener
    public void handleAlarmEvent(AlarmEvent event) {
        NotificationType notificationType = event.getNotificationType();
        if (notificationType == null || notificationType.getType() == null) return;
        // Look up the strategy registered for this notification type
        INotificationService notificationService = NotificationFactory.getNotificationService(notificationType);
        if (notificationService == null) return;
        notificationService.pushNotification(event);
    }
}
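The listener relies on Spring to hand the event to a worker thread. The decoupling idea itself can be sketched without Spring: the publisher only submits the event, and listeners run on a thread pool. AsyncEventBus below is a hypothetical, simplified illustration using java.util.concurrent; it is not how Spring implements @Async or @EventListener, and the pool size of 2 is an arbitrary assumption.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical minimal event bus: publish() returns at once, listeners run on pool threads.
final class AsyncEventBus<E> {
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    void subscribe(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Hands the event to every listener on a worker thread.
    void publish(E event) {
        for (Consumer<E> listener : listeners) {
            executor.execute(() -> listener.accept(event));
        }
    }

    // Stops accepting events and waits for already submitted work to finish.
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class AsyncEventBusDemo {
    // Publishes one event and returns what the listener recorded.
    static List<String> run() {
        List<String> received = new CopyOnWriteArrayList<>();
        AsyncEventBus<String> bus = new AsyncEventBus<>();
        bus.subscribe(event -> received.add("ws-push:" + event));
        bus.publish("device alarm");
        bus.shutdownAndWait(1000);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The business code that raises the alarm never calls the WebSocket layer directly; it only publishes, which is the same separation the Spring listener provides.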
2.4.3. Event publisher
import com.xxx.alarm.model.Alarm;
import com.xxx.notification.constants.NotificationType;
import org.springframework.context.ApplicationEventPublisher;

public class AlarmEventPublisher {

    /**
     * Publish a device alarm event.
     * @param alarm the alarm that triggered the event
     * @param eventPublisher Spring's event publisher
     */
    public static void publishDeviceAlarmEvent(Alarm alarm, ApplicationEventPublisher eventPublisher) {
        AlarmEvent alarmEvent = new AlarmEvent(alarm);
        alarmEvent.setNotificationType(NotificationType.DEVICE_ALARM);
        alarmEvent.setDomain(alarm.getDomain());
        eventPublisher.publishEvent(alarmEvent);
    }

    /**
     * Publish a system alarm event.
     * @param obj the event source
     * @param eventPublisher Spring's event publisher
     */
    public static void publishSystemAlarmEvent(Object obj, ApplicationEventPublisher eventPublisher) {
        AlarmEvent alarmEvent = new AlarmEvent(obj);
        alarmEvent.setNotificationType(NotificationType.SYSTEM_ALARM);
        eventPublisher.publishEvent(alarmEvent);
    }
}
2.4.4. Testing
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;

// Fragment from inside a controller class
@Autowired
private ApplicationEventPublisher eventPublisher;
@Autowired
private AlarmService alarmService;

@GetMapping("/test")
public void testEvent(String id) {
    Alarm alarm = alarmService.findOne(......);
    // Real-time message push
    AlarmEventPublisher.publishDeviceAlarmEvent(alarm, eventPublisher);
}
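3. Front-end integration
Using Vue 3 + Vite.
3.1. Page integration
<template>
  <h1>Web Socket</h1>
</template>

<script lang="ts">
import { ref, onUnmounted } from "vue";

export default {
  setup() {
    const ws = ref();

    const initWs = () => {
      console.log("location.host:", location.host);
      // Replace {logged-in username} with the name of the logged-in user
      ws.value = new WebSocket(`ws://${location.host}/wsUrl/{logged-in username}?types=1,2`);
      // Called when a connection error occurs
      ws.value.onerror = function () {
        console.log("ws connection error");
      };
      // Called when the connection is established
      ws.value.onopen = function () {
        console.log("ws connected");
      };
      // Called when a message is received
      ws.value.onmessage = function (event: any) {
        console.log("message received:", event.data);
      };
    };

    const closeWebSocket = () => {
      ws.value.close();
    };

    initWs();
    // Close the connection when the component is unmounted
    onUnmounted(() => {
      closeWebSocket();
    });

    return {};
  },
};
</script>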
3.2. Vite reverse proxy configuration
In vite.config.ts:
server: {
  host: "0.0.0.0",
  // host: "localhost",
  port: 3001,
  // Whether to open the browser automatically
  // open: true,
  // Whether to enable https
  // https: false,
  // Server-side rendering
  // ssr: false,
  proxy: {
    "/wsUrl": {
      target: "ws://<backend IP address>:9090/xxx/api/v1/ws/",
      changeOrigin: true,
      ws: true,
      rewrite: (path) => path.replace("/wsUrl", ""),
    },
    "/api": {
      target: "http://localhost:3333/",
      changeOrigin: true,
      ws: true,
      rewrite: (pathStr) => pathStr.replace("/api", ""),
    },
  },
},
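The rewrite property replaces wsUrl in the request path with an empty string. The name wsUrl itself can be any string; it only serves as an identifier on the front end.
For example:
The front end connects to ws://127.0.0.1:3001/wsUrl/xyz, and the reverse proxy forwards it to the back end as
ws://<backend IP address>:9090/xxx/api/v1/ws/xyz
Without the rewrite property:
The front end connects to ws://127.0.0.1:3001/wsUrl/, and the reverse proxy forwards it to the back end as
ws://<backend IP address>:9090/xxx/api/v1/ws/wsUrl, which does not match the path the back end expects.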
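The path mapping can be checked with a small model. The sketch below is written in Java, like the back-end code, and only imitates the string handling: it strips the first occurrence of the prefix and joins the result onto the target's base path, collapsing duplicate slashes. ProxyPathSketch is a hypothetical name and this is a simplified model under those assumptions, not Vite's actual proxy implementation.

```java
import java.util.regex.Pattern;

// Hypothetical model of how the proxied path is derived; not Vite's real code.
final class ProxyPathSketch {
    private ProxyPathSketch() {}

    // Joins the target's base path and the request path, collapsing duplicate slashes.
    static String join(String targetBasePath, String requestPath) {
        return (targetBasePath + "/" + requestPath).replaceAll("/{2,}", "/");
    }

    // Models rewrite: (path) => path.replace(prefix, ""), which removes the first occurrence only.
    static String withRewrite(String targetBasePath, String prefix, String requestPath) {
        String rewritten = requestPath.replaceFirst(Pattern.quote(prefix), "");
        return join(targetBasePath, rewritten);
    }

    // Models the proxy entry with no rewrite function configured.
    static String withoutRewrite(String targetBasePath, String requestPath) {
        return join(targetBasePath, requestPath);
    }
}

class ProxyPathSketchDemo {
    public static void main(String[] args) {
        String base = "/xxx/api/v1/ws/";
        System.out.println(ProxyPathSketch.withRewrite(base, "/wsUrl", "/wsUrl/xyz")); // prints /xxx/api/v1/ws/xyz
        System.out.println(ProxyPathSketch.withoutRewrite(base, "/wsUrl/xyz"));        // prints /xxx/api/v1/ws/wsUrl/xyz
    }
}
```

With the rewrite, xyz lands in the {username} position of the server endpoint path; without it, the extra wsUrl segment shifts the path so the endpoint no longer matches.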
4. Conclusion: overall, it is quite simple