Advanced Guide

This section describes the implementation and related configuration of spring-cloud-starter-stream-rocketmq.

Implementation of Spring Cloud Stream RocketMQ Binder

Architecture implementation

spring-cloud-starter-stream-rocketmq no longer depends on the RocketMQ-Spring framework. The core Binder class, RocketMQMessageChannelBinder, implements the Spring Cloud Stream specification and internally builds a RocketMQInboundChannelAdapter and a RocketMQProducerMessageHandler.

RocketMQProducerMessageHandler constructs a RocketMQ Producer through RocketMQProduceFactory based on the Binding configuration. Internally, it converts the spring-messaging message class org.springframework.messaging.Message into the RocketMQ message class org.apache.rocketmq.common.message.Message and sends it.
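The conversion step can be pictured with a small, dependency-free sketch. The two nested classes below are hypothetical stand-ins for the Spring and RocketMQ message classes, and the header names "TAGS" and "KEYS" are assumed literal values; the real handler's logic is more involved and is not reproduced here.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Stand-in types only: the real classes are org.springframework.messaging.Message
// and org.apache.rocketmq.common.message.Message.
final class MessageConversionSketch {

    /** Hypothetical stand-in for a spring-messaging message: payload plus headers. */
    static final class SpringStyleMessage {
        final String payload;
        final Map<String, String> headers;

        SpringStyleMessage(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical stand-in for a RocketMQ message: topic, tags, keys, body bytes. */
    static final class RocketStyleMessage {
        final String topic;
        final String tags;
        final String keys;
        final byte[] body;

        RocketStyleMessage(String topic, String tags, String keys, byte[] body) {
            this.topic = topic;
            this.tags = tags;
            this.keys = keys;
            this.body = body;
        }
    }

    /** Encodes the payload as UTF-8 bytes and lifts the TAGS/KEYS headers into fields. */
    static RocketStyleMessage convert(String topic, SpringStyleMessage source) {
        byte[] body = source.payload.getBytes(StandardCharsets.UTF_8);
        String tags = source.headers.get("TAGS"); // null when the header is absent
        String keys = source.headers.get("KEYS"); // null when the header is absent
        return new RocketStyleMessage(topic, tags, keys, body);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("TAGS", "binder");
        headers.put("KEYS", "my-key");
        RocketStyleMessage out = convert("test-topic", new SpringStyleMessage("hello", headers));
        System.out.println(out.topic + " tags=" + out.tags + " keys=" + out.keys
                + " bodyBytes=" + out.body.length);
    }
}
```

In the real Binder the topic comes from the binding destination rather than from a method argument; it is passed explicitly here only to keep the sketch self-contained.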

Similarly, RocketMQInboundChannelAdapter constructs a DefaultMQPushConsumer through RocketMQConsumerFactory based on the Binding configuration, and internally starts the RocketMQ Consumer to receive messages.

NOTE Compatibility with the RocketMQ-Spring framework needs to be handled manually.

The Binder currently supports setting specific keys in the message headers to control the attributes of the resulting RocketMQ message.

For example, header keys such as TAGS, KEYS, and TRANSACTIONAL_ARGS set the corresponding attributes of the RocketMQ message. For the full list, see com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst.

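// Attach RocketMQ attributes through message headers, then send on the output channel.
Message<?> message = MessageBuilder.withPayload(msg)
        .setHeader(RocketMQHeaders.TAGS, "binder")  // RocketMQ message tag
        .setHeader(RocketMQHeaders.KEYS, "my-key")  // RocketMQ message key
        .build();
output().send(message);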

Or use StreamBridge:

// Same headers as above, sent through StreamBridge to the "producer-out-0" binding.
Message<?> message = MessageBuilder.withPayload(msg)
        .setHeader(RocketMQHeaders.TAGS, "binder")  // RocketMQ message tag
        .setHeader(RocketMQHeaders.KEYS, "my-key")  // RocketMQ message key
        .build();
streamBridge.send("producer-out-0", message);

NOTE For more usage, please refer to the sample: com.alibaba.cloud.examples.SenderService.

More configuration item reference

Binder Properties

The configuration items prefixed with spring.cloud.stream.rocketmq.binder are as follows:

| Configuration item | Key | Default value | Description |
| --- | --- | --- | --- |
| RocketMQ NameServer address | spring.cloud.stream.rocketmq.binder.name-server | 127.0.0.1:9876 | Older versions use the namesrv-addr configuration item |
| Authentication public key | spring.cloud.stream.rocketmq.binder.access-key | null | Aliyun account AccessKey |
| Authentication private key | spring.cloud.stream.rocketmq.binder.secret-key | null | Aliyun account SecretKey |
| Message trace function | spring.cloud.stream.rocketmq.binder.enable-msg-trace | true | Whether to enable message tracing for the Producer and Consumer |
| Trace topic name | spring.cloud.stream.rocketmq.binder.customized-trace-topic | RMQ_SYS_TRACE_TOPIC | The topic in which trace data is stored once message tracing is enabled |
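
To make the keys and defaults above concrete, here is a minimal sketch that reads them with java.util.Properties. This is only an illustration of key names and fallback values: in a real application the Binder is configured through Spring Boot property binding (for example in application.properties), not through this code, and the address 192.168.0.10:9876 is a made-up example value.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Illustration only: shows the binder keys and their documented defaults.
final class BinderPropertiesSketch {

    static final String PREFIX = "spring.cloud.stream.rocketmq.binder.";

    /** Parses properties-format text held in memory. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    /** Falls back to the documented default 127.0.0.1:9876 when the key is absent. */
    static String nameServer(Properties props) {
        return props.getProperty(PREFIX + "name-server", "127.0.0.1:9876");
    }

    /** Falls back to the documented default true when the key is absent. */
    static boolean msgTraceEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty(PREFIX + "enable-msg-trace", "true"));
    }

    /** Falls back to the documented default RMQ_SYS_TRACE_TOPIC when the key is absent. */
    static String traceTopic(Properties props) {
        return props.getProperty(PREFIX + "customized-trace-topic", "RMQ_SYS_TRACE_TOPIC");
    }

    public static void main(String[] args) {
        String config = String.join("\n",
                PREFIX + "name-server=192.168.0.10:9876", // hypothetical address
                PREFIX + "enable-msg-trace=false");
        Properties props = parse(config);
        System.out.println(nameServer(props));
        System.out.println(msgTraceEnabled(props));
        System.out.println(traceTopic(props));
    }
}
```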

Consumer Properties

The configuration items prefixed with spring.cloud.starter.stream.rocketmq.binder.consumer are as follows:

| Configuration item | Key | Default value | Description |
| --- | --- | --- | --- |
| Whether to enable the Consumer | spring.cloud.starter.stream.rocketmq.binder.consumer.enable | true | |
| Consumer subscription based on TAGS | spring.cloud.starter.stream.rocketmq.binder.consumer.subscription | empty | Multiple tags are separated by `\|\|`. See subscription for more. |
| Consumer consumption mode | spring.cloud.starter.stream.rocketmq.binder.consumer.messageModel | CLUSTERING | If you want every subscriber to receive the message, use the broadcast mode. See MessageModel for more. |
| Where the Consumer starts consuming | spring.cloud.starter.stream.rocketmq.binder.consumer.consumeFromWhere | | See ConsumeFromWhere for more. |

NOTE For more parameters, see RocketMQConsumerProperties.
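
The tag-based subscription format can be sketched as a small matcher. This is a simplified model rather than the Binder's or the broker's actual filtering code: it assumes that an empty expression or "*" matches every tag, and that tags in the expression are separated by a double vertical bar and may be surrounded by whitespace.

```java
import java.util.Arrays;

// Simplified model of tag-expression matching; not the broker's filtering code.
final class TagSubscriptionSketch {

    /**
     * Returns true when a message tag is accepted by a subscription expression
     * such as "tagA || tagB". Null, empty, and "*" expressions accept everything.
     */
    static boolean matches(String subscription, String messageTag) {
        if (subscription == null || subscription.isEmpty() || subscription.equals("*")) {
            return true;
        }
        return Arrays.stream(subscription.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(tag -> tag.equals(messageTag));
    }

    public static void main(String[] args) {
        System.out.println(matches("tagA || tagB", "tagB"));
        System.out.println(matches("tagA || tagB", "tagC"));
        System.out.println(matches("", "anything"));
    }
}
```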

The configuration items prefixed with spring.cloud.starter.stream.rocketmq.binder.consumer.push are as follows:

| Configuration item | Key | Default value | Description |
| --- | --- | --- | --- |
| Whether to consume messages in synchronous mode | spring.cloud.starter.stream.rocketmq.binder.consumer.push.pushorderly | false | |
| Consumption failure retry strategy | spring.cloud.starter.stream.rocketmq.binder.consumer.push.delayLevelWhenNextConsume | 0 | Applies in synchronous consumption mode. -1: put the message directly into the dead letter queue without retrying. 0: the broker controls the retry strategy. >0: the client controls the retry strategy. |
| Time interval before consuming again after a consumption failure | spring.cloud.starter.stream.rocketmq.binder.consumer.push.suspendCurrentQueueTimeMillis | 1000 | Applies in synchronous consumption mode. |

NOTE For more parameters, see RocketMQConsumerProperties.Push.
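
The three cases of delayLevelWhenNextConsume can be read as a simple decision function. The sketch below assumes the positive case means a client-controlled retry, as in the RocketMQ client's own description of this setting; the enum and class names are made up for illustration, and treating every negative value like -1 is an assumption of this sketch.

```java
// Illustrative decision table for delayLevelWhenNextConsume; names are hypothetical.
final class DelayLevelSketch {

    enum RetryDecision { DEAD_LETTER_QUEUE, BROKER_CONTROLLED, CLIENT_CONTROLLED }

    /** Maps the configured value to what happens after a consumption failure. */
    static RetryDecision decide(int delayLevelWhenNextConsume) {
        if (delayLevelWhenNextConsume < 0) {
            // Documented value is -1; other negative values are treated the same here.
            return RetryDecision.DEAD_LETTER_QUEUE;
        }
        if (delayLevelWhenNextConsume == 0) {
            return RetryDecision.BROKER_CONTROLLED; // the default
        }
        return RetryDecision.CLIENT_CONTROLLED;
    }

    public static void main(String[] args) {
        for (int level : new int[] {-1, 0, 3}) {
            System.out.println(level + " -> " + decide(level));
        }
    }
}
```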

The configuration items prefixed with spring.cloud.starter.stream.rocketmq.binder.consumer.pull are as follows:

| Configuration item | Key | Default value | Description |
| --- | --- | --- | --- |
| Number of threads used for pulling when consuming | spring.cloud.starter.stream.rocketmq.binder.consumer.pull.pullThreadNums | 20 | |
| Timeout when pulling, in milliseconds | spring.cloud.starter.stream.rocketmq.binder.consumer.pull.pollTimeoutMillis | 1000 * 5 | |

NOTE For more parameters, see RocketMQConsumerProperties.Pull.

Producer Properties

The configuration items prefixed with spring.cloud.starter.stream.rocketmq.binder.producer are as follows:

| Configuration item | Key | Default value | Description |
| --- | --- | --- | --- |
| Whether to enable the Producer | spring.cloud.starter.stream.rocketmq.binder.producer.enable | true | |
| Producer group name | spring.cloud.starter.stream.rocketmq.binder.producer.group | empty | |
| Maximum size of a message, in bytes | spring.cloud.starter.stream.rocketmq.binder.producer.maxMessageSize | 8249344 | |
| Message producer type | spring.cloud.starter.stream.rocketmq.binder.producer.producerType | Normal | Normal or transactional. See RocketMQProducerProperties.ProducerType for more. |
| Bean name of the transaction message listener | spring.cloud.starter.stream.rocketmq.binder.producer.transactionListener | | Only valid when producerType=Trans; must be a Spring Bean that implements the TransactionListener interface. |
| Message sending type | spring.cloud.starter.stream.rocketmq.binder.producer.sendType | Sync | Synchronous, asynchronous, or one-way. See RocketMQProducerProperties.SendType for more. |
| Bean name of the callback invoked after the message is sent | spring.cloud.starter.stream.rocketmq.binder.producer.sendCallBack | | Only valid when sendType=Async; must be a Spring Bean that implements the SendCallback interface. |
| Whether to send messages on the VIP channel | spring.cloud.starter.stream.rocketmq.binder.producer.vipChannelEnabled | true | |
| Timeout for sending messages | spring.cloud.starter.stream.rocketmq.binder.producer.sendMessageTimeout | 3000 | In milliseconds |
| Message body compression threshold | spring.cloud.starter.stream.rocketmq.binder.producer.compressMessageBodyThreshold | | |
| Number of retries after a send failure in synchronous mode | spring.cloud.starter.stream.rocketmq.binder.producer.retryTimesWhenSendFailed | 2 | |
| Number of retries after a send failure in asynchronous mode | spring.cloud.starter.stream.rocketmq.binder.producer.retryTimesWhenSendAsyncFailed | 2 | |
| Whether to retry on another broker after a send failure | spring.cloud.starter.stream.rocketmq.binder.producer.retryAnotherBroker | false | |

NOTE For more parameters of the producer, please see: RocketMQProducerProperties.
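
The effect of retryTimesWhenSendFailed can be sketched as a bounded retry loop. This is a hedged model, not the RocketMQ client's code: it assumes a synchronous send makes one initial attempt plus up to retryTimesWhenSendFailed retries, and the Attempt interface and class name are invented for illustration.

```java
// Hypothetical model of synchronous send retries; not the RocketMQ client code.
final class SendRetrySketch {

    /** One send attempt; returns true on success. The attempt number starts at 1. */
    interface Attempt {
        boolean trySend(int attemptNumber);
    }

    /**
     * Runs one initial attempt plus up to retryTimesWhenSendFailed retries.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int sendWithRetry(int retryTimesWhenSendFailed, Attempt attempt) {
        int totalAttempts = 1 + Math.max(0, retryTimesWhenSendFailed);
        for (int i = 1; i <= totalAttempts; i++) {
            if (attempt.trySend(i)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // With the default of 2 retries, a send that succeeds on its third try is delivered.
        int succeededOn = sendWithRetry(2, attemptNumber -> attemptNumber == 3);
        System.out.println("succeeded on attempt " + succeededOn);
    }
}
```

Under this model, the default value of 2 means a message is attempted at most three times before the send is reported as failed.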