以下案例基于Java实现(若基于其他语言实现,需注意pulsar客户端版本与服务端2.8.1版本匹配)
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>2.8.1</version>
</dependency>
package com.marktrace.animal.common;
import com.marktrace.animal.common.config.pulsar.PulsarConfig;
import org.apache.pulsar.client.api.*;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Order(1)
@Component
public class PulsarConsumer implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://node1:6650,node2:6650,node3:6650")//服务连接地址,该地址仅为示例
.authentication(AuthenticationFactory.token("云上畜牧平台分配的token"))
.listenerName("external")
.build();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("云上畜牧平台分配的topic")
.subscriptionName(pulsarConfig.getTenantId())
.subscriptionType(SubscriptionType.Key_Shared)
// 设置支持批量读取参数配置
.batchReceivePolicy(
BatchReceivePolicy.builder()
.maxNumBytes(1024 * 1024)
.maxNumMessages(100)
.timeout(5, TimeUnit.SECONDS)
.build()
).subscribe();
while (true) {
//读取消息(批量)
Messages<String> messages = consumer.batchReceive();
//获取消息数据
for (Message<String> message : messages) {
String mqMsg = message.getValue();
System.out.println("消息数据为:"+mqMsg);
//例如:{"pushType":1,"data":{"event":2,"pushTime":"2022-08-25 17:25:10","warnDesc":{"markId":653595043893874611,"labelNumber":"100000100016511","farmId":653214654679482311,"farmName":"中融松坪山养殖场","houseId":653329956331323311,"houseName":"1舍","columnId":653595040794284011,"columnName":"1号栏","createTime":"2022-05-06 02:43:45","deviceCode":"202108161010211","errMsg":null}}}
//3.3 ack确认
consumer.acknowledge(message);
}
}
}
}
建议您取到数据后及时将数据存储或转发出去,不要在消费端里做复杂业务操作。#
注意:您可能有测试环境,部署时要注意不要用生产环境的密钥,这样可能会导致数据被测试环境的消费者消费掉,让您误判生产环境的消费者没有收到数据。您可通过修改订阅者名字(subscriptionName)区分生产和测试环境
public class TenantMsgPushDto {
/**
* 1 预警(对应data均为WarnInfo类单条数据封装)
* 2 实时体温(对应data为集合)
* 3 设备心跳记录(2023-05-30新增)
* 4 耳标替换推送(2023-11-29新增,测试中)
* 后续可能增加其他类型,建议通过枚举或ifelse判断控制
*/
private Integer pushType;
/**
* 数据内容
*/
private Object data;
//省略set、get方法
}
{
"pushType": 1,
"data": {
"event": 2,
"pushTime": "2022-08-25 17:25:10",
"warnDesc": {
"markId": 653595043893874688,
"labelNumber": "100000100016592",
"farmId": 653214654679482368,
"farmName": "洮南王忠和养殖场",
"houseId": 653329956331323392,
"houseName": "1舍",
"columnId": 653595040794284032,
"columnName": "1号栏",
"createTime": "2022-05-06 02:43:45",
"deviceCode": "202108161010235",
"errMsg": null
}
}
}
{
"pushType": 2,
"data": [
{
"temperature": 37.51,
"xAxis": -10,
"yAxis": -13,
"zAxis": 20,
"collectTime": "2022-05-21 13:00:00",
"deviceCode": "5F5G4H2J8E003C",
"tagVersion": "V0.6",
"tagStatus": 1,
"labelNumber": "100000100012345",
"tagType": 7,
"singalStrength": -87,
"envTemp": -15.15,
"envHumidity": 85
}
]
}
{
"pushType": 3, //设备心跳记录(若设备带有定位模块,则推送时会携带2分钟内最后一次上报的经纬度,2024-05-27新增说明)
"data": {
"netSignalStrength": 21,
"uploadType": "GPRS",
"firmwareVersion": "V11.3",
"deviceCode": "202106251010196",
"longitude":"113.123456",
"latitude":"22.123456",
"time": "2023-05-25 16:15:10"
}
}
{
"pushType": 4,
"data": {
"oldLabelNumber": "15位原耳标号",
"newLabelNumber": "15位新耳标号",
"updateTime": "2023-11-29 14:00:15",
}
}
public class WarnInfo {
/**
* 1 健康预警
* 2 非法离栏
* 3 死亡预警
* 4 分娩提醒
* 5 发情提醒
* 6 基站异常提醒
* 7 耳标异常提醒
*/
private Integer event;
/**
* 推送至pulsar的时间(非NULL)
* yyyy-MM-dd HH:mm:ss
*/
private String pushTime;
/**
* 预警详细内容
*/
private WarnDesc warnDesc;
//省略set、get方法
}
public class WarnDesc {
/**
* 牲畜唯一身份id(除分娩/发情/纯视觉非法离栏预警外,其他场景均非NULL)
*/
private Long markId;
/**
* 牲畜耳标号(除分娩/发情/纯视觉非法离栏预警外,其他场景均非NULL)
*/
private String labelNumber;
/**
* 牲畜所属养殖场id(可NULL)
*/
private Long farmId;
/**
* 牲畜所属养殖场名称(可NULL)
*/
private String farmName;
/**
* 牲畜所属养殖栋id(可NULL)
*/
private Long houseId;
/**
* 牲畜所属养殖栋名称(可NULL)
*/
private String houseName;
/**
* 牲畜所属养殖栏id(可NULL)
*/
private Long columnId;
/**
* 牲畜所属养殖栏名称(可NULL)
*/
private String columnName;
/**
* 生成预警时间(非NULL)
*/
private String createTime;
/**
* 设备号(可NULL)
*/
private String deviceCode;
/**
* 异常说明(可NULL)
*/
private String errMsg;
/**
* 2024-07-05新增
* 预警事件图片(摄像机触发纯视觉方案的非法离栏模型时会附带http格式的图片url)
*/
private String warnImg;
//省略set、get方法
}
Modified at 2025-03-07 09:12:30