RocketMQ复杂过滤尝试

news/2024/7/7 19:38:37 标签: rocketmq

需求

消息实体,根据实体中的一个字段,决定推给多个业务系统。

例:一个点位信息Bean,这个点位信息,设备、能源、安全都有用,那么点位信息表中有适用模块标识。

点位新增 需要通知所有勾选业务系统  tag - add

点位编辑 需要新增勾选业务系统标识 tag - add ,移除勾选 tag - delete ,不变 tag - update

点位删除 通知所有勾选的系统 tag - delete

分析

1、MQ不支持同个消息,一下子发送到不同的topic。发多次也可以实现,但是多少有点抵触

2、那么发送到同一个topic下,让各个业务系统来取,那么必定需要去过滤,不然拿到不属于本业务系统的点位信息了,仅仅靠tag明显是不够的,服务端过滤可以采用SQL92方式

3、那么我随之就想到也可以在各个业务系统中过滤了,不是本业务系统的标识,直接返回。不执行相关逻辑。

本人更倾向于第二种实现

编码

尝试一:SQL92过滤实现

再broker.conf 新增配置 enablePropertyFilter=true

生产者

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point0 = new Point(0, "point0");
        Point point1 = new Point(1, "point1");
        Point point2 = new Point(2, "point2");
        Point point3 = new Point(3, "point3");
        Point point4 = new Point(4, "point4");
        Point point5 = new Point(5, "point5");
        Point point6 = new Point(6, "point6");
        Point point7 = new Point(7, "point7");
        Point point8 = new Point(8, "point8");
        Point point9 = new Point(9, "point9");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point0);
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);
        list.add(point6);
        list.add(point7);
        list.add(point8);
        list.add(point9);
 
 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                msg.putUserProperty("testValue",String.valueOf(bean.getId()));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者1 - 模拟业务系统1

    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 0 and 3)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }

消费者2 - 业务系统2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 4 and 6)"));
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new CustomMessageListenerOrderly());
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果

消费者1

消费者2

sql92 确实可以实现我的需求。

那么我把testValue换成各业务系统唯一标识,逗号拼接

把生产者调整一下

msg.putUserProperty("testValue","aaaa,bbbb,cccc");

客户端试了 in 不行

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and 'aaaa' in (testValue))"));

like

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue LIKE '%aaaa%')"));

不行,只支持这些关键字

in 关键字 只能变量in 特定字符集合这样表达。

思路转变一下,也不是说不能实现

msg.putUserProperty("aaaa","aaaa");
msg.putUserProperty("bbbb","bbbb");
msg.putUserProperty("cccc","cccc");

生产者 setProperty 每个业务系统一个区分开来。

消费者即可实现

consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("aaaa is not null"));

尝试二:消费端逻辑处理

修改生产者

测试的点位信息加上适用系统标识(逗号分隔)

public class AddProducer {
 
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("a-group");
        producer.setNamesrvAddr("192.168.0.211:9876");
        producer.start();

        Point point1 = new Point(1, "point1","1,2");
        Point point2 = new Point(2, "point2","1,2");
        Point point3 = new Point(3, "point3","1,4");
        Point point4 = new Point(4, "point4","1,4");
        Point point5 = new Point(5, "point5","1,5");

        ArrayList<Point> list = new ArrayList<>();
        list.add(point1);
        list.add(point2);
        list.add(point3);
        list.add(point4);
        list.add(point5);

 
 
 
        try {
 
            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }

            Thread.sleep(20000);

            for (Point bean : list) {
                Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
                System.out.println(bean.getId() + " Continue execution ");
            }
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

模拟消费者一:业务系统标识为1

public class consumer {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("1")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

模拟消费者二:业务系统标识2

public class consumer2 {
 
    public static void main(String[] args) throws Exception {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
        // 指定Namesrv地址
        consumer.setNamesrvAddr("192.168.0.211:9876");
        // 订阅主题和标签
        consumer.subscribe("UNIFIED_POINT", "*");
        // 设置Consumer第一次启动是从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.setSuspendCurrentQueueTimeMillis(2000);
 
        // 注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                try {
                    for (MessageExt msg : list) {
                        String data = new String(msg.getBody());
                        Point p = JSONUtil.toBean(data, Point.class);

                        List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
                                .map(String::trim)
                                .collect(Collectors.toList());
                        if(!ids.contains("2")){
                            //非本业务系统 直接返回
                            return ConsumeOrderlyStatus.SUCCESS;
                        }
                        if(msg.getTags().equals("add")){
                            System.out.println("新增消费:" + p + msg.getQueueId());
                        }else if(msg.getTags().equals("update")){
                            System.out.println("修改消费:" + p + msg.getQueueId());
                        }

                    }

                    return ConsumeOrderlyStatus.SUCCESS;
                } catch (Exception e) {
                    MessageExt msg = list.get(0);
                    log.error("consumer news error " + new String(msg.getBody()));
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
 
        // 启动消费者
        consumer.start();
        System.out.printf("Group1ConsumerTagA Started.%n");
    }
}

测试结果 - 消费者1

测试结果 - 消费者2

结论

SQL92方式:过滤在服务端,但是功能还是局限的。如果服务器性能好压力不大,且过滤方式能满足。个人任务还是可用的

消费端逻辑处理方式:略费带宽,服务器压力小。过滤在消费端


http://www.niftyadmin.cn/n/5534993.html

相关文章

透过 Go 语言探索 Linux 网络通信的本质

大家好&#xff0c;我是码农先森。 前言 各种编程语言百花齐放、百家争鸣&#xff0c;但是 “万变不离其中”。对于网络通信而言&#xff0c;每一种编程语言的实现方式都不一样&#xff1b;但其实&#xff0c;调用的底层逻辑都是一样的。linux 系统底层向上提供了统一的 Sock…

hive4 从入门到精通

查询hive 架构 准备 HDFS配置 vim $HADOOP_HOME/etc/hadoop/core-site.xml <!--配置所有节点的root用户都可作为代理用户--><property><name>hadoop.proxyuser.root.hosts</name><value>*</value></property><!--配置root用户…

在数据库中,什么是主码、候选码、主属性、非主属性?

在数据库中&#xff0c;主码、候选码、主属性和非主属性是几个重要的概念&#xff0c;它们对于理解数据库的结构和数据的完整性至关重要。以下是对这些概念的详细解释&#xff1a; 一、主码&#xff08;Primary Key&#xff09; 定义&#xff1a;主码&#xff0c;也被称为主键…

数据库设计 物理模型和逻辑模型

在数据库设计中&#xff0c;物理模型和逻辑模型是两个关键阶段&#xff0c;它们分别代表了数据库设计的不同层面和细节。以下是对这两个模型的详细解释及涉及到的内容&#xff1a; 逻辑模型&#xff08;Logical Data Model, LDM&#xff09; 定义与概述&#xff1a; 逻辑数据…

java如何在字符串中间插入字符串

java在字符串中插入字符串&#xff0c;需要用到insert语句 语法格式为 sbf.insert(offset,str) 其中,sbf是任意字符串 offset是插入的索引 str是插入的字符串 public class Insert {public static void main(String[] args) {// 将字符串插入到指定索引StringBuffer sbfn…

用Vue3和Rough.js绘制一个交互式3D图

本文由ScriptEcho平台提供技术支持 项目地址&#xff1a;传送门 基于Rough.js和GSAP创建交互式SVG图形卡片 应用场景 本代码适用于需要创建动态交互式SVG图形卡片的场景&#xff0c;例如网页设计、数据可视化和交互式艺术作品。 基本功能 该代码利用Rough.js和GSAP库&…

编写高效的Java工具类:实用技巧与设计模式

编写高效的Java工具类&#xff1a;实用技巧与设计模式 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 1. 工具类的定义与作用 在软件开发中&#xff0c;工具…

VMware中的三种虚拟网络模式

虚拟机网络模式 1 主机网络环境2 VMware中的三种虚拟网络模式2.1 桥接模式2.2 NAT模式2.3 仅主机模式 3 网络模式选择及配置NAT模式3.1 VMware虚拟网络配置3.2 虚拟机选择网络模式3.3 Windows主机网络配置 4 配置静态IP 虚拟机联网方式为桥接模式&#xff0c;这种模式下&#x…