403  
查询码:00000011
Docker(Windows版)安装zookeeper+kafka
作者: 潘帅 于 2020年04月27日 发布在分类 / 人防组 / 人防后端 下,并于 2020年04月27日 编辑
docker zookeeper kafka dotnetcore.cap

近期在对已经交付的项目维护时需要还原现场发现的关于kafka数据订阅的问题,但是又不想在自己电脑上装一堆东西,所以想到了用Docker安装kafka用来还原生产环境进行测试。


1.Docker安装zookeeper

启动Docker服务,打开CMD窗口或powershell窗口。

搜索zookeeper镜像

docker search zookeeper
拉取zookeeper镜像

docker pull wurstmeister/zookeeper
创建并启动zookeeper容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

一切看起来很顺利,到这里却卡壳了,这一步执行始终不成功,找了半天资料才发现是zookeeper默认使用是2181端口被占用了,占用者又恰好是Docker所依赖的Hyper-V虚拟化服务。解决办法看下面了。


1.1.解决2181端口占用问题

由于Hyper-V占用的2181端口,所以必须将Hyper-V停掉后把2181释放出来,但是没了Hyper-V,Dokcer就不能运行了,所以改完还要将Hyper-V启动。

查看端口占用情况

netsh interface ipv4 show excludedportrange protocol=tcp
禁用Hyper-V服务

dism.exe /Online /Disable-Feature:Microsoft-Hyper-V
解除2181端口限制

netsh int ipv4 add excludedportrange protocol=tcp startport=2181 numberofports=1
启用Hyper-V服务

dism.exe /Online /Enable-Feature:Microsoft-Hyper-V /All
检查确认端口占用情况

netsh interface ipv4 show excludedportrange protocol=tcp

问题解决,zookeeper容器顺利创建并启动

2.Docker安装kafka

搜索镜像kafka

docker search kafka

拉取kafka镜像

docker pull wurstmeister/kafka

创建并启动kafka容器,连接到上一步创建的zookeeper

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

kafka容器启动成功,在Docker的Dashboard中可以看到已启动的容器。


3.客户端工具kafka-tool连接zookeeper

安装客户端工具kafka-tool,连接zookeeper,创建topic用于测试。

4.测试代码编写

测试代码是将原有的独立kafka消费端迁移到Web服务端,采用DotnetCore.CAP将原有功能在ASP.NET Core项目中重新实现。


4.1.注入DotnetCore.CAP

为Web项目安装DotnetCore.CAP.Kafka和DotnetCore.CAP.SqlServer程序包,在Startup.cs中注入CAP服务。

在ConfigureServices方法中配置服务

            services.AddCap(x =>
            {
                x.UseSqlServer(_appConfiguration["ConnectionStrings:Default"]);
                x.UseKafka(_appConfiguration["MQ:Host"]);
                x.UseDashboard();
            });
在Configure方法中配置面板

            app.UseCapDashboard();


4.2.订阅kafka数据

新建ASP.NET Core控制器并声明CAP服务,添加topic订阅方法。

    public class MessageController : RunGoControllerBase
    {
        private readonly ICapPublisher _publisher;
        public MessageController(ICapPublisher publisher)
        {
            _publisher = publisher;
        }

        [CapSubscribe(RunGoWebConsts.DataTopic)]
        public void DataMonitor(string message)
        {
            try
            {
                ProjectData<SensorData> projectData = message.FromJsonString<ProjectData<SensorData>>();
                _sensorManager.HandleData(projectData);            }
            catch (Exception ex)
            {
                throw new UserFriendlyException(ex.Message);
            }

        }

        [CapSubscribe(RunGoWebConsts.AlarmTopic)]
        public void AlarmMonitor(string message)
        {
            try
            {
                ProjectData<AlarmSensorData> projectData = message.FromJsonString<ProjectData<AlarmSensorData>>();

            }
            catch (Exception ex)
            {

                throw new UserFriendlyException(ex.Message);
            }

        }

        [CapSubscribe(RunGoWebConsts.WechatTopic)]
        public void WechatMonitor(string message)
        {
            try
            {
                MessageData data = message.FromJsonString<MessageData>();

            }
            catch (Exception ex)
            {
                throw new UserFriendlyException(ex.Message);
            }

        }

    }


4.3.查看数据订阅/发布记录

启动站点访问路径http://xxx:xx/cap


DotnetCore.CAP提供了多种数据库和消息队列的支持,在消息队列的消息订阅/发布、数据同步、数据最终一致性上有着良好的实用性。




 推荐知识

 历史版本

修改日期 修改人 备注
2020-04-27 14:37:48[当前版本] 潘帅 1.0

 附件

附件类型

PNGPNG

知识分享平台 -V 4.8.7 -wcp