package dynamic.sys; import com.alibaba.fastjson.JSON; import dynamic.model.Bq_Msg; import dynamic.model.Bq_Msg_User; import dynamic.model.Bq_Topics; import dynamic.model.Bq_User_Subscrptid; import microbee.http.annotation.Controller; import microbee.http.annotation.ModelResources; import microbee.http.apps.dbnet.ConditionPJ; import microbee.http.apps.dynamic.HoContext; import microbee.http.modulars.mq.MQ; import microbee.http.modulars.mq.Msg; import microbee.http.modulars.mq.User; import microbee.http.utills.GlobalData; import org.apache.commons.lang.StringUtils; import java.util.*; /** * @author :quanjinlong * @date :Created in 2025/3/6 9:43 * @description:MQ相关控制器 */ @Controller public class MqController { @ModelResources Bq_Msg_User bq_msg_user; @ModelResources Bq_Msg bq_msg; @ModelResources Bq_Topics bq_topics; @ModelResources Bq_User_Subscrptid bq_user_subscrptid; /** * 用户注册接口 * @param hoContext * @return */ public String register(HoContext hoContext) { // 获取参数 Map params = hoContext.httpRequest.getParams(); int sbscrbtp = Integer.parseInt(params.get("sbscrbtp")); String route = params.get("route"); int utype = Integer.parseInt(params.get("utype")); if (sbscrbtp == 1 && route.equals("")){ return "{\"code\":102,\"msg\":\"订阅类型为push时,路由不能为空\",\"user\":\"\"}"; } // 调用mq类的注册方法 User user = new User(); try { MQ mq = new MQ(); user = mq.register(sbscrbtp, route, utype); return "{\"code\":100,\"msg\":\"用户注册成功\",\"user\":\""+user.toString()+"\"}"; } catch (Exception e) { System.err.println(e.getMessage()); } return "{\"code\":101,\"msg\":\"用户注册失败\",\"user\":\"\"}"; } /** * 生产者发布消息 * @param hoContext * @return */ public String sendMsg(HoContext hoContext) { Map params = hoContext.httpRequest.getParams(); int uid = Integer.parseInt(params.get("uid")); String token = params.get("token"); String msg = params.get("msg"); int tpcid = Integer.parseInt(params.get("tpcid")); // 定义MQ类 MQ mq = new MQ(uid,token); // 1:发布成功 // 2:该用户未订阅任何主题 或 该用户订阅的主题未包含该主题 // 3:该用户为消费者,无权发布消息 int code = mq.sendMsg(msg, tpcid); String remsg = ""; switch (code){ case 1:remsg="发布成功";break; case 2:remsg="该用户未订阅任何主题或订阅的主题未包含该主题";break; case 3:remsg="该用户为消费者,无权发布消息";break; } return "{\"code\":"+code+",\"msg\":\""+remsg+"\"}"; } /** * 消费者获取消息 * @param hoContext * @return */ public String getMsg(HoContext hoContext) { Map params = hoContext.httpRequest.getParams(); int uid = Integer.parseInt(params.get("uid")); String token = params.get("token"); // 定义MQ类 MQ mq = new MQ(uid,token); // 获取消息,并返回消息体 Msg msg = null; // 定义返回map。返回时tojson。防止msg为json时,直接拼接json字符串格式问题 Map finalMap = new HashMap<>(); try { msg = mq.getMsg(); } catch (Exception e) { System.err.println(e.getMessage()); finalMap.put("code",101); finalMap.put("id",""); finalMap.put("creattime",""); finalMap.put("msg", e.getMessage()); return JSON.toJSONString(finalMap); } if (msg == null) { finalMap.put("code",101); finalMap.put("msg",""); finalMap.put("id",""); finalMap.put("creattime",""); return JSON.toJSONString(finalMap); } finalMap.put("code",100); finalMap.put("msg",msg.getMsg()); finalMap.put("id",msg.getId()); finalMap.put("creattime",msg.getCreattime()); return JSON.toJSONString(finalMap); } /** * 根据用户id获取该用户队列数据 * @param hoContext * @return */ public String userMsgQueue(HoContext hoContext) { Map params = hoContext.httpRequest.getParams(); String uid = params.get("uid"); // 判断内存存储或持久化存储 int mq_save = GlobalData.server_conf_dom4j.getMq_save(); List> userMsg = new ArrayList<>(); // 持久化 if (mq_save==1) { userMsg = userQueueDataBase(uid); } else { userMsg = userQueueMem(uid); } return JSON.toJSONStringWithDateFormat(userMsg, "yyyy-MM-dd HH:mm:ss"); } /** * 用户订阅主题(删除原有主题,重新添加到数据库) * @param hoContext * @return */ public String subscriptionTopic(HoContext hoContext) { Map params = hoContext.httpRequest.getParams(); String uid = params.get("uid"); String tpcids = params.get("tpcids"); List conditionPJList = new ArrayList<>(); conditionPJList.add(ConditionPJ.init(1, "uid", "mcb_eq", uid)); // 查询该用户是否有订阅主题 Map countMap = bq_user_subscrptid.gainCount(conditionPJList); int count = Integer.parseInt(countMap.get("total").toString()); if (count > 0) { // 删除该用户下的所有主题 boolean del = bq_user_subscrptid.delete(conditionPJList); if (!del) { return "{\"code\":101,\"msg\":\"订阅失败\"}"; } } // 删除成功后执行添加操作 List> inLst = new ArrayList<>(); String[] split = tpcids.split(","); for (String tpcid:split) { Map inMap = new HashMap<>(); inMap.put("uid", uid); inMap.put("tpcid", tpcid); inLst.add(inMap); } try { bq_user_subscrptid.batchInsert(inLst); return "{\"code\":100,\"msg\":\"订阅成功\"}"; } catch (Exception e) { System.err.println(e.getMessage()); return "{\"code\":101,\"msg\":\"订阅失败\"}"; } } /***************************************私有方法****************************************/ /** * 持久化存储(MySQL) * @param uid 用户id * @return */ private List> userQueueDataBase(String uid){ // 定义返回list List> final_lst = new ArrayList<>(); // 获取该用户下消息队列 List conditionPJS = ConditionPJ.inits(1, "uid", "mcb_eq", uid); Map map = bq_msg_user.gainOne(conditionPJS); if (map == null || map.isEmpty() || map.size() <= 0) { return final_lst; } if (Objects.isNull(map.get("msgids"))) { return final_lst; } // 分割消息队列,获取消息信息 String msgids = String.valueOf(map.get("msgids")); String[] msgids_split = msgids.split(","); for (String msgid:msgids_split){ // 根据消息id查询数据并获取主题名称 Map data = new HashMap<>(); data.put("id", msgid); List> msglst = bq_msg.msgTopicBymid(data); if (msglst.size()>0) { final_lst.addAll(msglst); } } return final_lst; } /** * 内存存储(redis) * @param uid 用户id * @return */ private List> userQueueMem(String uid){ // 定义返回list List> final_lst = new ArrayList<>(); // 获取该用户的消息队列 String queueItme = GlobalData.redisUtil.getByKey("queue_"+uid); if(queueItme==null || queueItme.length()<=0 || StringUtils.isEmpty(queueItme)){ return final_lst; } // 分割消息队列,获取消息信息 String[] msgids_split = queueItme.split(","); for (String msgid:msgids_split){ Msg msg = (Msg) GlobalData.redisUtil.getObject("msg_"+msgid); // 根据主题id查询主题名称 int tpcid = msg.getTpcid(); Map topic = bq_topics.gainOneById(tpcid); String tpc_name = ""; if (Objects.nonNull(topic.get("tpc_name"))) { tpc_name = String.valueOf(topic.get("tpc_name")); } Map final_map = new HashMap<>(); final_map.put("id", msg.getId()); final_map.put("msg", msg.getMsg()); final_map.put("creattime", msg.getCreattime()); final_map.put("tpc_name", tpc_name); final_lst.add(final_map); } return final_lst; } }