跳转到内容

进阶指南

本章节展示 spring-cloud-statrer-stream-rocketmq 的实现和相关配置。

Spring Cloud Stream RocketMQ Binder 的实现

架构实现

spring-cloud-statrer-stream-rocketmq 去除了对 RocketMQ-Spring 框架的依赖 。 Spring Cloud Stream Binder 核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部会构建 RocketMQInboundChannelAdapter 和 RocketMQProducerMessageHandler。

RocketMQProducerMessageHandler 会基于 Binding 配置通过 RocketMQProduceFactory 构造 RocketMQ Producer,其内部会把 spring-messaging 模块内 org.springframework.messaging.Message 消息类转换成 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message,然后发送出去。

RocketMQInboundChannelAdapter 也会基于 Binding 配置通过 RocketMQConsumerFactory 构造 DefaultMQPushConsumer,其内部会启动 RocketMQ Consumer 接收消息。

NOTERocketMQ-Spring 框架的兼容需要手动处理。

目前 Binder 支持在 Header 中设置相关的 key 来进行 RocketMQ Message 消息的特性设置。

比如 TAGS、KEYS、TRANSACTIONAL_ARGS 等 RocketMQ 消息对应的标签,详情见 com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst

MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key");
Message message = builder.build();
output().send(message);

或者使用 StreamBridge:

MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key");
Message message = builder.build();
streamBridge.send("producer-out-0", message);

NOTE 更多使用请参考样例:com.alibaba.cloud.examples.SenderService

更多配置项参考

绑定器配置

关于以 spring-cloud-starter-stream-rocketmq-binder 为前缀的配置项如下所示:

配置项key默认值说明
RocketMQ NameServer 地址spring.cloud.stream.rocketmq.binder.name-server127.0.0.1:9876老版本使用 namesrv-addr 配置项
身份验证公钥spring.cloud.stream.rocketmq.binder.access-keynull阿里云账号 AccessKey
身份验证私钥spring.cloud.stream.rocketmq.binder.secret-keynull阿里云账号 SecretKey
消息轨迹功能spring.cloud.stream.rocketmq.binder.enable-msg-tracetrue是否为 Producer 和 Consumer 开启消息轨迹功能
topic 名称spring.cloud.stream.rocketmq.binder.customized-trace-topicRMQ_SYS_TRACE_TOPIC消息轨迹开启后存储的 topic 名称

消息消费者配置

关于以 spring-cloud-starter-stream-rocketmq-binder-consumer 为前缀的配置项如下所示:

配置项key默认值说明
是否启用 Consumerspring.cloud.starter.stream.rocketmq.binder. consumer.enabletrue
Consumer 基于 TAGS 订阅spring.cloud.starter.stream.rocketmq.binder. consumer.subscriptionempty多个 tag 以 &#124&#124 分割。更多见 subscription
Consumer 消费模式spring.cloud.starter.stream.rocketmq.binder. consumer.messageModelCLUSTERING如果想让每一个的订阅者都能接收到消息,可以使用广播模式。更多见 MessageModel
Consumer 从哪里开始消费spring.cloud.starter.stream.rocketmq.binder. consumer.consumeFromWhere更多见 ConsumeFromWhere

NOTE 更多见 RocketMQConsumerProperties

关于以 spring-cloud-starter-stream-rocketmq-binder-consumer-push 为前缀的配置项如下所示:

配置项key默认值说明
是否同步消费消息模式spring.cloud.starter.stream.rocketmq.binder. consumer.push.pushorderlyfalse
消费失败重试策略spring.cloud.starter.stream.rocketmq.binder. consumer.push.delayLevelWhenNextConsume0同步消费消息模式下。-1,不重复直接放入死信队列。0,broker 控制重试策略。0,client 控制重试策略。
消费失败后再次消费的时间间隔spring.cloud.starter.stream.rocketmq.binder. consumer.push.suspendCurrentQueueTimeMillis1000同步消费消息模式下。

NOTE 其他更多参数见 RocketMQConsumerProperties.Push

关于以 spring-cloud-starter-stream-rocketmq-binder-consumer-pull 为前缀的配置项如下所示:

配置项key默认值说明
消费时拉取的线程数spring.cloud.starter.stream.rocketmq.binder.consumer.pull.pullThreadNums20
拉取时的超时毫秒数spring.cloud.starter.stream.rocketmq.binder.consumer.push.pollTimeoutMillis1000 * 5

NOTE 其他更多参数见 RocketMQConsumerProperties.Pull

消息生产者配置

关于以 spring-cloud-starter-stream-rocketmq-binder-producer 为前缀的配置项如下所示:

配置项key默认值说明
是否启用 Producerspring.cloud.starter.stream.rocketmq. binder.producer.enabletrue
生产者集群名称spring.cloud.starter.stream.rocketmq. binder.producer.groupempty
消息发送的最大字节数spring.cloud.starter.stream.rocketmq. binder.producer.maxMessageSize8249344
消息生产者类型spring.cloud.starter.stream.rocketmq. binder.producer.producerTypeNormal普通或者事务。更多见 RocketMQProducerProperties.ProducerType
事务消息监听器的 beanNamespring.cloud.starter.stream.rocketmq. binder.producer.transactionListenerproducerType=Trans 时才有效;必须是实现 TransactionListener 接口的 Spring Bean
消息发送类型spring.cloud.starter.stream.rocketmq. binder.producer.sendTypeSync同步、异步、单向。更多见RocketMQProducerProperties.SendType
消息发送后回调函数的 beanNamespring.cloud.starter.stream.rocketmq. binder.producer.sendCallBacksendType=Async 时才有效;必须是实现 SendCallback 接口的 Spring Bean
是否在 Vip Channel 上发送消息spring.cloud.starter.stream.rocketmq. binder.producer.vipChannelEnabledtrue
发送消息的超时时间spring.cloud.starter.stream.rocketmq. binder.producer.sendMessageTimeout3000单位为毫秒
消息体压缩阀值spring.cloud.starter.stream.rocketmq. binder.producer.compressMessageBodyThreshold
在同步发送消息的模式下,消息发送失败的重试次数spring.cloud.starter.stream.rocketmq. binder.producer.retryTimesWhenSendFailed2
在异步发送消息的模式下,消息发送失败的重试次数spring.cloud.starter.stream.rocketmq. binder.producer.retryTimesWhenSendAsyncFailed2
消息发送失败的情况下是否重试其它的 brokerspring.cloud.starter.stream.rocketmq. binder.producer.retryAnotherBrokerfalse

NOTE 生产者其他更多参数请见:RocketMQProducerProperties