近期在对已经交付的项目维护时需要还原现场发现的关于kafka数据订阅的问题,但是又不想在自己电脑上装一堆东西,所以想到了用Docker安装kafka用来还原生产环境进行测试。
启动Docker服务,打开CMD窗口或powershell窗口。
搜索zookeeper镜像
docker search zookeeper拉取zookeeper镜像
docker pull wurstmeister/zookeeper创建并启动zookeeper容器
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
一切看起来很顺利,到这里却卡壳了,这一步执行始终不成功,找了半天资料才发现是zookeeper默认使用是2181端口被占用了,占用者又恰好是Docker所依赖的Hyper-V虚拟化服务。解决办法看下面了。
由于Hyper-V占用的2181端口,所以必须将Hyper-V停掉后把2181释放出来,但是没了Hyper-V,Dokcer就不能运行了,所以改完还要将Hyper-V启动。
查看端口占用情况
netsh interface ipv4 show excludedportrange protocol=tcp禁用Hyper-V服务
dism.exe /Online /Disable-Feature:Microsoft-Hyper-V解除2181端口限制
netsh int ipv4 add excludedportrange protocol=tcp startport=2181 numberofports=1启用Hyper-V服务
dism.exe /Online /Enable-Feature:Microsoft-Hyper-V /All检查确认端口占用情况
netsh interface ipv4 show excludedportrange protocol=tcp
问题解决,zookeeper容器顺利创建并启动
搜索镜像kafka
docker search kafka
拉取kafka镜像
docker pull wurstmeister/kafka
创建并启动kafka容器,连接到上一步创建的zookeeper
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
kafka容器启动成功,在Docker的Dashboard中可以看到已启动的容器。
安装客户端工具kafka-tool,连接zookeeper,创建topic用于测试。
测试代码是将原有的独立kafka消费端迁移到Web服务端,采用DotnetCore.CAP将原有功能在ASP.NET Core项目中重新实现。
为Web项目安装DotnetCore.CAP.Kafka和DotnetCore.CAP.SqlServer程序包,在Startup.cs中注入CAP服务。
在ConfigureServices方法中配置服务
services.AddCap(x => { x.UseSqlServer(_appConfiguration["ConnectionStrings:Default"]); x.UseKafka(_appConfiguration["MQ:Host"]); x.UseDashboard(); });在Configure方法中配置面板
app.UseCapDashboard();
新建ASP.NET Core控制器并声明CAP服务,添加topic订阅方法。
public class MessageController : RunGoControllerBase { private readonly ICapPublisher _publisher; public MessageController(ICapPublisher publisher) { _publisher = publisher; } [CapSubscribe(RunGoWebConsts.DataTopic)] public void DataMonitor(string message) { try { ProjectData<SensorData> projectData = message.FromJsonString<ProjectData<SensorData>>(); _sensorManager.HandleData(projectData); } catch (Exception ex) { throw new UserFriendlyException(ex.Message); } } [CapSubscribe(RunGoWebConsts.AlarmTopic)] public void AlarmMonitor(string message) { try { ProjectData<AlarmSensorData> projectData = message.FromJsonString<ProjectData<AlarmSensorData>>(); } catch (Exception ex) { throw new UserFriendlyException(ex.Message); } } [CapSubscribe(RunGoWebConsts.WechatTopic)] public void WechatMonitor(string message) { try { MessageData data = message.FromJsonString<MessageData>(); } catch (Exception ex) { throw new UserFriendlyException(ex.Message); } } }
启动站点访问路径http://xxx:xx/cap
DotnetCore.CAP提供了多种数据库和消息队列的支持,在消息队列的消息订阅/发布、数据同步、数据最终一致性上有着良好的实用性。