新站提交
  • 网站:264
  • 待审:2
  • 小程序:8
  • 文章:263051
  • 会员:36

今天需要使用  Springboot  来集成  Aliyun   消息队列,看了官方的  Demo  ,都是  Spring  XML 方式配置。对照着做了一遍  Springboot  配置的方式。直接上代码 配置。

Aliyun   队列申请,我就不做叙述了。链接:https://ons.console.aliyun.com/

Springboot yml MQ配置:

aliyun:
  accessKey: LTAIlH****7IMXnuV
  accessSecretKey: T981eNFj****EycS0jq8xdThdfXCo

  queue:
      log:
        topic: ios-****-log-topic_dev
        producerId: PID_ios-****-log_dev
        consumerId: CID_ios-****-log_dev
      stock:
        topic: ios-****-stock-topic_dev
        producerId: PID_ios-****-stock_dev
        consumerId: CID_ios-****-stock_dev

****是我不让你们看见而已,别你也****,另外建议本地测试或者开发采用公网的  MQ  ,线上配置采用你对应的  ECS  或者对应的地区的  MQ  。这样速度会快一点。

Springboot @Bean Java代码

@Value("${aliyun.accessKey}")
private String accessKey;
@Value("${aliyun.accessSecretKey}")
private String secretKey;


@Value("${aliyun.queue.log.producerId}")
private String logPriducerId;
@Value("${aliyun.queue.log.consumerId}")
private String logConsumerId;
@Value("${aliyun.queue.log.topic}")
private String logTopic;

@Value("${aliyun.queue.stock.producerId}")
private String stockPriducerId;
@Value("${aliyun.queue.stock.consumerId}")
private String stockConsumerId;
@Value("${aliyun.queue.stock.topic}")
private String stockTopic;

@Bean(name="logProducerBean")
public ProducerBean initLogProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    // 您在控制台创建的 Producer ID
    properties.put(PropertyKeyConst.ProducerId, logPriducerId);
    // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);

    producerBean.start();
    return producerBean;
}
@Bean(name="stockProducerBean")
public ProducerBean initStockProducerBean(){
    ProducerBean producerBean = new ProducerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.ProducerId, stockPriducerId);
    properties.put(PropertyKeyConst.SecretKey, secretKey);

    producerBean.setProperties(properties);
    producerBean.start();

    return producerBean;
}
@Bean(name="stockConsumerBean")
public ConsumerBean initStockConsumerBean(){
    ConsumerBean consumerBean = new ConsumerBean();
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, stockConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 50);

    consumerBean.setProperties(properties);

    //监听消息
    MessageListener messageListener = (message, context) -> {


        System.out.println("Receive2: "   message.getMsgID());
        try {

            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    };
    Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

    Subscription subscription = new Subscription();
    subscription.setTopic(stockTopic);
    subscription.setExpression("*");//tag 接受所有
    subscriptionTable.put(subscription,messageListener);
    consumerBean.setSubscriptionTable(subscriptionTable);
    consumerBean.start();




    return consumerBean;
}
@Bean(name="logConsumerBean")
public Consumer initLogConsumerBean(){
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.ConsumerId, logConsumerId);
    properties.put(PropertyKeyConst.AccessKey, accessKey);
    properties.put(PropertyKeyConst.SecretKey, secretKey);
    properties.put(PropertyKeyConst.ConsumeThreadNums, 40);


    Consumer consumer = ONSFactory.createConsumer(properties);

    consumer.start();
    //订阅消息
    //订阅多个 Tag  TagA||TagB,如果模糊订阅 *
    consumer.subscribe(logTopic, "TagA||TagB", (message, context) -> {
        System.out.println("Receive: "   message);
        try {
            //处理你的业务
            return Action.CommitMessage;
        }catch (Exception e) {

            return Action.ReconsumeLater;
        }
    });


    return consumer;
}

阿里云 队列重试次数说明:

这种普通队列,保持着至少一次的规则,就是当你返回 ReconsumeLater 后,它会按你的 Delay 次数来计算具体的时间再次请求。默认是16次,对应的时间表如下:

第几次重试 每次重试间隔时间 第几次重试 每次重试间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

自定义队列重试次数:

Properties properties = new Properties();
//配置对应 Consumer ID 的最大消息重试次数为20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

队列重试规则的定义:

public class MessageListenerImpl implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {
        //方法3:消息处理逻辑抛出异常,消息将重试
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}


原文地址:293.html

转载时必须以链接形式注明原始出处及本声明。

本文主题: MQ Springboot Aliyun

如果本文对你有帮助,那么请你赞助我,让我更有激情的写下去,帮助更多的人。

¥我需要走的更远,点击我 赞助。 如果还有疑问,点击我加群,为你提供最好的解答。

本文由:link网站目录 https://www.votolink.com/ 收录整理发布

  admin

注册时间:

网站:0 个   小程序:3 个  文章:0 篇

  • 264

    网站

  • 8

    小程序

  • 263051

    文章

  • 36

    会员

赶快注册账号,推广您的网站吧!
热门网站
最新入驻小程序

数独大挑战2018-06-03

数独一种数学游戏,玩家需要根据9

答题星2018-06-03

您可以通过答题星轻松地创建试卷

全阶人生考试2018-06-03

各种考试题,题库,初中,高中,大学四六

运动步数有氧达人2018-06-03

记录运动步数,积累氧气值。还可偷

每日养生app2018-06-03

每日养生,天天健康

体育训练成绩评定2018-06-03

通用课目体育训练成绩评定